diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs index 0acdd6202..e5cf918dd 100644 --- a/milli/src/update/index_documents/extract/extract_word_docids.rs +++ b/milli/src/update/index_documents/extract/extract_word_docids.rs @@ -3,14 +3,16 @@ use std::fs::File; use std::io; use std::iter::FromIterator; +use heed::BytesDecode; use obkv::KvReaderU16; use roaring::RoaringBitmap; use super::helpers::{ - create_sorter, merge_cbo_roaring_bitmaps, merge_roaring_bitmaps, serialize_roaring_bitmap, - sorter_into_reader, try_split_array_at, GrenadParameters, + create_sorter, create_writer, merge_roaring_bitmaps, serialize_roaring_bitmap, + sorter_into_reader, try_split_array_at, writer_into_reader, GrenadParameters, }; use crate::error::SerializationError; +use crate::heed_codec::StrBEU16Codec; use crate::index::db_name::DOCID_WORD_POSITIONS; use crate::update::MergeFn; use crate::{DocumentId, FieldId, Result}; @@ -32,6 +34,59 @@ pub fn extract_word_docids( let max_memory = indexer.max_memory_by_thread(); + let mut word_fid_docids_sorter = create_sorter( + grenad::SortAlgorithm::Unstable, + merge_roaring_bitmaps, + indexer.chunk_compression_type, + indexer.chunk_compression_level, + indexer.max_nb_chunks, + max_memory.map(|x| x / 3), + ); + + let mut current_document_id = None; + let mut fid = 0; + let mut key_buffer = Vec::new(); + let mut value_buffer = Vec::new(); + let mut words = BTreeSet::new(); + let mut cursor = docid_word_positions.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + let (document_id_bytes, fid_bytes) = try_split_array_at(key) + .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; + let (fid_bytes, _) = try_split_array_at(fid_bytes) + .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; + let document_id = u32::from_be_bytes(document_id_bytes); + fid = u16::from_be_bytes(fid_bytes); + + // drain the btreemaps when we change document. + if current_document_id.map_or(false, |id| id != document_id) { + words_into_sorter( + document_id, + fid, + &mut key_buffer, + &mut value_buffer, + &mut words, + &mut word_fid_docids_sorter, + )?; + } + + current_document_id = Some(document_id); + for (_pos, word) in KvReaderU16::new(&value).iter() { + words.insert(word.to_vec()); + } + } + + // We must make sure that don't lose the current document field id + if let Some(document_id) = current_document_id { + words_into_sorter( + document_id, + fid, + &mut key_buffer, + &mut value_buffer, + &mut words, + &mut word_fid_docids_sorter, + )?; + } + let mut word_docids_sorter = create_sorter( grenad::SortAlgorithm::Unstable, merge_roaring_bitmaps, @@ -50,104 +105,47 @@ pub fn extract_word_docids( max_memory.map(|x| x / 3), ); - let mut word_fid_docids_sorter = create_sorter( - grenad::SortAlgorithm::Unstable, - merge_roaring_bitmaps, + let mut word_fid_docids_writer = create_writer( indexer.chunk_compression_type, indexer.chunk_compression_level, - indexer.max_nb_chunks, - max_memory.map(|x| x / 3), + tempfile::tempfile()?, ); - let mut current_document_id = None; - let mut fid = 0; - let mut key_buffer = Vec::new(); - let mut value_buffer = Vec::new(); - let mut words = BTreeSet::new(); - let mut exact_words = BTreeSet::new(); - let mut cursor = docid_word_positions.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let (document_id_bytes, fid_bytes) = try_split_array_at(key) - .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let (fid_bytes, _) = try_split_array_at(fid_bytes) - .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; - let document_id = u32::from_be_bytes(document_id_bytes); - fid = u16::from_be_bytes(fid_bytes); + let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?; + while let Some((key, value)) = iter.next()? { + word_fid_docids_writer.insert(key, value)?; - // drain the btreemaps when we change document. - if current_document_id.map_or(false, |id| id != document_id) { - words_into_sorters( - document_id, - fid, - &mut key_buffer, - &mut value_buffer, - &mut exact_words, - &mut words, - &mut exact_word_docids_sorter, - &mut word_docids_sorter, - &mut word_fid_docids_sorter, - )?; - } - - current_document_id = Some(document_id); + let (word, fid) = StrBEU16Codec::bytes_decode(key) + .ok_or(SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?; // every words contained in an attribute set to exact must be pushed in the exact_words list. if exact_attributes.contains(&fid) { - for (_pos, word) in KvReaderU16::new(&value).iter() { - exact_words.insert(word.to_vec()); - } + exact_word_docids_sorter.insert(word.as_bytes(), &value)?; } else { - for (_pos, word) in KvReaderU16::new(&value).iter() { - words.insert(word.to_vec()); - } + word_docids_sorter.insert(word.as_bytes(), &value)?; } } - // We must make sure that don't lose the current document field id - if let Some(document_id) = current_document_id { - words_into_sorters( - document_id, - fid, - &mut key_buffer, - &mut value_buffer, - &mut exact_words, - &mut words, - &mut exact_word_docids_sorter, - &mut word_docids_sorter, - &mut word_fid_docids_sorter, - )?; - } - Ok(( sorter_into_reader(word_docids_sorter, indexer)?, sorter_into_reader(exact_word_docids_sorter, indexer)?, - sorter_into_reader(word_fid_docids_sorter, indexer)?, + writer_into_reader(word_fid_docids_writer)?, )) } -fn words_into_sorters( +fn words_into_sorter( document_id: DocumentId, fid: FieldId, key_buffer: &mut Vec, value_buffer: &mut Vec, - exact_words: &mut BTreeSet>, words: &mut BTreeSet>, - exact_word_docids_sorter: &mut grenad::Sorter, - word_docids_sorter: &mut grenad::Sorter, word_fid_docids_sorter: &mut grenad::Sorter, ) -> Result<()> { puffin::profile_function!(); let bitmap = RoaringBitmap::from_iter(Some(document_id)); serialize_roaring_bitmap(&bitmap, value_buffer)?; - for word_bytes in exact_words.iter() { - exact_word_docids_sorter.insert(word_bytes, &mut *value_buffer)?; - } for word_bytes in words.iter() { - word_docids_sorter.insert(word_bytes, &value_buffer)?; - } - - for word_bytes in (&*words | &*exact_words).iter() { key_buffer.clear(); key_buffer.extend_from_slice(&word_bytes); key_buffer.push(0); @@ -155,7 +153,6 @@ fn words_into_sorters( word_fid_docids_sorter.insert(&key_buffer, &value_buffer)?; } - exact_words.clear(); words.clear(); Ok(())