Reduce the amount of work done by the frozen cache

This commit is contained in:
Clément Renault 2024-10-29 13:41:19 +01:00
parent 92b55bdc57
commit 82f6e3f3b9
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -151,10 +151,7 @@ impl<'extractor> CboCachedSorter<'extractor> {
.iter_mut()
.enumerate()
.map(|(bucket, map)| {
let file = tempfile::tempfile()?;
let writer = create_writer(CompressionType::None, None, file);
let spilled = writer_into_reader(writer)?;
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled: Vec::new() })
})
.collect(),
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
@ -162,10 +159,13 @@ impl<'extractor> CboCachedSorter<'extractor> {
.zip(mem::take(spilled_entries))
.enumerate()
.map(|(bucket, (map, sorter))| {
let file = tempfile::tempfile()?;
let mut writer = create_writer(CompressionType::None, None, file);
sorter.write_into_stream_writer(&mut writer)?;
let spilled = writer_into_reader(writer)?;
let spilled = sorter
.into_reader_cursors()?
.into_iter()
.map(ReaderCursor::into_inner)
.map(BufReader::new)
.map(|bufreader| grenad::Reader::new(bufreader).map_err(Into::into))
.collect::<Result<_>>()?;
Ok(FrozenCache { bucket, cache: FrozenMap::new(map), spilled })
})
.collect(),
@ -275,7 +275,7 @@ impl<'extractor> SpillingCaches<'extractor> {
}
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_del_u32(n);
spill_entry_to_disk(
spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
@ -302,7 +302,7 @@ impl<'extractor> SpillingCaches<'extractor> {
}
RawEntryMut::Vacant(_entry) => {
let deladd = DelAddRoaringBitmap::new_add_u32(n);
spill_entry_to_disk(
spill_entry_to_sorter(
&mut self.spilled_entries[bucket],
&mut self.deladd_buffer,
&mut self.cbo_buffer,
@ -327,7 +327,7 @@ fn compute_bucket_from_hash(buckets: usize, hash: u64) -> usize {
hash as usize % buckets
}
fn spill_entry_to_disk(
fn spill_entry_to_sorter(
spilled_entries: &mut grenad::Sorter<MergeDeladdCboRoaringBitmaps>,
deladd_buffer: &mut Vec<u8>,
cbo_buffer: &mut Vec<u8>,
@ -367,7 +367,24 @@ fn spill_entry_to_disk(
pub struct FrozenCache<'a, 'extractor> {
bucket: usize,
cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>,
spilled: grenad::Reader<BufReader<File>>,
spilled: Vec<grenad::Reader<BufReader<File>>>,
}
pub fn transpose_and_freeze_caches<'a, 'extractor>(
caches: &'a mut [Vec<CboCachedSorter<'extractor>>],
) -> Result<Vec<Vec<FrozenCache<'a, 'extractor>>>> {
let width = caches.get(0).map(Vec::len).unwrap_or(0);
let mut bucket_caches: Vec<_> = iter::repeat_with(Vec::new).take(width).collect();
for thread_caches in caches {
for cache in thread_caches.iter_mut() {
for frozen in cache.freeze()? {
bucket_caches[frozen.bucket].push(frozen);
}
}
}
Ok(bucket_caches)
}
/// Merges the caches that must be all associated to the same bucket.
@ -379,19 +396,19 @@ pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut iter: F) -> Result<()>
where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{
let mut maps = Vec::new();
let mut readers = Vec::new();
let mut current_bucket = None;
let (mut maps, spilled): (Vec<_>, Vec<_>) = frozen
.into_iter()
.map(|FrozenCache { bucket, cache, spilled }| {
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
(cache, spilled)
})
.collect();
for FrozenCache { bucket, cache, ref mut spilled } in frozen {
assert_eq!(*current_bucket.get_or_insert(bucket), bucket);
maps.push(cache);
readers.append(spilled);
}
// First manage the spilled entries by looking into the HashMaps,
// merge them and mark them as dummy.
let mut heap = BinaryHeap::new();
for (source_index, source) in spilled.into_iter().enumerate() {
for (source_index, source) in readers.into_iter().enumerate() {
let mut cursor = source.into_cursor()?;
if cursor.move_on_next()?.is_some() {
heap.push(Entry { cursor, source_index });