diff --git a/crates/milli/src/update/new/extract/cache.rs b/crates/milli/src/update/new/extract/cache.rs index 09ca60211..e2c8bb5fe 100644 --- a/crates/milli/src/update/new/extract/cache.rs +++ b/crates/milli/src/update/new/extract/cache.rs @@ -177,12 +177,12 @@ impl<'extractor> BalancedCaches<'extractor> { Ok(()) } - pub fn freeze(&mut self) -> Result>> { + pub fn freeze(&mut self, source_id: usize) -> Result>> { match &mut self.caches { InnerCaches::Normal(NormalCaches { caches }) => caches .iter_mut() .enumerate() - .map(|(bucket, map)| { + .map(|(bucket_id, map)| { // safety: we are transmuting the Bbbul into a FrozenBbbul // that are the same size. let map = unsafe { @@ -201,14 +201,19 @@ impl<'extractor> BalancedCaches<'extractor> { >, >(map) }; - Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() }) + Ok(FrozenCache { + source_id, + bucket_id, + cache: FrozenMap::new(map), + spilled: Vec::new(), + }) }) .collect(), InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches .iter_mut() .zip(mem::take(spilled_entries)) .enumerate() - .map(|(bucket, (map, sorter))| { + .map(|(bucket_id, (map, sorter))| { let spilled = sorter .into_reader_cursors()? .into_iter() @@ -234,7 +239,7 @@ impl<'extractor> BalancedCaches<'extractor> { >, >(map) }; - Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled }) + Ok(FrozenCache { source_id, bucket_id, cache: FrozenMap::new(map), spilled }) }) .collect(), } @@ -440,7 +445,8 @@ fn spill_entry_to_sorter( } pub struct FrozenCache<'a, 'extractor> { - bucket: usize, + bucket_id: usize, + source_id: usize, cache: FrozenMap< 'a, 'extractor, @@ -457,9 +463,9 @@ pub fn transpose_and_freeze_caches<'a, 'extractor>( let width = caches.first().map(BalancedCaches::buckets).unwrap_or(0); let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect(); - for thread_cache in caches { - for frozen in thread_cache.freeze()? { - bucket_caches[frozen.bucket].push(frozen); + for (thread_index, thread_cache) in caches.iter_mut().enumerate() { + for frozen in thread_cache.freeze(thread_index)? { + bucket_caches[frozen.bucket_id].push(frozen); } } @@ -477,21 +483,16 @@ where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { let mut maps = Vec::new(); - let mut readers = Vec::new(); - let mut current_bucket = None; - for FrozenCache { bucket, cache, ref mut spilled } in frozen { - assert_eq!(*current_bucket.get_or_insert(bucket), bucket); - maps.push(cache); - readers.append(spilled); - } - - // First manage the spilled entries by looking into the HashMaps, - // merge them and mark them as dummy. let mut heap = BinaryHeap::new(); - for (source_index, source) in readers.into_iter().enumerate() { - let mut cursor = source.into_cursor()?; - if cursor.move_on_next()?.is_some() { - heap.push(Entry { cursor, source_index }); + let mut current_bucket = None; + for FrozenCache { source_id, bucket_id, cache, spilled } in frozen { + assert_eq!(*current_bucket.get_or_insert(bucket_id), bucket_id); + maps.push((source_id, cache)); + for reader in spilled { + let mut cursor = reader.into_cursor()?; + if cursor.move_on_next()?.is_some() { + heap.push(Entry { cursor, source_id }); + } } } @@ -508,25 +509,29 @@ where let mut output = DelAddRoaringBitmap::from_bytes(first_value)?; while let Some(mut entry) = heap.peek_mut() { - if let Some((key, _value)) = entry.cursor.current() { - if first_key == key { - let new = DelAddRoaringBitmap::from_bytes(first_value)?; - output = output.merge(new); - // When we are done we the current value of this entry move make - // it move forward and let the heap reorganize itself (on drop) - if entry.cursor.move_on_next()?.is_none() { - PeekMut::pop(entry); - } - } else { + if let Some((key, value)) = entry.cursor.current() { + if first_key != key { break; } + + let new = DelAddRoaringBitmap::from_bytes(value)?; + output = output.merge(new); + // When we are done we the current value of this entry move make + // it move forward and let the heap reorganize itself (on drop) + if entry.cursor.move_on_next()?.is_none() { + PeekMut::pop(entry); + } } } // Once we merged all of the spilled bitmaps we must also // fetch the entries from the non-spilled entries (the HashMaps). - for (map_index, map) in maps.iter_mut().enumerate() { - if first_entry.source_index != map_index { + for (source_id, map) in maps.iter_mut() { + debug_assert!( + !(map.get(first_key).is_some() && first_entry.source_id == *source_id), + "A thread should not have spiled a key that has been inserted in the cache" + ); + if first_entry.source_id != *source_id { if let Some(new) = map.get_mut(first_key) { output.union_and_clear_bbbul(new); } @@ -538,12 +543,12 @@ where // Don't forget to put the first entry back into the heap. if first_entry.cursor.move_on_next()?.is_some() { - heap.push(first_entry) + heap.push(first_entry); } } // Then manage the content on the HashMap entries that weren't taken (mem::take). - while let Some(mut map) = maps.pop() { + while let Some((_, mut map)) = maps.pop() { // Make sure we don't try to work with entries already managed by the spilled let mut ordered_entries: Vec<_> = map.iter_mut().filter(|(_, bbbul)| !bbbul.is_empty()).collect(); @@ -553,7 +558,7 @@ where let mut output = DelAddRoaringBitmap::empty(); output.union_and_clear_bbbul(bbbul); - for rhs in maps.iter_mut() { + for (_, rhs) in maps.iter_mut() { if let Some(new) = rhs.get_mut(key) { output.union_and_clear_bbbul(new); } @@ -569,14 +574,14 @@ where struct Entry { cursor: ReaderCursor, - source_index: usize, + source_id: usize, } impl Ord for Entry { fn cmp(&self, other: &Entry) -> Ordering { let skey = self.cursor.current().map(|(k, _)| k); let okey = other.cursor.current().map(|(k, _)| k); - skey.cmp(&okey).then(self.source_index.cmp(&other.source_index)).reverse() + skey.cmp(&okey).then(self.source_id.cmp(&other.source_id)).reverse() } }