Finalize the cache freeze function

This commit is contained in:
Clément Renault 2024-10-28 18:29:26 +01:00
parent 498f51c7b3
commit 9fcf51dcc6
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -3,20 +3,21 @@ use std::collections::binary_heap::PeekMut;
use std::collections::BinaryHeap;
use std::fs::File;
use std::hash::BuildHasher;
use std::io::BufReader;
use std::{io, iter, mem};
use bumpalo::Bump;
use grenad::ReaderCursor;
use grenad::{CompressionType, ReaderCursor};
use hashbrown::hash_map::RawEntryMut;
use hashbrown::{DefaultHashBuilder, HashMap, HashSet};
use hashbrown::{DefaultHashBuilder, HashMap};
use raw_collections::map::FrozenMap;
use roaring::RoaringBitmap;
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
use crate::update::new::indexer::document_changes::MostlySend;
use crate::update::new::KvReaderDelAdd;
use crate::update::MergeDeladdCboRoaringBitmaps;
use crate::CboRoaringBitmapCodec;
use crate::update::{create_writer, writer_into_reader, MergeDeladdCboRoaringBitmaps};
use crate::{CboRoaringBitmapCodec, Result};
// # How the Merge Algorithm works
//
@ -77,7 +78,7 @@ enum InnerCaches<'extractor> {
}
impl<'extractor> CboCachedSorter<'extractor> {
pub fn new_in(buckets: usize, alloc: &'extractor Bump) -> Self {
pub fn new_in(buckets: usize, max_memory: usize, alloc: &'extractor Bump) -> Self {
Self {
hasher: DefaultHashBuilder::default(),
caches: InnerCaches::Normal(NormalCaches {
@ -132,8 +133,29 @@ impl<'extractor> CboCachedSorter<'extractor> {
Ok(())
}
pub fn freeze(&mut self) -> Vec<()> {
todo!()
pub fn freeze(&mut self) -> Result<Vec<FrozenCache<'_, 'extractor>>> {
match &mut self.caches {
InnerCaches::Normal(NormalCaches { caches }) => caches
.iter_mut()
.map(|map| {
let file = tempfile::tempfile()?;
let writer = create_writer(CompressionType::None, None, file);
let spilled = writer_into_reader(writer)?;
Ok(FrozenCache { cache: FrozenMap::new(map), spilled })
})
.collect(),
InnerCaches::Spilling(SpillingCaches { caches, spilled_entries, .. }) => caches
.iter_mut()
.zip(mem::take(spilled_entries))
.map(|(map, sorter)| {
let file = tempfile::tempfile()?;
let mut writer = create_writer(CompressionType::None, None, file);
sorter.write_into_stream_writer(&mut writer)?;
let spilled = writer_into_reader(writer)?;
Ok(FrozenCache { cache: FrozenMap::new(map), spilled })
})
.collect(),
}
}
}
@ -328,17 +350,19 @@ fn spill_entry_to_disk(
pub struct FrozenCache<'a, 'extractor> {
cache: FrozenMap<'a, 'extractor, &'extractor [u8], DelAddRoaringBitmap, DefaultHashBuilder>,
spilled: grenad::Reader<File>,
spilled: grenad::Reader<BufReader<File>>,
}
pub fn merge_me<F>(frozen: Vec<FrozenCache>, mut iter: F) -> crate::Result<()>
/// Merges the caches that must be all associated to the same bucket.
pub fn merge_caches<F>(frozen: Vec<FrozenCache>, mut iter: F) -> Result<()>
where
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> crate::Result<()>,
F: for<'a> FnMut(&'a [u8], DelAddRoaringBitmap) -> Result<()>,
{
let (mut maps, spilled): (Vec<_>, Vec<_>) =
frozen.into_iter().map(|FrozenCache { cache, spilled }| (cache, spilled)).collect();
// First manage the spilled entries by looking into the HashMaps and then merge them.
// First manage the spilled entries by looking into the HashMaps,
// merge them and mark them as dummy.
let mut heap = BinaryHeap::new();
for (source_index, source) in spilled.into_iter().enumerate() {
let mut cursor = source.into_cursor()?;
@ -395,9 +419,27 @@ where
}
}
// Then manage the content on the HashMap that weren't taken (mem::take).
// Then manage the content on the HashMap entries that weren't taken (mem::take).
while let Some(mut map) = maps.pop() {
for (key, output) in map.iter_mut() {
let mut output = mem::replace(output, DelAddRoaringBitmap::dummy());
todo!()
// Make sure we don't try to work with entries already managed by the spilled
if !output.is_dummy() {
for rhs in maps.iter_mut() {
if let Some(new) = rhs.get_mut(key) {
let new = mem::replace(new, DelAddRoaringBitmap::dummy());
output.merge(new);
}
}
// We send the merged entry outside.
(iter)(key, output)?;
}
}
}
Ok(())
}
struct Entry<R> {
@ -454,6 +496,11 @@ impl DelAddRoaringBitmap {
DelAddRoaringBitmap { del: None, add: None }
}
fn is_dummy(&self) -> bool {
let DelAddRoaringBitmap { del, add } = self;
del.is_none() && add.is_none()
}
fn new_del_add_u32(n: u32) -> Self {
DelAddRoaringBitmap {
del: Some(RoaringBitmap::from([n])),