diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs index e1dbc601c..f34d049f9 100644 --- a/milli/src/update/new/extract/cache.rs +++ b/milli/src/update/new/extract/cache.rs @@ -151,10 +151,7 @@ impl<'extractor> CboCachedSorter<'extractor> { .iter_mut() .enumerate() .map(|(bucket, map)| { - let file = tempfile::tempfile()?; - let writer = create_writer(CompressionType::None, None, file); - let spilled = writer_into_reader(writer)?; - Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled }) + Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() }) }) .collect(), InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches @@ -162,10 +159,13 @@ impl<'extractor> CboCachedSorter<'extractor> { .zip(mem::take(spilled_entries)) .enumerate() .map(|(bucket, (map, sorter))| { - let file = tempfile::tempfile()?; - let mut writer = create_writer(CompressionType::None, None, file); - sorter.write_into_stream_writer(&mut writer)?; - let spilled = writer_into_reader(writer)?; + let spilled = sorter + .into_reader_cursors()? + .into_iter() + .map(ReaderCursor::into_inner) + .map(BufReader::new) + .map(|bufreader| grenad::Reader::new(bufreader).map_err(Into::into)) + .collect::>()?; Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled }) }) .collect(), @@ -275,7 +275,7 @@ impl<'extractor> SpillingCaches<'extractor> { } RawEntryMut::Vacant(_entry) => { let deladd = DelAddRoaringBitmap::new_del_u32(n); - spill_entry_to_disk( + spill_entry_to_sorter( &mut self.spilled_entries[bucket], &mut self.deladd_buffer, &mut self.cbo_buffer, @@ -302,7 +302,7 @@ impl<'extractor> SpillingCaches<'extractor> { } RawEntryMut::Vacant(_entry) => { let deladd = DelAddRoaringBitmap::new_add_u32(n); - spill_entry_to_disk( + spill_entry_to_sorter( &mut self.spilled_entries[bucket], &mut self.deladd_buffer, &mut self.cbo_buffer, @@ -327,7 +327,7 @@ fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize { hash as usize % buckets } -fn spill_entry_to_disk( +fn spill_entry_to_sorter( spilled_entries: &mut grenad::Sorter, deladd_buffer: &mut Vec, cbo_buffer: &mut Vec, @@ -367,7 +367,24 @@ fn spill_entry_to_disk( pub struct FrozenCache<'a, 'extractor> { bucket: usize, cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>, - spilled: grenad::Reader>, + spilled: Vec>>, +} + +pub fn transpose_and_freeze_caches<'a, 'extractor>( + caches: &'a mut [Vec>], +) -> Result>>> { + let width = caches.get(0).map(Vec::len).unwrap_or(0); + let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect(); + + for thread_caches in caches { + for cache in thread_caches.iter_mut() { + for frozen in cache.freeze()? { + bucket_caches[frozen.bucket].push(frozen); + } + } + } + + Ok(bucket_caches) } /// Merges the caches that must be all associated to the same bucket. @@ -379,19 +396,19 @@ pub fn merge_caches(frozen: Vec, mut iter: F) -> Result<()> where F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>, { + let mut maps = Vec::new(); + let mut readers = Vec::new(); let mut current_bucket = None; - let (mut maps, spilled): (Vec<_>, Vec<_>) = frozen - .into_iter() - .map(|FrozenCache { bucket, cache, spilled }| { - assert_eq!(*current_bucket.get_or_insert(bucket), bucket); - (cache, spilled) - }) - .collect(); + for FrozenCache { bucket, cache, ref mut spilled } in frozen { + assert_eq!(*current_bucket.get_or_insert(bucket), bucket); + maps.push(cache); + readers.append(spilled); + } // First manage the spilled entries by looking into the HashMaps, // merge them and mark them as dummy. let mut heap = BinaryHeap::new(); - for (source_index, source) in spilled.into_iter().enumerate() { + for (source_index, source) in readers.into_iter().enumerate() { let mut cursor = source.into_cursor()?; if cursor.move_on_next()?.is_some() { heap.push(Entry { cursor, source_index });