mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-22 12:40:04 +01:00
Merge #5159
5159: Fix the New Indexer Spilling r=irevoire a=Kerollmops
Fix two bugs in the merging of the spilled caches. Thanks to `@ManyTheFish` and `@irevoire` 👏
Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
This commit is contained in:
commit
ba11121cfc
@ -177,12 +177,12 @@ impl<'extractor> BalancedCaches<'extractor> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn freeze(&mut self) -> Result<Vec<FrozenCache<'_, 'extractor>>> {
|
pub fn freeze(&mut self, source_id: usize) -> Result<Vec<FrozenCache<'_, 'extractor>>> {
|
||||||
match &mut self.caches {
|
match &mut self.caches {
|
||||||
InnerCaches::Normal(NormalCaches { caches }) => caches
|
InnerCaches::Normal(NormalCaches { caches }) => caches
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(bucket, map)| {
|
.map(|(bucket_id, map)| {
|
||||||
// safety: we are transmuting the Bbbul into a FrozenBbbul
|
// safety: we are transmuting the Bbbul into a FrozenBbbul
|
||||||
// that are the same size.
|
// that are the same size.
|
||||||
let map = unsafe {
|
let map = unsafe {
|
||||||
@ -201,14 +201,19 @@ impl<'extractor> BalancedCaches<'extractor> {
|
|||||||
>,
|
>,
|
||||||
>(map)
|
>(map)
|
||||||
};
|
};
|
||||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
|
Ok(FrozenCache {
|
||||||
|
source_id,
|
||||||
|
bucket_id,
|
||||||
|
cache: FrozenMap::new(map),
|
||||||
|
spilled: Vec::new(),
|
||||||
|
})
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
|
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
|
||||||
.iter_mut()
|
.iter_mut()
|
||||||
.zip(mem::take(spilled_entries))
|
.zip(mem::take(spilled_entries))
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(bucket, (map, sorter))| {
|
.map(|(bucket_id, (map, sorter))| {
|
||||||
let spilled = sorter
|
let spilled = sorter
|
||||||
.into_reader_cursors()?
|
.into_reader_cursors()?
|
||||||
.into_iter()
|
.into_iter()
|
||||||
@ -234,7 +239,7 @@ impl<'extractor> BalancedCaches<'extractor> {
|
|||||||
>,
|
>,
|
||||||
>(map)
|
>(map)
|
||||||
};
|
};
|
||||||
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
|
Ok(FrozenCache { source_id, bucket_id, cache: FrozenMap::new(map), spilled })
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
}
|
}
|
||||||
@ -440,7 +445,8 @@ fn spill_entry_to_sorter(
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub struct FrozenCache<'a, 'extractor> {
|
pub struct FrozenCache<'a, 'extractor> {
|
||||||
bucket: usize,
|
bucket_id: usize,
|
||||||
|
source_id: usize,
|
||||||
cache: FrozenMap<
|
cache: FrozenMap<
|
||||||
'a,
|
'a,
|
||||||
'extractor,
|
'extractor,
|
||||||
@ -457,9 +463,9 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>(
|
|||||||
let width = caches.first().map(BalancedCaches::buckets).unwrap_or(0);
|
let width = caches.first().map(BalancedCaches::buckets).unwrap_or(0);
|
||||||
let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
|
let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
|
||||||
|
|
||||||
for thread_cache in caches {
|
for (thread_index, thread_cache) in caches.iter_mut().enumerate() {
|
||||||
for frozen in thread_cache.freeze()? {
|
for frozen in thread_cache.freeze(thread_index)? {
|
||||||
bucket_caches[frozen.bucket].push(frozen);
|
bucket_caches[frozen.bucket_id].push(frozen);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -477,21 +483,16 @@ where
|
|||||||
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
|
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
|
||||||
{
|
{
|
||||||
let mut maps = Vec::new();
|
let mut maps = Vec::new();
|
||||||
let mut readers = Vec::new();
|
|
||||||
let mut current_bucket = None;
|
|
||||||
for FrozenCache { bucket, cache, ref mut spilled } in frozen {
|
|
||||||
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
|
|
||||||
maps.push(cache);
|
|
||||||
readers.append(spilled);
|
|
||||||
}
|
|
||||||
|
|
||||||
// First manage the spilled entries by looking into the HashMaps,
|
|
||||||
// merge them and mark them as dummy.
|
|
||||||
let mut heap = BinaryHeap::new();
|
let mut heap = BinaryHeap::new();
|
||||||
for (source_index, source) in readers.into_iter().enumerate() {
|
let mut current_bucket = None;
|
||||||
let mut cursor = source.into_cursor()?;
|
for FrozenCache { source_id, bucket_id, cache, spilled } in frozen {
|
||||||
|
assert_eq!(*current_bucket.get_or_insert(bucket_id), bucket_id);
|
||||||
|
maps.push((source_id, cache));
|
||||||
|
for reader in spilled {
|
||||||
|
let mut cursor = reader.into_cursor()?;
|
||||||
if cursor.move_on_next()?.is_some() {
|
if cursor.move_on_next()?.is_some() {
|
||||||
heap.push(Entry { cursor, source_index });
|
heap.push(Entry { cursor, source_id });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -508,25 +509,29 @@ where
|
|||||||
|
|
||||||
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
|
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
|
||||||
while let Some(mut entry) = heap.peek_mut() {
|
while let Some(mut entry) = heap.peek_mut() {
|
||||||
if let Some((key, _value)) = entry.cursor.current() {
|
if let Some((key, value)) = entry.cursor.current() {
|
||||||
if first_key == key {
|
if first_key != key {
|
||||||
let new = DelAddRoaringBitmap::from_bytes(first_value)?;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let new = DelAddRoaringBitmap::from_bytes(value)?;
|
||||||
output = output.merge(new);
|
output = output.merge(new);
|
||||||
// When we are done we the current value of this entry move make
|
// When we are done we the current value of this entry move make
|
||||||
// it move forward and let the heap reorganize itself (on drop)
|
// it move forward and let the heap reorganize itself (on drop)
|
||||||
if entry.cursor.move_on_next()?.is_none() {
|
if entry.cursor.move_on_next()?.is_none() {
|
||||||
PeekMut::pop(entry);
|
PeekMut::pop(entry);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Once we merged all of the spilled bitmaps we must also
|
// Once we merged all of the spilled bitmaps we must also
|
||||||
// fetch the entries from the non-spilled entries (the HashMaps).
|
// fetch the entries from the non-spilled entries (the HashMaps).
|
||||||
for (map_index, map) in maps.iter_mut().enumerate() {
|
for (source_id, map) in maps.iter_mut() {
|
||||||
if first_entry.source_index != map_index {
|
debug_assert!(
|
||||||
|
!(map.get(first_key).is_some() && first_entry.source_id == *source_id),
|
||||||
|
"A thread should not have spiled a key that has been inserted in the cache"
|
||||||
|
);
|
||||||
|
if first_entry.source_id != *source_id {
|
||||||
if let Some(new) = map.get_mut(first_key) {
|
if let Some(new) = map.get_mut(first_key) {
|
||||||
output.union_and_clear_bbbul(new);
|
output.union_and_clear_bbbul(new);
|
||||||
}
|
}
|
||||||
@ -538,12 +543,12 @@ where
|
|||||||
|
|
||||||
// Don't forget to put the first entry back into the heap.
|
// Don't forget to put the first entry back into the heap.
|
||||||
if first_entry.cursor.move_on_next()?.is_some() {
|
if first_entry.cursor.move_on_next()?.is_some() {
|
||||||
heap.push(first_entry)
|
heap.push(first_entry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then manage the content on the HashMap entries that weren't taken (mem::take).
|
// Then manage the content on the HashMap entries that weren't taken (mem::take).
|
||||||
while let Some(mut map) = maps.pop() {
|
while let Some((_, mut map)) = maps.pop() {
|
||||||
// Make sure we don't try to work with entries already managed by the spilled
|
// Make sure we don't try to work with entries already managed by the spilled
|
||||||
let mut ordered_entries: Vec<_> =
|
let mut ordered_entries: Vec<_> =
|
||||||
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
|
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
|
||||||
@ -553,7 +558,7 @@ where
|
|||||||
let mut output = DelAddRoaringBitmap::empty();
|
let mut output = DelAddRoaringBitmap::empty();
|
||||||
output.union_and_clear_bbbul(bbbul);
|
output.union_and_clear_bbbul(bbbul);
|
||||||
|
|
||||||
for rhs in maps.iter_mut() {
|
for (_, rhs) in maps.iter_mut() {
|
||||||
if let Some(new) = rhs.get_mut(key) {
|
if let Some(new) = rhs.get_mut(key) {
|
||||||
output.union_and_clear_bbbul(new);
|
output.union_and_clear_bbbul(new);
|
||||||
}
|
}
|
||||||
@ -569,14 +574,14 @@ where
|
|||||||
|
|
||||||
struct Entry<R> {
|
struct Entry<R> {
|
||||||
cursor: ReaderCursor<R>,
|
cursor: ReaderCursor<R>,
|
||||||
source_index: usize,
|
source_id: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<R> Ord for Entry<R> {
|
impl<R> Ord for Entry<R> {
|
||||||
fn cmp(&self, other: &Entry<R>) -> Ordering {
|
fn cmp(&self, other: &Entry<R>) -> Ordering {
|
||||||
let skey = self.cursor.current().map(|(k, _)| k);
|
let skey = self.cursor.current().map(|(k, _)| k);
|
||||||
let okey = other.cursor.current().map(|(k, _)| k);
|
let okey = other.cursor.current().map(|(k, _)| k);
|
||||||
skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse()
|
skey.cmp(&okey).then(self.source_id.cmp(&other.source_id)).reverse()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user