diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index be077d142..ae5ade17e 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -566,6 +566,116 @@ where Ok(()) } +/// Merges the caches that must be all associated to the same bucket. +/// +/// It merges entries like the `merge_caches` function +pub fn merge_caches_alt(frozen: Vec, mut f: F) -> Result<()> +where + F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, +{ + let mut maps = Vec::new(); + let mut readers = Vec::new(); + let mut current_bucket = None; + for FrozenCache { bucket, cache, ref mut spilled } in frozen { + assert_eq!(*current_bucket.get_or_insert(bucket), bucket); + maps.push(cache); + readers.append(spilled); + } + + // First manage the spilled entries by looking into the HashMaps, + // merge them and mark them as dummy. + let mut heap = BinaryHeap::new(); + for (source_index, source) in readers.into_iter().enumerate() { + let mut cursor = source.into_cursor()?; + if cursor.move_on_next()?.is_some() { + heap.push(Entry { cursor, source_index }); + } + } + + loop { + let mut first_entry = match heap.pop() { + Some(entry) => entry, + None => break, + }; + + let (first_key, first_value) = match first_entry.cursor.current() { + Some((key, value)) => (key, value), + None => break, + }; + + let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; + while let Some(mut entry) = heap.peek_mut() { + if let Some((key, _value)) = entry.cursor.current() { + if first_key == key { + let new = DelAddRoaringBitmap::from_bytes(first_value)?; + output = output.merge(new); + // When we are done we the current value of this entry move make + // it move forward and let the heap reorganize itself (on drop) + if entry.cursor.move_on_next()?.is_none() { + PeekMut::pop(entry); + } + } else { + break; + } + } + } + + // Once we merged all of the spilled bitmaps we must also + // fetch the entries from the non-spilled entries (the HashMaps). + for (map_index, map) in maps.iter_mut().enumerate() { + if first_entry.source_index != map_index { + if let Some(new) = map.get_mut(first_key) { + output.union_and_clear_bbbul(new); + } + } + } + + // We send the merged entry outside. + (f)(first_key, output)?; + + // Don't forget to put the first entry back into the heap. + if first_entry.cursor.move_on_next()?.is_some() { + heap.push(first_entry) + } + } + + // Then manage the content on the HashMap entries that weren't taken (mem::take). + let order_count = 1000; + while let Some(mut map) = maps.pop() { + let mut iter = map.iter_mut(); + + loop { + let mut ordered_buffer: Vec<_> = iter.by_ref().take(order_count).collect(); + ordered_buffer.sort_unstable_by_key(|(key, _)| *key); + + if ordered_buffer.is_empty() { + break; + } + + for (key, bbbul) in ordered_buffer.drain(..) { + // Make sure we don't try to work with entries already managed by the spilled + if bbbul.is_empty() { + continue; + } + + let mut output = DelAddRoaringBitmap::empty(); + output.union_and_clear_bbbul(bbbul); + + for rhs in maps.iter_mut() { + if let Some(new) = rhs.get_mut(key) { + output.union_and_clear_bbbul(new); + } + } + + // We send the merged entry outside. + (f)(key, output)?; + } + } + } + + Ok(()) +} + struct Entry { cursor: ReaderCursor, source_index: usize,