Fix the merge_caches_sorted function

This commit is contained in:
Kerollmops 2024-12-12 16:15:37 +01:00
parent c177210b1b
commit 2f3cc8cdd2
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -477,21 +477,16 @@ where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{ {
let mut maps = Vec::new(); let mut maps = Vec::new();
let mut readers = Vec::new();
let mut current_bucket = None;
for FrozenCache { bucket, cache, ref mut spilled } in frozen {
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
maps.push(cache);
readers.append(spilled);
}
// First manage the spilled entries by looking into the HashMaps,
// merge them and mark them as dummy.
let mut heap = BinaryHeap::new(); let mut heap = BinaryHeap::new();
for (source_index, source) in readers.into_iter().enumerate() { let mut current_bucket = None;
let mut cursor = source.into_cursor()?; for FrozenCache { bucket, cache, spilled } in frozen {
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
maps.push((bucket, cache));
for reader in spilled {
let mut cursor = reader.into_cursor()?;
if cursor.move_on_next()?.is_some() { if cursor.move_on_next()?.is_some() {
heap.push(Entry { cursor, source_index }); heap.push(Entry { cursor, bucket });
}
} }
} }
@ -508,25 +503,25 @@ where
let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; let mut output = DelAddRoaringBitmap::from_bytes(first_value)?;
while let Some(mut entry) = heap.peek_mut() { while let Some(mut entry) = heap.peek_mut() {
if let Some((key, _value)) = entry.cursor.current() { if let Some((key, value)) = entry.cursor.current() {
if first_key == key { if first_key != key {
let new = DelAddRoaringBitmap::from_bytes(first_value)?; break;
}
let new = DelAddRoaringBitmap::from_bytes(value)?;
output = output.merge(new); output = output.merge(new);
// When we are done with the current value of this entry, make // When we are done with the current value of this entry, make
// it move forward and let the heap reorganize itself (on drop) // it move forward and let the heap reorganize itself (on drop)
if entry.cursor.move_on_next()?.is_none() { if entry.cursor.move_on_next()?.is_none() {
PeekMut::pop(entry); PeekMut::pop(entry);
} }
} else {
break;
}
} }
} }
// Once we merged all of the spilled bitmaps we must also // Once we merged all of the spilled bitmaps we must also
// fetch the entries from the non-spilled entries (the HashMaps). // fetch the entries from the non-spilled entries (the HashMaps).
for (map_index, map) in maps.iter_mut().enumerate() { for (map_bucket, map) in maps.iter_mut() {
if first_entry.source_index != map_index { if first_entry.bucket != *map_bucket {
if let Some(new) = map.get_mut(first_key) { if let Some(new) = map.get_mut(first_key) {
output.union_and_clear_bbbul(new); output.union_and_clear_bbbul(new);
} }
@ -538,12 +533,12 @@ where
// Don't forget to put the first entry back into the heap. // Don't forget to put the first entry back into the heap.
if first_entry.cursor.move_on_next()?.is_some() { if first_entry.cursor.move_on_next()?.is_some() {
heap.push(first_entry) heap.push(first_entry);
} }
} }
// Then manage the content on the HashMap entries that weren't taken (mem::take). // Then manage the content on the HashMap entries that weren't taken (mem::take).
while let Some(mut map) = maps.pop() { while let Some((_, mut map)) = maps.pop() {
// Make sure we don't try to work with entries already managed while merging the spilled entries. // Make sure we don't try to work with entries already managed while merging the spilled entries.
let mut ordered_entries: Vec<_> = let mut ordered_entries: Vec<_> =
map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect(); map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect();
@ -553,7 +548,7 @@ where
let mut output = DelAddRoaringBitmap::empty(); let mut output = DelAddRoaringBitmap::empty();
output.union_and_clear_bbbul(bbbul); output.union_and_clear_bbbul(bbbul);
for rhs in maps.iter_mut() { for (_, rhs) in maps.iter_mut() {
if let Some(new) = rhs.get_mut(key) { if let Some(new) = rhs.get_mut(key) {
output.union_and_clear_bbbul(new); output.union_and_clear_bbbul(new);
} }
@ -569,14 +564,14 @@ where
struct Entry<R> { struct Entry<R> {
cursor: ReaderCursor<R>, cursor: ReaderCursor<R>,
source_index: usize, bucket: usize,
} }
impl<R> Ord for Entry<R> { impl<R> Ord for Entry<R> {
fn cmp(&self, other: &Entry<R>) -> Ordering { fn cmp(&self, other: &Entry<R>) -> Ordering {
let skey = self.cursor.current().map(|(k, _)| k); let skey = self.cursor.current().map(|(k, _)| k);
let okey = other.cursor.current().map(|(k, _)| k); let okey = other.cursor.current().map(|(k, _)| k);
skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse() skey.cmp(&okey).then(self.bucket.cmp(&other.bucket)).reverse()
} }
} }