diff --git a/milli/src/update/new/extract/cache.rs b/milli/src/update/new/extract/cache.rs
index 7a5f3cc4c..470aa6867 100644
--- a/milli/src/update/new/extract/cache.rs
+++ b/milli/src/update/new/extract/cache.rs
@@ -1,55 +1,68 @@
-use std::cell::RefCell;
-use std::fmt::Write as _;
 use std::fs::File;
-use std::io::{self, BufReader, BufWriter, Read as _, Write as _};
+use std::io::{self, BufReader, BufWriter, Read as _, Seek, Write as _};
 use std::vec;
 
-use bumpalo::Bump;
-use grenad::{MergeFunction, Sorter};
 use hashbrown::hash_map::RawEntryMut;
 use raw_collections::alloc::{RefBump, RefBytes};
-use roaring::bitmap::Statistics;
 use roaring::RoaringBitmap;
+use tempfile::tempfile;
 
-use crate::update::del_add::{DelAdd, KvWriterDelAdd};
+use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::new::indexer::document_changes::MostlySend;
 use crate::CboRoaringBitmapCodec;
 
 const KEY_SIZE: usize = 12;
 
 // #[derive(Debug)]
-pub struct CboCachedSorter<'extractor, MF> {
+pub struct CboCachedSorter<'extractor> {
     cache: hashbrown::HashMap<
-        // TODO check the size of it
         RefBytes<'extractor>,
         DelAddRoaringBitmap,
         hashbrown::DefaultHashBuilder,
         RefBump<'extractor>,
     >,
     alloc: RefBump<'extractor>,
-    sorter: Sorter<MF>,
+    spilled_entries: UnorderedEntries,
     deladd_buffer: Vec<u8>,
     cbo_buffer: Vec<u8>,
-    total_insertions: usize,
-    fitted_in_key: usize,
 }
 
-impl<'extractor, MF> CboCachedSorter<'extractor, MF> {
+// # How the merge algorithm works
+//
+// - Collect all the hashmaps on the main thread
+// - Iterate over all the hashmaps from the different threads
+//     - Each thread takes care of its own keys (partitioned by a hash of the key)
+//     - It also reads back the spilled content, which lives on disk
+// - Each thread populates a local hashmap with its share of the entries
+// - Every thread sends its merged content to the main writing thread
+//
+// ## Next Step
+//
+// - Define the size of the buckets in advance to make sure everything fits in memory.
+// ```
+// let total_buckets = 32;
+// (0..total_buckets).into_par_iter().for_each(|n| {
+//     let hash = todo!();
+//     if hash % total_buckets == n {
+//         // take care of this key
+//     }
+// });
+// ```
+
+impl<'extractor> CboCachedSorter<'extractor> {
     /// TODO may add the capacity
-    pub fn new_in(sorter: Sorter<MF>, alloc: RefBump<'extractor>) -> Self {
-        CboCachedSorter {
+    pub fn new_in(alloc: RefBump<'extractor>) -> io::Result<Self> {
+        Ok(CboCachedSorter {
             cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
             alloc,
-            sorter,
+            spilled_entries: tempfile().map(UnorderedEntries::new)?,
             deladd_buffer: Vec::new(),
             cbo_buffer: Vec::new(),
-            total_insertions: 0,
-            fitted_in_key: 0,
-        }
+        })
     }
 }
 
-impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
+impl<'extractor> CboCachedSorter<'extractor> {
     pub fn insert_del_u32(&mut self, key: &[u8], n: u32) {
         match self.cache.raw_entry_mut().from_key(key) {
             RawEntryMut::Occupied(mut entry) => {
@@ -57,8 +70,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
                 del.get_or_insert_with(RoaringBitmap::default).insert(n);
             }
             RawEntryMut::Vacant(entry) => {
-                self.total_insertions += 1;
-                self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
                 let alloc = RefBump::clone(&self.alloc);
                 let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
                 entry.insert(RefBytes(key), DelAddRoaringBitmap::new_del_u32(n));
@@ -73,8 +84,6 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
                 add.get_or_insert_with(RoaringBitmap::default).insert(n);
             }
             RawEntryMut::Vacant(entry) => {
-                self.total_insertions += 1;
-                self.fitted_in_key += (key.len() <= KEY_SIZE) as usize;
                 let alloc = RefBump::clone(&self.alloc);
                 let key = RefBump::map(alloc, |b| b.alloc_slice_copy(key));
                 entry.insert(RefBytes(key), DelAddRoaringBitmap::new_add_u32(n));
@@ -82,164 +91,104 @@ impl<'extractor, MF: MergeFunction> CboCachedSorter<'extractor, MF> {
         }
     }
 
-    fn write_entry<A: AsRef<[u8]>>(
-        sorter: &mut Sorter<MF>,
-        deladd_buffer: &mut Vec<u8>,
-        cbo_buffer: &mut Vec<u8>,
-        key: A,
-        deladd: DelAddRoaringBitmap,
-    ) -> grenad::Result<(), MF::Error> {
-        deladd_buffer.clear();
-        let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
-        match deladd {
-            DelAddRoaringBitmap { del: Some(del), add: None } => {
-                cbo_buffer.clear();
-                CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
-                value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
-            }
-            DelAddRoaringBitmap { del: None, add: Some(add) } => {
-                cbo_buffer.clear();
-                CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
-                value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
-            }
-            DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
-                cbo_buffer.clear();
-                CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
-                value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
-
-                cbo_buffer.clear();
-                CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
-                value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
-            }
-            DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
-        }
-        let bytes = value_writer.into_inner().unwrap();
-        sorter.insert(key, bytes)
-    }
-
-    pub fn spill_to_disk(self) -> grenad::Result<SpilledCache<MF>, MF::Error> {
-        let Self {
-            cache,
-            alloc: _,
-            mut sorter,
-            mut deladd_buffer,
-            mut cbo_buffer,
-            total_insertions,
-            fitted_in_key,
-        } = self;
+    pub fn spill_to_disk(self) -> io::Result<SpilledCache> {
+        let Self { cache, alloc: _, mut spilled_entries, mut deladd_buffer, mut cbo_buffer } = self;
 
         for (key, deladd) in cache {
-            Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?;
+            spill_entry_to_disk(
+                &mut spilled_entries,
+                &mut deladd_buffer,
+                &mut cbo_buffer,
+                &key,
+                deladd,
+            )?;
         }
 
-        Ok(SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key })
+        Ok(SpilledCache { spilled_entries, deladd_buffer, cbo_buffer })
     }
 
-    pub fn into_sorter(self) -> grenad::Result<Sorter<MF>, MF::Error> {
-        let Self {
-            cache,
-            alloc: _,
-            mut sorter,
-            mut deladd_buffer,
-            mut cbo_buffer,
-            total_insertions,
-            fitted_in_key,
-        } = self;
-
-        let mut all_n_containers = Vec::new();
-        let mut all_n_array_containers = Vec::new();
-        let mut all_n_bitset_containers = Vec::new();
-        let mut all_n_values_array_containers = Vec::new();
-        let mut all_n_values_bitset_containers = Vec::new();
-        let mut all_cardinality = Vec::new();
-
-        for (_key, deladd) in &cache {
-            for bitmap in [&deladd.del, &deladd.add].into_iter().flatten() {
-                let Statistics {
-                    n_containers,
-                    n_array_containers,
-                    n_bitset_containers,
-                    n_values_array_containers,
-                    n_values_bitset_containers,
-                    cardinality,
-                    ..
-                } = bitmap.statistics();
-                all_n_containers.push(n_containers);
-                all_n_array_containers.push(n_array_containers);
-                all_n_bitset_containers.push(n_bitset_containers);
-                all_n_values_array_containers.push(n_values_array_containers);
-                all_n_values_bitset_containers.push(n_values_bitset_containers);
-                all_cardinality.push(cardinality as u32);
-            }
-        }
+    // TODO Do not spill to disk if not necessary
+    pub fn into_unordered_entries(self) -> io::Result<UnorderedEntriesIntoIter> {
+        let Self { cache, alloc: _, mut spilled_entries, mut cbo_buffer, mut deladd_buffer } = self;
 
         for (key, deladd) in cache {
-            Self::write_entry(&mut sorter, &mut deladd_buffer, &mut cbo_buffer, key, deladd)?;
+            spill_entry_to_disk(
+                &mut spilled_entries,
+                &mut deladd_buffer,
+                &mut cbo_buffer,
+                &key,
+                deladd,
+            )?;
         }
 
-        let mut output = String::new();
-
-        for (name, mut slice) in [
-            ("n_containers", all_n_containers),
-            ("n_array_containers", all_n_array_containers),
-            ("n_bitset_containers", all_n_bitset_containers),
-            ("n_values_array_containers", all_n_values_array_containers),
-            ("n_values_bitset_containers", all_n_values_bitset_containers),
-            ("cardinality", all_cardinality),
-        ] {
-            let _ = writeln!(&mut output, "{name} (p100) {:?}", Stats::from_slice(&mut slice));
-            // let _ = writeln!(&mut output, "{name} (p99) {:?}", Stats::from_slice_p99(&mut slice));
-        }
-
-        let _ = writeln!(
-            &mut output,
-            "LruCache stats: {} <= {KEY_SIZE} bytes ({}%) on a total of {} insertions",
-            fitted_in_key,
-            (fitted_in_key as f32 / total_insertions as f32) * 100.0,
-            total_insertions,
-        );
-
-        eprintln!("{output}");
-
-        Ok(sorter)
+        spilled_entries.into_iter_bitmap()
     }
 }
 
-pub struct SpilledCache<MF> {
-    sorter: Sorter<MF>,
+fn spill_entry_to_disk(
+    spilled_entries: &mut UnorderedEntries,
+    deladd_buffer: &mut Vec<u8>,
+    cbo_buffer: &mut Vec<u8>,
+    key: &[u8],
+    deladd: DelAddRoaringBitmap,
+) -> io::Result<()> {
+    deladd_buffer.clear();
+    let mut value_writer = KvWriterDelAdd::new(deladd_buffer);
+    match deladd {
+        DelAddRoaringBitmap { del: Some(del), add: None } => {
+            cbo_buffer.clear();
+            CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
+            value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
+        }
+        DelAddRoaringBitmap { del: None, add: Some(add) } => {
+            cbo_buffer.clear();
+            CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
+            value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
+        }
+        DelAddRoaringBitmap { del: Some(del), add: Some(add) } => {
+            cbo_buffer.clear();
+            CboRoaringBitmapCodec::serialize_into(&del, cbo_buffer);
+            value_writer.insert(DelAdd::Deletion, &cbo_buffer)?;
+
+            cbo_buffer.clear();
+            CboRoaringBitmapCodec::serialize_into(&add, cbo_buffer);
+            value_writer.insert(DelAdd::Addition, &cbo_buffer)?;
+        }
+        DelAddRoaringBitmap { del: None, add: None } => return Ok(()),
+    }
+    let bytes = value_writer.into_inner().unwrap();
+    spilled_entries.push(key, bytes)
+}
+
+pub struct SpilledCache {
+    spilled_entries: UnorderedEntries,
     deladd_buffer: Vec<u8>,
     cbo_buffer: Vec<u8>,
-    total_insertions: usize,
-    fitted_in_key: usize,
 }
 
-impl<MF> SpilledCache<MF> {
-    pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_, MF> {
-        let SpilledCache { sorter, deladd_buffer, cbo_buffer, total_insertions, fitted_in_key } =
-            self;
-
+impl SpilledCache {
+    pub fn reconstruct(self, alloc: RefBump<'_>) -> CboCachedSorter<'_> {
+        let SpilledCache { spilled_entries, deladd_buffer, cbo_buffer } = self;
         CboCachedSorter {
             cache: hashbrown::HashMap::new_in(RefBump::clone(&alloc)),
            alloc,
-            sorter,
+            spilled_entries,
             deladd_buffer,
             cbo_buffer,
-            total_insertions,
-            fitted_in_key,
         }
     }
 }
 
-unsafe impl<'extractor, MF: Send> MostlySend for CboCachedSorter<'extractor, MF> {}
+unsafe impl<'extractor> MostlySend for CboCachedSorter<'extractor> {}
 
 pub struct UnorderedEntries {
-    entry_offsets: Vec<(u32, u32)>,
+    entry_sizes: Vec<(u32, u32)>,
     file: BufWriter<File>,
 }
 
 impl UnorderedEntries {
-    pub fn new(file: File) -> Self {
-        UnorderedEntries { entry_offsets: Vec::new(), file: BufWriter::new(file) }
+    fn new(file: File) -> Self {
+        UnorderedEntries { entry_sizes: Vec::new(), file: BufWriter::new(file) }
     }
 
     /// Pushes a new tuple of key/value into a file.
@@ -249,33 +198,41 @@ impl UnorderedEntries {
     /// # Panics
     ///
    /// - Panics if the key or value length is larger than 2^32 bytes.
-    pub fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
+    fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
         let key_len = key.len().try_into().unwrap();
-        let value_len = key.len().try_into().unwrap();
+        let value_len = value.len().try_into().unwrap();
 
         self.file.write_all(key)?;
         self.file.write_all(value)?;
 
-        self.entry_offsets.push((key_len, value_len));
+        self.entry_sizes.push((key_len, value_len));
 
         Ok(())
     }
 
-    pub fn into_iter_ref(self) -> IntoIterRef {
-        let Self { entry_offsets, file } = self;
-        IntoIterRef { entry_offsets: entry_offsets.into_iter(), file, buffer: Vec::new() }
+    fn into_iter_bitmap(self) -> io::Result<UnorderedEntriesIntoIter> {
+        let Self { entry_sizes, file } = self;
+
+        let mut file = file.into_inner().map_err(|e| e.into_error())?;
+        file.rewind()?;
+
+        Ok(UnorderedEntriesIntoIter {
+            entry_sizes: entry_sizes.into_iter(),
+            file: BufReader::new(file),
+            buffer: Vec::new(),
+        })
     }
 }
 
-pub struct IntoIterRef {
-    entry_offsets: vec::IntoIter<(u32, u32)>,
+pub struct UnorderedEntriesIntoIter {
+    entry_sizes: vec::IntoIter<(u32, u32)>,
     file: BufReader<File>,
     buffer: Vec<u8>,
 }
 
-impl IntoIterRef {
-    pub fn next(&mut self) -> io::Result<Option<(&[u8], &[u8])>> {
-        match self.entry_offsets.next() {
+impl UnorderedEntriesIntoIter {
+    fn next_ref(&mut self) -> io::Result<Option<(&[u8], &[u8])>> {
+        match self.entry_sizes.next() {
             Some((key_len, value_len)) => {
                 let key_len = key_len as usize;
                 let value_len = value_len as usize;
@@ -287,10 +244,25 @@ impl IntoIterRef {
                 self.file.read_exact(buffer)?;
 
                 let buffer = &self.buffer[..total_len];
-                let (key, value) = buffer.split_at(key_len);
-                debug_assert_eq!(value.len(), value_len);
+                Ok(Some(buffer.split_at(key_len)))
+            }
+            None => Ok(None),
+        }
+    }
 
-                Ok(Some((key, value)))
+    pub fn next_deladd_bitmap(&mut self) -> io::Result<Option<(&[u8], DelAddRoaringBitmap)>> {
+        match self.next_ref()? {
+            Some((key, value_bytes)) => {
+                let reader = KvReaderDelAdd::from_slice(value_bytes);
+                let del = match reader.get(DelAdd::Deletion) {
+                    Some(del_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(del_bytes)?),
+                    None => None,
+                };
+                let add = match reader.get(DelAdd::Addition) {
+                    Some(add_bytes) => Some(CboRoaringBitmapCodec::deserialize_from(add_bytes)?),
+                    None => None,
+                };
+                Ok(Some((key, DelAddRoaringBitmap { del, add })))
             }
             None => Ok(None),
         }
     }
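The cache.rs changes above replace grenad's sorted, compressed `Sorter` with a much simpler spill file: raw key/value bytes are appended to an anonymous temporary file while only the `(key_len, value_len)` pairs stay in memory, and entries are later replayed in insertion order with no sorting step. Here is a self-contained sketch of that technique (the `SpillFile` type and `replay` helper are illustrative stand-ins, not milli's API; it assumes the `tempfile` crate, which the patch itself uses):

```
use std::fs::File;
use std::io::{self, BufReader, BufWriter, Read, Seek, Write};

struct SpillFile {
    sizes: Vec<(u32, u32)>, // in-memory (key_len, value_len) per entry
    file: BufWriter<File>,  // concatenated raw key/value bytes on disk
}

impl SpillFile {
    fn new(file: File) -> Self {
        SpillFile { sizes: Vec::new(), file: BufWriter::new(file) }
    }

    /// Appends one entry; only its lengths are kept in memory.
    fn push(&mut self, key: &[u8], value: &[u8]) -> io::Result<()> {
        self.file.write_all(key)?;
        self.file.write_all(value)?;
        self.sizes.push((key.len() as u32, value.len() as u32));
        Ok(())
    }

    /// Rewinds the file and replays every entry in insertion order.
    fn replay(self, mut each: impl FnMut(&[u8], &[u8])) -> io::Result<()> {
        let mut file = self.file.into_inner().map_err(|e| e.into_error())?;
        file.rewind()?;
        let mut reader = BufReader::new(file);
        let mut buffer = Vec::new();
        for (key_len, value_len) in self.sizes {
            let total_len = key_len as usize + value_len as usize;
            buffer.resize(total_len, 0);
            reader.read_exact(&mut buffer)?;
            let (key, value) = buffer.split_at(key_len as usize);
            each(key, value);
        }
        Ok(())
    }
}

fn main() -> io::Result<()> {
    let mut spilled = SpillFile::new(tempfile::tempfile()?);
    spilled.push(b"word:cat", &[1, 2, 3])?;
    spilled.push(b"word:dog", &[4])?;
    spilled.replay(|key, value| println!("{key:?} => {value:?}"))
}
```

The trade-off mirrors the patch: entries come back unsorted (hence `UnorderedEntries`), but writing them costs only two buffered `write_all` calls, with no comparisons, no compression, and no merge function.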
diff --git a/milli/src/update/new/extract/faceted/extract_facets.rs b/milli/src/update/new/extract/faceted/extract_facets.rs
index 8da451341..947a6e6d2 100644
--- a/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -1,14 +1,12 @@
 use std::cell::RefCell;
 use std::collections::HashSet;
-use std::fmt::Debug;
 use std::fs::File;
 use std::ops::DerefMut as _;
 
 use bumpalo::Bump;
-use grenad::{MergeFunction, Merger};
+use grenad::Merger;
 use heed::RoTxn;
 use raw_collections::alloc::RefBump;
-use rayon::iter::{ParallelBridge as _, ParallelIterator as _};
 use serde_json::Value;
 
 use super::super::cache::CboCachedSorter;
@@ -21,7 +19,7 @@ use crate::update::new::indexer::document_changes::{
     IndexingContext, RefCellExt, ThreadLocal,
 };
 use crate::update::new::DocumentChange;
-use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
+use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH};
 
 pub struct FacetedExtractorData<'extractor> {
@@ -31,24 +29,10 @@
 }
 
 impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'extractor> {
-    type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>;
+    type Data = RefCell<CboCachedSorter<'extractor>>;
 
     fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                self.grenad_parameters.chunk_compression_type,
-                self.grenad_parameters.chunk_compression_level,
-                self.grenad_parameters.max_nb_chunks,
-                self.max_memory,
-                // *NOTE*: this must not be set to true:
-                // 1. we're already using max parallelism in the pool, so it wouldn't help
-                // 2. it creates correctness issues if it causes to yield a borrow-mut wielding task
-                false,
-            ),
-            extractor_alloc,
-        )))
+        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
     }
 
     fn process(
@@ -64,7 +48,7 @@ pub struct FacetedDocidsExtractor;
 
 impl FacetedDocidsExtractor {
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
+        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
         attributes_to_extract: &[&str],
         document_change: DocumentChange,
     ) -> Result<()> {
@@ -143,10 +127,10 @@ impl FacetedDocidsExtractor {
         }
     }
 
-    fn facet_fn_with_options<'extractor, MF>(
+    fn facet_fn_with_options<'extractor>(
         doc_alloc: &Bump,
-        cached_sorter: &mut CboCachedSorter<'extractor, MF>,
-        cache_fn: impl Fn(&mut CboCachedSorter<'extractor, MF>, &[u8], u32),
+        cached_sorter: &mut CboCachedSorter<'extractor>,
+        cache_fn: impl Fn(&mut CboCachedSorter<'extractor>, &[u8], u32),
         docid: DocumentId,
         fid: FieldId,
         value: &Value,
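In `facet_fn_with_options` above, deletion and addition share one body: the concrete cache operation is injected as the `cache_fn` closure, so the key-building logic lives in one place. A minimal stand-in for that dispatch pattern (toy `Cache` type with hypothetical names, not milli's API):

```
struct Cache(Vec<(Vec<u8>, u32)>);

impl Cache {
    fn insert_del_u32(&mut self, key: &[u8], n: u32) { self.0.push((key.to_vec(), n)); }
    fn insert_add_u32(&mut self, key: &[u8], n: u32) { self.0.push((key.to_vec(), n)); }
}

fn facet_fn_with_options(
    cache: &mut Cache,
    cache_fn: impl Fn(&mut Cache, &[u8], u32),
    docid: u32,
    facet_key: &[u8],
) {
    // ... shared key normalization/truncation would happen here ...
    cache_fn(cache, facet_key, docid);
}

fn main() {
    let mut cache = Cache(Vec::new());
    // The two call sites differ only in the method they pass:
    facet_fn_with_options(&mut cache, Cache::insert_del_u32, 42, b"1:blue");
    facet_fn_with_options(&mut cache, Cache::insert_add_u32, 42, b"1:red");
}
```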
diff --git a/milli/src/update/new/extract/searchable/extract_word_docids.rs b/milli/src/update/new/extract/searchable/extract_word_docids.rs
index ec934170f..7c850cebb 100644
--- a/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -1,7 +1,7 @@
 use std::cell::RefCell;
 use std::collections::HashMap;
 use std::fs::File;
-use std::num::NonZero;
+use std::io;
 use std::ops::DerefMut as _;
 
 use bumpalo::Bump;
@@ -17,17 +17,16 @@ use crate::update::new::indexer::document_changes::{
     IndexingContext, MostlySend, RefCellExt, ThreadLocal,
 };
 use crate::update::new::DocumentChange;
-use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{bucketed_position, DocumentId, FieldId, Index, Result, MAX_POSITION_PER_ATTRIBUTE};
 
 const MAX_COUNTED_WORDS: usize = 30;
 
 pub struct WordDocidsCachedSorters<'indexer> {
-    word_fid_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
-    word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
-    exact_word_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
-    word_position_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
-    fid_word_count_docids: CboCachedSorter<'indexer, MergeDeladdCboRoaringBitmaps>,
+    word_fid_docids: CboCachedSorter<'indexer>,
+    word_docids: CboCachedSorter<'indexer>,
+    exact_word_docids: CboCachedSorter<'indexer>,
+    word_position_docids: CboCachedSorter<'indexer>,
+    fid_word_count_docids: CboCachedSorter<'indexer>,
     fid_word_count: HashMap<FieldId, (usize, usize)>,
     current_docid: Option<DocumentId>,
 }
@@ -35,83 +34,16 @@ pub struct WordDocidsCachedSorters<'indexer> {
 unsafe impl<'indexer> MostlySend for WordDocidsCachedSorters<'indexer> {}
 
 impl<'indexer> WordDocidsCachedSorters<'indexer> {
-    pub fn new_in(
-        indexer: GrenadParameters,
-        max_memory: Option<usize>,
-        alloc: RefBump<'indexer>,
-    ) -> Self {
-        let max_memory = max_memory.map(|max_memory| max_memory / 4);
-
-        let word_fid_docids = CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                indexer.chunk_compression_type,
-                indexer.chunk_compression_level,
-                indexer.max_nb_chunks,
-                max_memory,
-                false,
-            ),
-            RefBump::clone(&alloc),
-        );
-        let word_docids = CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                indexer.chunk_compression_type,
-                indexer.chunk_compression_level,
-                indexer.max_nb_chunks,
-                max_memory,
-                false,
-            ),
-            RefBump::clone(&alloc),
-        );
-        let exact_word_docids = CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                indexer.chunk_compression_type,
-                indexer.chunk_compression_level,
-                indexer.max_nb_chunks,
-                max_memory,
-                false,
-            ),
-            RefBump::clone(&alloc),
-        );
-        let word_position_docids = CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                indexer.chunk_compression_type,
-                indexer.chunk_compression_level,
-                indexer.max_nb_chunks,
-                max_memory,
-                false,
-            ),
-            RefBump::clone(&alloc),
-        );
-        let fid_word_count_docids = CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                indexer.chunk_compression_type,
-                indexer.chunk_compression_level,
-                indexer.max_nb_chunks,
-                max_memory,
-                false,
-            ),
-            alloc,
-        );
-
-        Self {
-            word_fid_docids,
-            word_docids,
-            exact_word_docids,
-            word_position_docids,
-            fid_word_count_docids,
+    pub fn new_in(alloc: RefBump<'indexer>) -> io::Result<Self> {
+        Ok(Self {
+            word_fid_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
+            word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
+            exact_word_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
+            word_position_docids: CboCachedSorter::new_in(RefBump::clone(&alloc))?,
+            fid_word_count_docids: CboCachedSorter::new_in(alloc)?,
             fid_word_count: HashMap::new(),
             current_docid: None,
-        }
+        })
     }
 
     fn insert_add_u32(
@@ -253,21 +185,17 @@ impl WordDocidsMergerBuilders {
             current_docid: _,
         } = other;
 
-        let word_fid_docids_readers =
-            word_fid_docids.into_sorter().and_then(|s| s.into_reader_cursors());
-        let word_docids_readers = word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
-        let exact_word_docids_readers =
-            exact_word_docids.into_sorter().and_then(|s| s.into_reader_cursors());
-        let word_position_docids_readers =
-            word_position_docids.into_sorter().and_then(|s| s.into_reader_cursors());
-        let fid_word_count_docids_readers =
-            fid_word_count_docids.into_sorter().and_then(|s| s.into_reader_cursors());
+        let word_fid_docids_entries = word_fid_docids.into_unordered_entries()?;
+        let word_docids_entries = word_docids.into_unordered_entries()?;
+        let exact_word_docids_entries = exact_word_docids.into_unordered_entries()?;
+        let word_position_docids_entries = word_position_docids.into_unordered_entries()?;
+        let fid_word_count_docids_entries = fid_word_count_docids.into_unordered_entries()?;
 
-        self.word_fid_docids.extend(word_fid_docids_readers?);
-        self.word_docids.extend(word_docids_readers?);
-        self.exact_word_docids.extend(exact_word_docids_readers?);
-        self.word_position_docids.extend(word_position_docids_readers?);
-        self.fid_word_count_docids.extend(fid_word_count_docids_readers?);
+        self.word_fid_docids.push(word_fid_docids_entries);
+        self.word_docids.push(word_docids_entries);
+        self.exact_word_docids.push(exact_word_docids_entries);
+        self.word_position_docids.push(word_position_docids_entries);
+        self.fid_word_count_docids.push(fid_word_count_docids_entries);
 
         Ok(())
     }
@@ -293,11 +221,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'extractor> {
     type Data = RefCell<Option<WordDocidsCachedSorters<'extractor>>>;
 
     fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
-        Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(
-            self.grenad_parameters,
-            self.max_memory,
-            extractor_alloc,
-        ))))
+        Ok(RefCell::new(Some(WordDocidsCachedSorters::new_in(extractor_alloc)?)))
     }
 
     fn process(
@@ -357,7 +281,6 @@ pub struct WordDocidsExtractors;
 
 impl WordDocidsExtractors {
     pub fn run_extraction<'pl, 'fid, 'indexer, 'index, DC: DocumentChanges<'pl>>(
-        grenad_parameters: GrenadParameters,
         document_changes: &DC,
         indexing_context: IndexingContext<'fid, 'indexer, 'index>,
         extractor_allocs: &mut ThreadLocal<FullySend<RefCell<Bump>>>,
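With `into_unordered_entries`, each extractor thread now hands the merger one replayable stream per index instead of grenad reader cursors, and the builders simply `push` them into a collection. Whatever consumes those streams has to do the merge itself: union the deletion and addition bitmaps of every occurrence of a key. A self-contained sketch of that reduction with toy types (assumes the `roaring` crate, which the patch already depends on; names are illustrative):

```
use std::collections::HashMap;

use roaring::RoaringBitmap;

#[derive(Default)]
struct DelAdd {
    del: RoaringBitmap,
    add: RoaringBitmap,
}

/// Merges several unordered (key, del, add) streams into one map,
/// unioning the bitmaps of keys that appear in multiple streams.
fn merge_streams(
    streams: Vec<Vec<(Vec<u8>, RoaringBitmap, RoaringBitmap)>>,
) -> HashMap<Vec<u8>, DelAdd> {
    let mut merged: HashMap<Vec<u8>, DelAdd> = HashMap::new();
    for stream in streams {
        for (key, del, add) in stream {
            let entry = merged.entry(key).or_default();
            entry.del |= del; // bitmap union
            entry.add |= add;
        }
    }
    merged
}

fn main() {
    let thread_a = vec![(b"cat".to_vec(), RoaringBitmap::from_iter([1u32]), RoaringBitmap::new())];
    let thread_b = vec![(b"cat".to_vec(), RoaringBitmap::new(), RoaringBitmap::from_iter([2u32]))];
    let merged = merge_streams(vec![thread_a, thread_b]);
    assert_eq!(merged[b"cat".as_slice()].add.len(), 1);
}
```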
diff --git a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index b8821dacc..dd2a2a0de 100644
--- a/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -31,7 +31,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under than MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<CboCachedSorter<MergeDeladdCboRoaringBitmaps>>>,
+        context: &DocumentChangeContext<RefCell<CboCachedSorter>>,
         document_tokenizer: &DocumentTokenizer,
         document_change: DocumentChange,
     ) -> Result<()> {
diff --git a/milli/src/update/new/extract/searchable/mod.rs b/milli/src/update/new/extract/searchable/mod.rs
index 0a4fd6dd1..d35069d82 100644
--- a/milli/src/update/new/extract/searchable/mod.rs
+++ b/milli/src/update/new/extract/searchable/mod.rs
@@ -12,7 +12,6 @@ pub use extract_word_pair_proximity_docids::WordPairProximityDocidsExtractor;
 use grenad::Merger;
 use heed::RoTxn;
 use raw_collections::alloc::RefBump;
-use rayon::iter::{ParallelBridge, ParallelIterator};
 use tokenize_document::{tokenizer_builder, DocumentTokenizer};
 
 use super::cache::CboCachedSorter;
@@ -22,7 +21,7 @@ use crate::update::new::indexer::document_changes::{
     IndexingContext, ThreadLocal,
 };
 use crate::update::new::DocumentChange;
-use crate::update::{create_sorter, GrenadParameters, MergeDeladdCboRoaringBitmaps};
+use crate::update::{GrenadParameters, MergeDeladdCboRoaringBitmaps};
 use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE};
 
 pub struct SearchableExtractorData<'extractor, EX: SearchableExtractor> {
@@ -35,21 +34,10 @@ impl<'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor>
     for SearchableExtractorData<'extractor, EX>
 {
-    type Data = RefCell<CboCachedSorter<'extractor, MergeDeladdCboRoaringBitmaps>>;
+    type Data = RefCell<CboCachedSorter<'extractor>>;
 
     fn init_data(&self, extractor_alloc: RefBump<'extractor>) -> Result<Self::Data> {
-        Ok(RefCell::new(CboCachedSorter::new_in(
-            create_sorter(
-                grenad::SortAlgorithm::Stable,
-                MergeDeladdCboRoaringBitmaps,
-                self.grenad_parameters.chunk_compression_type,
-                self.grenad_parameters.chunk_compression_level,
-                self.grenad_parameters.max_nb_chunks,
-                self.max_memory,
-                false,
-            ),
-            extractor_alloc,
-        )))
+        Ok(RefCell::new(CboCachedSorter::new_in(extractor_alloc)?))
     }
 
     fn process(
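The "Next Step" note at the top of the cache.rs changes sketches how this merge is meant to scale: N parallel tasks each scan all entries but only process the keys whose hash falls in their bucket, so no two tasks ever contend on the same key. A runnable version of that sketch (assumes the `rayon` crate, which the workspace already uses; names are illustrative):

```
use std::collections::hash_map::RandomState;
use std::hash::BuildHasher;

use rayon::iter::{IntoParallelIterator, ParallelIterator};

fn main() {
    let keys: Vec<&[u8]> = vec![b"cat".as_slice(), b"dog", b"owl", b"fox"];
    let total_buckets: u64 = 32;
    // A single shared hasher: every task must hash a key the same way.
    let hasher = RandomState::new();

    (0..total_buckets).into_par_iter().for_each(|n| {
        for key in &keys {
            let hash = hasher.hash_one(key);
            if hash % total_buckets == n {
                // This task owns `key`: merge its del/add bitmaps here.
            }
        }
    });
}
```

Fixing the bucket count up front is what would let each task bound its local hashmap, which is the "make sure everything fits in memory" part of the plan.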