diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index 94722cd97..226e34573 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -3,20 +3,21 @@ use std::collections::binary_heap::PeekMut; use std::collections::BinaryHeap; use std::fs::File; use std::hash::BuildHasher; +use std::io::BufReader; use std::{io, iter, mem}; use bumpalo::Bump; -use grenad::ReaderCursor; +use grenad::{CompressionType, ReaderCursor}; use hashbrown::hash_map::RawEntryMut; -use hashbrown::{DefaultHashBuilder, HashMap, HashSet}; +use hashbrown::{DefaultHashBuilder, HashMap}; use raw_collections::map::FrozenMap; use roaring::RoaringBitmap; use crate::update::del_add::{DelAdd, KvWriterDelAdd}; use crate::update::new::indexer::document_changes::MostlySend; use crate::update::new::KvReaderDelAdd; -use crate::update::MergeDeladdCboRoaringBitmaps; -use crate::CboRoaringBitmapCodec; +use crate::update::{create_writer, writer_into_reader, MergeDeladdCboRoaringBitmaps}; +use crate::{CboRoaringBitmapCodec, Result}; // # How the Merge Algorithm works // @@ -77,7 +78,7 @@ enum InnerCaches<'extractor> { } impl<'extractor> CboCachedSorter<'extractor> { - pub fn new_in(buckets: usize, alloc: &'extractor Bump) -> Self { + pub fn new_in(buckets: usize, max_memory: usize, alloc: &'extractor Bump) -> Self { Self { hasher: DefaultHashBuilder::default(), caches: InnerCaches::Normal(NormalCaches { @@ -132,8 +133,29 @@ impl<'extractor> CboCachedSorter<'extractor> { Ok(()) } - pub fn freeze(&mut self) -> Vec<()> { - todo!() + pub fn freeze(&mut self) -> Result>> { + match &mut self.caches { + InnerCaches::Normal(NormalCaches { caches }) => caches + .iter_mut() + .map(|map| { + let file = tempfile::tempfile()?; + let writer = create_writer(CompressionType::None, None, file); + let spilled = writer_into_reader(writer)?; + Ok(FrozenCache { cache: FrozenMap::new(map), spilled }) + }) + .collect(), + InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches + .iter_mut() + .zip(mem::take(spilled_entries)) + .map(|(map, sorter)| { + let file = tempfile::tempfile()?; + let mut writer = create_writer(CompressionType::None, None, file); + sorter.write_into_stream_writer(&mut writer)?; + let spilled = writer_into_reader(writer)?; + Ok(FrozenCache { cache: FrozenMap::new(map), spilled }) + }) + .collect(), + } } } @@ -328,17 +350,19 @@ fn spill_entry_to_disk( pub struct FrozenCache<'a, 'extractor> { cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>, - spilled: grenad::Reader, + spilled: grenad::Reader>, } -pub fn merge_me(frozen: Vec, mut iter: F) -> crate::Result<()> +/// Merges the caches that must be all associated to the same bucket. +pub fn merge_caches(frozen: Vec, mut iter: F) -> Result<()> where - F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>, + F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { let (mut maps, spilled): (Vec<_>, Vec<_>) = frozen.into_iter().map(|FrozenCache { cache, spilled }| (cache, spilled)).collect(); - // First manage the spilled entries by looking into the HashMaps and then merge them. + // First manage the spilled entries by looking into the HashMaps, + // merge them and mark them as dummy. let mut heap = BinaryHeap::new(); for (source_index, source) in spilled.into_iter().enumerate() { let mut cursor = source.into_cursor()?; @@ -395,9 +419,27 @@ where } } - // Then manage the content on the HashMap that weren't taken (mem::take). + // Then manage the content on the HashMap entries that weren't taken (mem::take). + while let Some(mut map) = maps.pop() { + for (key, output) in map.iter_mut() { + let mut output = mem::replace(output, DelAddRoaringBitmap::dummy()); - todo!() + // Make sure we don't try to work with entries already managed by the spilled + if !output.is_dummy() { + for rhs in maps.iter_mut() { + if let Some(new) = rhs.get_mut(key) { + let new = mem::replace(new, DelAddRoaringBitmap::dummy()); + output.merge(new); + } + } + + // We send the merged entry outside. + (iter)(key, output)?; + } + } + } + + Ok(()) } struct Entry { @@ -454,6 +496,11 @@ impl DelAddRoaringBitmap { DelAddRoaringBitmap { del: None, add: None } } + fn is_dummy(&self) -> bool { + let DelAddRoaringBitmap { del, add } = self; + del.is_none() && add.is_none() + } + fn new_del_add_u32(n: u32) -> Self { DelAddRoaringBitmap { del: Some(RoaringBitmap::from([n])),