diff --git a/milli/Cargo.toml b/milli/Cargo.toml
index 6b830c29e..9197fa818 100644
--- a/milli/Cargo.toml
+++ b/milli/Cargo.toml
@@ -16,7 +16,7 @@ either = "1.6.1"
 flate2 = "1.0.20"
 fst = "0.4.5"
 fxhash = "0.2.1"
-grenad = { version = "0.3.1", default-features = false, features = ["tempfile"] }
+grenad = { version = "0.4.1", default-features = false, features = ["tempfile"] }
 geoutils = "0.4.1"
 heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] }
 human_format = "1.0.3"
diff --git a/milli/src/error.rs b/milli/src/error.rs
index 47c9a5993..dce23582a 100644
--- a/milli/src/error.rs
+++ b/milli/src/error.rs
@@ -29,6 +29,7 @@ pub enum InternalError {
     FieldIdMapMissingEntry(FieldIdMapMissingEntry),
     Fst(fst::Error),
     GrenadInvalidCompressionType,
+    GrenadInvalidFormatVersion,
     IndexingMergingKeys { process: &'static str },
     InvalidDatabaseTyping,
     RayonThreadPool(ThreadPoolBuildError),
@@ -97,6 +98,9 @@ where
             grenad::Error::InvalidCompressionType => {
                 Error::InternalError(InternalError::GrenadInvalidCompressionType)
             }
+            grenad::Error::InvalidFormatVersion => {
+                Error::InternalError(InternalError::GrenadInvalidFormatVersion)
+            }
         }
     }
 }
@@ -186,6 +190,9 @@ impl fmt::Display for InternalError {
             Self::GrenadInvalidCompressionType => {
                 f.write_str("Invalid compression type have been specified to grenad.")
             }
+            Self::GrenadInvalidFormatVersion => {
+                f.write_str("Invalid grenad file with an invalid version format.")
+            }
             Self::IndexingMergingKeys { process } => {
                 write!(f, "Invalid merge while processing {}.", process)
             }
diff --git a/milli/src/update/facets.rs b/milli/src/update/facets.rs
index 19684c6ea..53305cdee 100644
--- a/milli/src/update/facets.rs
+++ b/milli/src/update/facets.rs
@@ -160,8 +160,7 @@ fn compute_facet_number_levels<'t>(
 
     // It is forbidden to keep a cursor and write in a database at the same time with LMDB
     // therefore we write the facet levels entries into a grenad file before transfering them.
-    let mut writer = tempfile::tempfile()
-        .and_then(|file| create_writer(compression_type, compression_level, file))?;
+    let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
 
     let level_0_range = {
         let left = (field_id, 0, f64::MIN, f64::MIN);
@@ -279,8 +278,7 @@ fn compute_facet_string_levels<'t>(
 
     // It is forbidden to keep a cursor and write in a database at the same time with LMDB
     // therefore we write the facet levels entries into a grenad file before transfering them.
-    let mut writer = tempfile::tempfile()
-        .and_then(|file| create_writer(compression_type, compression_level, file))?;
+    let mut writer = create_writer(compression_type, compression_level, tempfile::tempfile()?);
 
     // Groups sizes are always a power of the original level_group_size and therefore a group
     // always maps groups of the previous level and never splits previous levels groups in half.
diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
index fa1381412..44bf9dbf7 100644
--- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
+++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
@@ -18,8 +18,8 @@ use crate::{absolute_from_relative_position, FieldId, Result, MAX_POSITION_PER_A
 /// Returns the generated internal documents ids and a grenad reader
 /// with the list of extracted words from the given chunk of documents.
 #[logging_timer::time]
-pub fn extract_docid_word_positions<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     searchable_fields: &Option<HashSet<FieldId>>,
     stop_words: Option<&fst::Set<&[u8]>>,
@@ -46,7 +46,8 @@ pub fn extract_docid_word_positions<R: io::Read>(
     }
 
     let analyzer = Analyzer::<Vec<u8>>::new(AnalyzerConfig::default());
-    while let Some((key, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let document_id = key
             .try_into()
             .map(u32::from_be_bytes)
diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
index 5480bd605..fa63d9549 100644
--- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
@@ -14,8 +14,8 @@ use crate::Result;
 /// Returns a grenad reader with the list of extracted facet numbers and
 /// documents ids from the given chunk of docid facet number positions.
 #[logging_timer::time]
-pub fn extract_facet_number_docids<R: io::Read>(
-    mut docid_fid_facet_number: grenad::Reader<R>,
+pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
+    docid_fid_facet_number: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -28,7 +28,8 @@ pub fn extract_facet_number_docids<R: io::Read>(
         max_memory,
     );
 
-    while let Some((key_bytes, _)) = docid_fid_facet_number.next()? {
+    let mut cursor = docid_fid_facet_number.into_cursor()?;
+    while let Some((key_bytes, _)) = cursor.move_on_next()? {
         let (field_id, document_id, number) =
             FieldDocIdFacetF64Codec::bytes_decode(key_bytes).unwrap();
 
diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
index e08d062cf..8209d817b 100644
--- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
@@ -16,8 +16,8 @@ use crate::{FieldId, Result};
 /// Returns a grenad reader with the list of extracted facet strings and
 /// documents ids from the given chunk of docid facet string positions.
 #[logging_timer::time]
-pub fn extract_facet_string_docids<R: io::Read>(
-    mut docid_fid_facet_string: grenad::Reader<R>,
+pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
+    docid_fid_facet_string: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -32,7 +32,8 @@ pub fn extract_facet_string_docids<R: io::Read>(
     let mut key_buffer = Vec::new();
     let mut value_buffer = Vec::new();
 
-    while let Some((key, original_value_bytes)) = docid_fid_facet_string.next()? {
+    let mut cursor = docid_fid_facet_string.into_cursor()?;
+    while let Some((key, original_value_bytes)) = cursor.move_on_next()? {
         let (field_id_bytes, bytes) = try_split_array_at(key).unwrap();
         let field_id = FieldId::from_be_bytes(field_id_bytes);
         let (document_id_bytes, normalized_value_bytes) = try_split_array_at(bytes).unwrap();
diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
index a1bf0b1e3..628636f78 100644
--- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
@@ -16,8 +16,8 @@ use crate::{DocumentId, FieldId, Result};
 /// Returns the generated grenad reader containing the docid the fid and the orginal value as key
 /// and the normalized value as value extracted from the given chunk of documents.
 #[logging_timer::time]
-pub fn extract_fid_docid_facet_values<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     faceted_fields: &HashSet<FieldId>,
 ) -> Result<(grenad::Reader<File>, grenad::Reader<File>)> {
@@ -40,7 +40,8 @@ pub fn extract_fid_docid_facet_values<R: io::Read>(
     );
 
     let mut key_buffer = Vec::new();
-    while let Some((docid_bytes, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
         let obkv = obkv::KvReader::new(value);
 
         for (field_id, field_bytes) in obkv.iter() {
diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
index 4e25cb4f6..85a65ee14 100644
--- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
@@ -18,8 +18,8 @@ use crate::{relative_from_absolute_position, DocumentId, FieldId, Result};
 /// Returns a grenad reader with the list of extracted field id word counts
 /// and documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_fid_word_count_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -36,7 +36,8 @@ pub fn extract_fid_word_count_docids<R: io::Read>(
     let mut document_fid_wordcount = HashMap::new();
     let mut current_document_id = None;
 
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, _word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
diff --git a/milli/src/update/index_documents/extract/extract_geo_points.rs b/milli/src/update/index_documents/extract/extract_geo_points.rs
index a36b608ee..e58d351d6 100644
--- a/milli/src/update/index_documents/extract/extract_geo_points.rs
+++ b/milli/src/update/index_documents/extract/extract_geo_points.rs
@@ -10,17 +10,20 @@ use crate::{FieldId, InternalError, Result, UserError};
 /// Extracts the geographical coordinates contained in each document under the `_geo` field.
 ///
 /// Returns the generated grenad reader containing the docid as key associated to the (latitude, longitude)
-pub fn extract_geo_points<R: io::Read>(
-    mut obkv_documents: grenad::Reader<R>,
+pub fn extract_geo_points<R: io::Read + io::Seek>(
+    obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
     primary_key_id: FieldId,
     geo_field_id: FieldId,
 ) -> Result<grenad::Reader<File>> {
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
 
-    while let Some((docid_bytes, value)) = obkv_documents.next()? {
+    let mut cursor = obkv_documents.into_cursor()?;
+    while let Some((docid_bytes, value)) = cursor.move_on_next()? {
         let obkv = obkv::KvReader::new(value);
         let point: Value = match obkv.get(geo_field_id) {
             Some(point) => serde_json::from_slice(point).map_err(InternalError::SerdeJson)?,
diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs
index 6d99fda44..80d68298a 100644
--- a/milli/src/update/index_documents/extract/extract_word_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_docids.rs
@@ -17,8 +17,8 @@ use crate::Result;
 /// Returns a grenad reader with the list of extracted words and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -32,7 +32,8 @@ pub fn extract_word_docids<R: io::Read>(
     );
 
     let mut value_buffer = Vec::new();
-    while let Some((key, _value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, _value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
index f3667694a..90349eb93 100644
--- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
@@ -17,8 +17,8 @@ use crate::{DocumentId, Result};
 /// Returns a grenad reader with the list of extracted word pairs proximities and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_pair_proximity_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -35,7 +35,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read>(
     let mut document_word_positions_heap = BinaryHeap::new();
     let mut current_document_id = None;
 
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = u32::from_be_bytes(document_id_bytes);
diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
index 4ca8537ac..a4720ba2b 100644
--- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
@@ -14,8 +14,8 @@ use crate::{DocumentId, Result};
 /// Returns a grenad reader with the list of extracted words at positions and
 /// documents ids from the given chunk of docid word positions.
 #[logging_timer::time]
-pub fn extract_word_position_docids<R: io::Read>(
-    mut docid_word_positions: grenad::Reader<R>,
+pub fn extract_word_position_docids<R: io::Read + io::Seek>(
+    docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let max_memory = indexer.max_memory_by_thread();
@@ -29,7 +29,8 @@ pub fn extract_word_position_docids<R: io::Read>(
     );
 
     let mut key_buffer = Vec::new();
-    while let Some((key, value)) = docid_word_positions.next()? {
+    let mut cursor = docid_word_positions.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         let (document_id_bytes, word_bytes) = try_split_array_at(key)
             .ok_or_else(|| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;
         let document_id = DocumentId::from_be_bytes(document_id_bytes);
diff --git a/milli/src/update/index_documents/helpers/grenad_helpers.rs b/milli/src/update/index_documents/helpers/grenad_helpers.rs
index eef067122..ec4a32755 100644
--- a/milli/src/update/index_documents/helpers/grenad_helpers.rs
+++ b/milli/src/update/index_documents/helpers/grenad_helpers.rs
@@ -17,7 +17,7 @@ pub fn create_writer<R: io::Write>(
     typ: grenad::CompressionType,
     level: Option<u32>,
     file: R,
-) -> io::Result<grenad::Writer<R>> {
+) -> grenad::Writer<R> {
     let mut builder = grenad::Writer::builder();
     builder.compression_type(typ);
     if let Some(level) = level {
@@ -52,10 +52,13 @@ pub fn sorter_into_reader(
     sorter: grenad::Sorter<MergeFn>,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
-    sorter.write_into(&mut writer)?;
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+    sorter.write_into_stream_writer(&mut writer)?;
+
     Ok(writer_into_reader(writer)?)
 }
 
@@ -75,20 +78,25 @@ pub unsafe fn into_clonable_grenad(
     Ok(reader)
 }
 
-pub fn merge_readers<R: io::Read>(
+pub fn merge_readers<R: io::Read + io::Seek>(
     readers: Vec<grenad::Reader<R>>,
     merge_fn: MergeFn,
     indexer: GrenadParameters,
 ) -> Result<grenad::Reader<File>> {
     let mut merger_builder = grenad::MergerBuilder::new(merge_fn);
-    merger_builder.extend(readers);
+    for reader in readers {
+        merger_builder.push(reader.into_cursor()?);
+    }
+
     let merger = merger_builder.build();
-    let mut writer = tempfile::tempfile().and_then(|file| {
-        create_writer(indexer.chunk_compression_type, indexer.chunk_compression_level, file)
-    })?;
-    merger.write_into(&mut writer)?;
-    let reader = writer_into_reader(writer)?;
-    Ok(reader)
+    let mut writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+    merger.write_into_stream_writer(&mut writer)?;
+
+    Ok(writer_into_reader(writer)?)
 }
 
 #[derive(Debug, Clone, Copy)]
@@ -125,12 +133,13 @@ impl GrenadParameters {
 /// The grenad obkv entries are composed of an incremental document id big-endian
 /// encoded as the key and an obkv object with an `u8` for the field as the key
 /// and a simple UTF-8 encoded string as the value.
-pub fn grenad_obkv_into_chunks<R: io::Read>(
-    mut reader: grenad::Reader<R>,
+pub fn grenad_obkv_into_chunks<R: io::Read + io::Seek>(
+    reader: grenad::Reader<R>,
     indexer: GrenadParameters,
     documents_chunk_size: usize,
 ) -> Result<impl Iterator<Item = Result<grenad::Reader<File>>>> {
     let mut continue_reading = true;
+    let mut cursor = reader.into_cursor()?;
 
     let indexer_clone = indexer.clone();
     let mut transposer = move || {
@@ -139,15 +148,13 @@ pub fn grenad_obkv_into_chunks<R: io::Read>(
         }
 
         let mut current_chunk_size = 0u64;
-        let mut obkv_documents = tempfile::tempfile().and_then(|file| {
-            create_writer(
-                indexer_clone.chunk_compression_type,
-                indexer_clone.chunk_compression_level,
-                file,
-            )
-        })?;
+        let mut obkv_documents = create_writer(
+            indexer_clone.chunk_compression_type,
+            indexer_clone.chunk_compression_level,
+            tempfile::tempfile()?,
+        );
 
-        while let Some((document_id, obkv)) = reader.next()? {
+        while let Some((document_id, obkv)) = cursor.move_on_next()? {
             obkv_documents.insert(document_id, obkv)?;
 
             current_chunk_size += document_id.len() as u64 + obkv.len() as u64;
@@ -166,13 +173,14 @@
 pub fn write_into_lmdb_database(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    mut reader: Reader<File>,
+    reader: Reader<File>,
     merge: MergeFn,
 ) -> Result<()> {
     debug!("Writing MTBL stores...");
     let before = Instant::now();
 
-    while let Some((k, v)) = reader.next()? {
+    let mut cursor = reader.into_cursor()?;
+    while let Some((k, v)) = cursor.move_on_next()? {
         let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
         match iter.next().transpose()? {
             Some((key, old_val)) if key == k => {
@@ -201,19 +209,19 @@ pub fn sorter_into_lmdb_database(
     debug!("Writing MTBL sorter...");
     let before = Instant::now();
 
-    merger_iter_into_lmdb_database(wtxn, database, sorter.into_merger_iter()?, merge)?;
+    merger_iter_into_lmdb_database(wtxn, database, sorter.into_stream_merger_iter()?, merge)?;
 
     debug!("MTBL sorter writen in {:.02?}!", before.elapsed());
     Ok(())
 }
 
-fn merger_iter_into_lmdb_database<R: io::Read>(
+fn merger_iter_into_lmdb_database<R: io::Read + io::Seek>(
     wtxn: &mut heed::RwTxn,
     database: heed::PolyDatabase,
-    mut sorter: MergerIter<R, MergeFn>,
+    mut merger_iter: MergerIter<R, MergeFn>,
     merge: MergeFn,
 ) -> Result<()> {
-    while let Some((k, v)) = sorter.next()? {
+    while let Some((k, v)) = merger_iter.next()? {
         let mut iter = database.prefix_iter_mut::<_, ByteSlice, ByteSlice>(wtxn, k)?;
         match iter.next().transpose()? {
             Some((key, old_val)) if key == k => {
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index f5fb1ec01..4ec34c0c6 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -277,7 +277,7 @@ impl<'a, 'i> Transform<'a, 'i> {
         let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
 
         // consume sorter, in order to free the internal allocation, before creating a new one.
-        let mut iter = self.sorter.into_merger_iter()?;
+        let mut iter = self.sorter.into_stream_merger_iter()?;
 
         // Once we have sort and deduplicated the documents we write them into a final file.
         let mut final_sorter = create_sorter(
@@ -374,16 +374,15 @@ impl<'a, 'i> Transform<'a, 'i> {
         });
 
         // We create a final writer to write the new documents in order from the sorter.
-        let file = tempfile::tempfile()?;
         let mut writer = create_writer(
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            file,
-        )?;
+            tempfile::tempfile()?,
+        );
 
         // Once we have written all the documents into the final sorter, we write the documents
         // into this writer, extract the file and reset the seek to be able to read it again.
-        final_sorter.write_into(&mut writer)?;
+        final_sorter.write_into_stream_writer(&mut writer)?;
+
         let mut documents_file = writer.into_inner()?;
         documents_file.seek(SeekFrom::Start(0))?;
 
@@ -424,12 +423,11 @@ impl<'a, 'i> Transform<'a, 'i> {
         let documents_count = documents_ids.len() as usize;
 
         // We create a final writer to write the new documents in order from the sorter.
-        let file = tempfile::tempfile()?;
         let mut writer = create_writer(
             self.indexer_settings.chunk_compression_type,
             self.indexer_settings.chunk_compression_level,
-            file,
-        )?;
+            tempfile::tempfile()?,
+        );
 
         let mut obkv_buffer = Vec::new();
         for result in self.index.documents.iter(wtxn)? {
diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs
index 7f0cfcab3..3c77de7a1 100644
--- a/milli/src/update/index_documents/typed_chunk.rs
+++ b/milli/src/update/index_documents/typed_chunk.rs
@@ -1,6 +1,7 @@
 use std::borrow::Cow;
 use std::convert::TryInto;
 use std::fs::File;
+use std::io;
 
 use heed::types::ByteSlice;
 use heed::{BytesDecode, RwTxn};
@@ -65,8 +66,9 @@ pub(crate) fn write_typed_chunk_into_index(
                 },
             )?;
         }
-        TypedChunk::Documents(mut obkv_documents_iter) => {
-            while let Some((key, value)) = obkv_documents_iter.next()? {
+        TypedChunk::Documents(obkv_documents_iter) => {
+            let mut cursor = obkv_documents_iter.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 index.documents.remap_types::<ByteSlice, ByteSlice>().put(wtxn, key, value)?;
             }
         }
@@ -85,7 +87,7 @@ pub(crate) fn write_typed_chunk_into_index(
             return Ok((documents_ids, is_merged_database))
         }
         TypedChunk::WordDocids(word_docids_iter) => {
-            let mut word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?;
+            let word_docids_iter = unsafe { into_clonable_grenad(word_docids_iter) }?;
             append_entries_into_database(
                 word_docids_iter.clone(),
                 &index.word_docids,
@@ -97,7 +99,8 @@ pub(crate) fn write_typed_chunk_into_index(
 
             // create fst from word docids
             let mut builder = fst::SetBuilder::memory();
-            while let Some((word, _value)) = word_docids_iter.next()? {
+            let mut cursor = word_docids_iter.into_cursor()?;
+            while let Some((word, _value)) = cursor.move_on_next()? {
                 // This is a lexicographically ordered word position
                 // we use the key to construct the words fst.
                 builder.insert(word)?;
@@ -146,19 +149,21 @@ pub(crate) fn write_typed_chunk_into_index(
             )?;
             is_merged_database = true;
         }
-        TypedChunk::FieldIdDocidFacetNumbers(mut fid_docid_facet_number) => {
+        TypedChunk::FieldIdDocidFacetNumbers(fid_docid_facet_number) => {
             let index_fid_docid_facet_numbers =
                 index.field_id_docid_facet_f64s.remap_types::<ByteSlice, ByteSlice>();
-            while let Some((key, value)) = fid_docid_facet_number.next()? {
+            let mut cursor = fid_docid_facet_number.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
                     index_fid_docid_facet_numbers.put(wtxn, key, &value)?;
                 }
             }
         }
-        TypedChunk::FieldIdDocidFacetStrings(mut fid_docid_facet_string) => {
+        TypedChunk::FieldIdDocidFacetStrings(fid_docid_facet_string) => {
             let index_fid_docid_facet_strings =
                 index.field_id_docid_facet_strings.remap_types::<ByteSlice, ByteSlice>();
-            while let Some((key, value)) = fid_docid_facet_string.next()? {
+            let mut cursor = fid_docid_facet_string.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 if valid_lmdb_key(key) {
                     index_fid_docid_facet_strings.put(wtxn, key, &value)?;
                 }
@@ -183,11 +188,12 @@ pub(crate) fn write_typed_chunk_into_index(
             )?;
             is_merged_database = true;
         }
-        TypedChunk::GeoPoints(mut geo_points) => {
+        TypedChunk::GeoPoints(geo_points) => {
             let mut rtree = index.geo_rtree(wtxn)?.unwrap_or_default();
             let mut geo_faceted_docids = index.geo_faceted_documents_ids(wtxn)?;
 
-            while let Some((key, value)) = geo_points.next()? {
+            let mut cursor = geo_points.into_cursor()?;
+            while let Some((key, value)) = cursor.move_on_next()? {
                 // convert the key back to a u32 (4 bytes)
                 let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
 
@@ -229,7 +235,7 @@ fn merge_cbo_roaring_bitmaps(
 /// Write provided entries in database using serialize_value function.
 /// merge_values function is used if an entry already exist in the database.
 fn write_entries_into_database<R, K, V, FS, FM>(
-    mut data: grenad::Reader<R>,
+    data: grenad::Reader<R>,
     database: &heed::Database<K, V>,
     wtxn: &mut RwTxn,
     index_is_empty: bool,
@@ -237,14 +243,15 @@ fn write_entries_into_database<R, K, V, FS, FM>(
     merge_values: FM,
 ) -> Result<()>
 where
-    R: std::io::Read,
+    R: io::Read + io::Seek,
     FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
     FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
 {
     let mut buffer = Vec::new();
     let database = database.remap_types::<ByteSlice, ByteSlice>();
 
-    while let Some((key, value)) = data.next()? {
+    let mut cursor = data.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         if valid_lmdb_key(key) {
             buffer.clear();
             let value = if index_is_empty {
@@ -270,7 +277,7 @@ where
 /// All provided entries must be ordered.
 /// If the index is not empty, write_entries_into_database is called instead.
 fn append_entries_into_database<R, K, V, FS, FM>(
-    mut data: grenad::Reader<R>,
+    data: grenad::Reader<R>,
     database: &heed::Database<K, V>,
     wtxn: &mut RwTxn,
     index_is_empty: bool,
@@ -278,7 +285,7 @@ fn append_entries_into_database<R, K, V, FS, FM>(
     merge_values: FM,
 ) -> Result<()>
 where
-    R: std::io::Read,
+    R: io::Read + io::Seek,
     FS: for<'a> Fn(&'a [u8], &'a mut Vec<u8>) -> Result<&'a [u8]>,
     FM: Fn(&[u8], &[u8], &mut Vec<u8>) -> Result<()>,
 {
@@ -296,7 +303,8 @@ where
     let mut buffer = Vec::new();
    let mut database = database.iter_mut(wtxn)?.remap_types::<ByteSlice, ByteSlice>();
 
-    while let Some((key, value)) = data.next()? {
+    let mut cursor = data.into_cursor()?;
+    while let Some((key, value)) = cursor.move_on_next()? {
         if valid_lmdb_key(key) {
             buffer.clear();
             let value = serialize_value(value, &mut buffer)?;
diff --git a/milli/src/update/word_prefix_docids.rs b/milli/src/update/word_prefix_docids.rs
index 624037f8f..0bb5edb9a 100644
--- a/milli/src/update/word_prefix_docids.rs
+++ b/milli/src/update/word_prefix_docids.rs
@@ -51,8 +51,10 @@ impl<'t, 'u, 'i> WordPrefixDocids<'t, 'u, 'i> {
         );
 
         let mut word_docids_merger = MergerBuilder::new(merge_roaring_bitmaps);
-        word_docids_merger.extend(new_word_docids);
-        let mut word_docids_iter = word_docids_merger.build().into_merger_iter()?;
+        for reader in new_word_docids {
+            word_docids_merger.push(reader.into_cursor()?);
+        }
+        let mut word_docids_iter = word_docids_merger.build().into_stream_merger_iter()?;
 
         let mut current_prefixes: Option<&&[String]> = None;
         let mut prefixes_cache = HashMap::new();
diff --git a/milli/src/update/word_prefix_pair_proximity_docids.rs b/milli/src/update/word_prefix_pair_proximity_docids.rs
index 530c2867e..b498d5850 100644
--- a/milli/src/update/word_prefix_pair_proximity_docids.rs
+++ b/milli/src/update/word_prefix_pair_proximity_docids.rs
@@ -77,8 +77,10 @@ impl<'t, 'u, 'i> WordPrefixPairProximityDocids<'t, 'u, 'i> {
         // We retrieve and merge the created word pair proximities docids entries
         // for the newly added documents.
         let mut wppd_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        wppd_merger.extend(new_word_pair_proximity_docids);
-        let mut wppd_iter = wppd_merger.build().into_merger_iter()?;
+        for reader in new_word_pair_proximity_docids {
+            wppd_merger.push(reader.into_cursor()?);
+        }
+        let mut wppd_iter = wppd_merger.build().into_stream_merger_iter()?;
 
         let mut word_prefix_pair_proximity_docids_sorter = create_sorter(
             merge_cbo_roaring_bitmaps,
diff --git a/milli/src/update/words_prefix_position_docids.rs b/milli/src/update/words_prefix_position_docids.rs
index c992d01ec..9e15f4d6c 100644
--- a/milli/src/update/words_prefix_position_docids.rs
+++ b/milli/src/update/words_prefix_position_docids.rs
@@ -73,9 +73,11 @@ impl<'t, 'u, 'i> WordPrefixPositionDocids<'t, 'u, 'i> {
         );
 
         let mut word_position_docids_merger = MergerBuilder::new(merge_cbo_roaring_bitmaps);
-        word_position_docids_merger.extend(new_word_position_docids);
+        for reader in new_word_position_docids {
+            word_position_docids_merger.push(reader.into_cursor()?);
+        }
         let mut word_position_docids_iter =
-            word_position_docids_merger.build().into_merger_iter()?;
+            word_position_docids_merger.build().into_stream_merger_iter()?;
 
         // We fetch all the new common prefixes between the previous and new prefix fst.
         let mut buffer = Vec::new();
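
Note: the sketch below is an illustration, not part of the patch above. It shows the grenad 0.4 reading pattern the whole diff migrates to: a reader is consumed by into_cursor() and iterated with move_on_next(), which is why every reader bound gains io::Seek and the mut bindings on readers disappear. The count_entries helper is hypothetical and only uses the calls visible in the diff (into_cursor, move_on_next).

use std::io;

// Walk a grenad file with the 0.4 cursor API and count its entries.
// `into_cursor()` consumes the reader; `move_on_next()` yields `(key, value)` byte slices.
fn count_entries<R: io::Read + io::Seek>(reader: grenad::Reader<R>) -> u64 {
    let mut cursor = reader.into_cursor().expect("malformed grenad file");
    let mut count = 0;
    while let Some((_key, _value)) = cursor.move_on_next().expect("malformed grenad entry") {
        count += 1;
    }
    count
}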