diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs index e8e595837..346ae0afa 100644 --- a/milli/src/update/del_add.rs +++ b/milli/src/update/del_add.rs @@ -58,3 +58,43 @@ pub fn into_del_add_obkv( writer.finish() } + +/// Creates a Kv> from two Kv +/// +/// putting each deletion obkv's keys under an DelAdd::Deletion +/// and putting each addition obkv's keys under an DelAdd::Addition +pub fn del_add_from_two_obkvs( + deletion: obkv::KvReader, + addition: obkv::KvReader, + buffer: &mut Vec, +) -> Result<(), std::io::Error> { + use itertools::merge_join_by; + use itertools::EitherOrBoth::{Both, Left, Right}; + + let mut writer = obkv::KvWriter::new(buffer); + let mut value_buffer = Vec::new(); + + for eob in merge_join_by(deletion.iter(), addition.iter(), |(b, _), (u, _)| b.cmp(u)) { + value_buffer.clear(); + match eob { + Left((k, v)) => { + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + value_writer.insert(DelAdd::Deletion, v).unwrap(); + writer.insert(k, value_writer.into_inner()?).unwrap(); + } + Right((k, v)) => { + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + value_writer.insert(DelAdd::Addition, v).unwrap(); + writer.insert(k, value_writer.into_inner()?).unwrap(); + } + Both((k, deletion), (_, addition)) => { + let mut value_writer = KvWriterDelAdd::new(&mut value_buffer); + value_writer.insert(DelAdd::Deletion, deletion).unwrap(); + value_writer.insert(DelAdd::Addition, addition).unwrap(); + writer.insert(k, value_writer.into_inner()?).unwrap(); + } + } + } + + writer.finish() +} diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs index 0c7c5cf46..e02e492d2 100644 --- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs +++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs @@ -11,7 +11,7 @@ use serde_json::Value; use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; use crate::error::{InternalError, SerializationError}; -use crate::update::index_documents::MergeFn; +use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd}; use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH}; pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>; @@ -30,15 +30,21 @@ pub fn extract_docid_word_positions( allowed_separators: Option<&[&str]>, dictionary: Option<&[&str]>, max_positions_per_attributes: Option, -) -> Result<(RoaringBitmap, grenad::Reader>, ScriptLanguageDocidsMap)> { +) -> Result<( + RoaringBitmap, + grenad::Reader>, + (ScriptLanguageDocidsMap, ScriptLanguageDocidsMap), +)> { puffin::profile_function!(); let max_positions_per_attributes = max_positions_per_attributes .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); let max_memory = indexer.max_memory_by_thread(); + // initialize destination values. let mut documents_ids = RoaringBitmap::new(); - let mut script_language_docids = HashMap::new(); + let mut del_script_language_docids = HashMap::new(); + let mut add_script_language_docids = HashMap::new(); let mut docid_word_positions_sorter = create_sorter( grenad::SortAlgorithm::Stable, keep_latest_obkv, @@ -48,7 +54,142 @@ pub fn extract_docid_word_positions( max_memory, ); - let mut buffers = Buffers::default(); + // initialize buffers. + let mut del_buffers = Buffers::default(); + let mut add_buffers = Buffers::default(); + let mut key_buffer = Vec::new(); + let mut value_buffer = Vec::new(); + + // initialize tokenizer. + let mut builder = tokenizer_builder(stop_words, dictionary, allowed_separators, None); + let tokenizer = builder.build(); + + // iterate over documents. + let mut cursor = obkv_documents.into_cursor()?; + while let Some((key, value)) = cursor.move_on_next()? { + let document_id = key + .try_into() + .map(u32::from_be_bytes) + .map_err(|_| SerializationError::InvalidNumberSerialization)?; + let obkv = KvReader::::new(value); + + // if the searchable fields didn't change, skip the searchable indexing for this document. + if !searchable_fields_changed(&KvReader::::new(value), searchable_fields) { + continue; + } + + documents_ids.push(document_id); + + // Update key buffer prefix. + key_buffer.clear(); + key_buffer.extend_from_slice(&document_id.to_be_bytes()); + + // Tokenize deletions and additions in 2 diffferent threads. + let (del, add): (Result<_>, Result<_>) = rayon::join( + || { + // deletions + lang_safe_tokens_from_document( + &obkv, + searchable_fields, + &tokenizer, + stop_words, + allowed_separators, + dictionary, + max_positions_per_attributes, + DelAdd::Deletion, + &mut del_buffers, + ) + }, + || { + // additions + lang_safe_tokens_from_document( + &obkv, + searchable_fields, + &tokenizer, + stop_words, + allowed_separators, + dictionary, + max_positions_per_attributes, + DelAdd::Addition, + &mut add_buffers, + ) + }, + ); + + let (del_obkv, del_script_language_word_count) = del?; + let (add_obkv, add_script_language_word_count) = add?; + + // merge deletions and additions. + value_buffer.clear(); + del_add_from_two_obkvs( + KvReader::::new(del_obkv), + KvReader::::new(add_obkv), + &mut value_buffer, + )?; + + // write them into the sorter. + let obkv = KvReader::::new(value); + for (field_id, value) in obkv.iter() { + key_buffer.truncate(mem::size_of::()); + key_buffer.extend_from_slice(&field_id.to_be_bytes()); + docid_word_positions_sorter.insert(&key_buffer, value)?; + } + + // update script_language_docids deletions. + for (script, languages_frequency) in del_script_language_word_count { + for (language, _) in languages_frequency { + let entry = del_script_language_docids + .entry((script, language)) + .or_insert_with(RoaringBitmap::new); + entry.push(document_id); + } + } + + // update script_language_docids additions. + for (script, languages_frequency) in add_script_language_word_count { + for (language, _) in languages_frequency { + let entry = add_script_language_docids + .entry((script, language)) + .or_insert_with(RoaringBitmap::new); + entry.push(document_id); + } + } + } + + let script_language_docids = (del_script_language_docids, add_script_language_docids); + sorter_into_reader(docid_word_positions_sorter, indexer) + .map(|reader| (documents_ids, reader, script_language_docids)) +} + +/// Check if any searchable fields of a document changed. +fn searchable_fields_changed( + obkv: &KvReader, + searchable_fields: &Option>, +) -> bool { + for (field_id, field_bytes) in obkv.iter() { + if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) { + let del_add = KvReaderDelAdd::new(field_bytes); + match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) { + // if both fields are None, check the next field. + (None, None) => (), + // if both contains a value and values are the same, check the next field. + (Some(del), Some(add)) if del == add => (), + // otherwise the fields are different, return true. + _otherwise => return true, + } + } + } + + false +} + +/// Factorize tokenizer building. +fn tokenizer_builder<'a>( + stop_words: Option<&'a fst::Set<&[u8]>>, + allowed_separators: Option<&'a [&str]>, + dictionary: Option<&'a [&str]>, + script_language: Option<&'a HashMap>>, +) -> TokenizerBuilder<'a, &'a [u8]> { let mut tokenizer_builder = TokenizerBuilder::new(); if let Some(stop_words) = stop_words { tokenizer_builder.stop_words(stop_words); @@ -59,138 +200,144 @@ pub fn extract_docid_word_positions( if let Some(separators) = allowed_separators { tokenizer_builder.separators(separators); } - let tokenizer = tokenizer_builder.build(); - let mut cursor = obkv_documents.into_cursor()?; - while let Some((key, value)) = cursor.move_on_next()? { - let document_id = key - .try_into() - .map(u32::from_be_bytes) - .map_err(|_| SerializationError::InvalidNumberSerialization)?; - let obkv = KvReader::::new(value); + if let Some(script_language) = script_language { + tokenizer_builder.allow_list(&script_language); + } - documents_ids.push(document_id); - buffers.key_buffer.clear(); - buffers.key_buffer.extend_from_slice(&document_id.to_be_bytes()); + tokenizer_builder +} - let mut script_language_word_count = HashMap::new(); +/// Extract words maped with their positions of a document, +/// ensuring no Language detection mistakes was made. +fn lang_safe_tokens_from_document<'a>( + obkv: &KvReader, + searchable_fields: &Option>, + tokenizer: &Tokenizer, + stop_words: Option<&fst::Set<&[u8]>>, + allowed_separators: Option<&[&str]>, + dictionary: Option<&[&str]>, + max_positions_per_attributes: u32, + del_add: DelAdd, + buffers: &'a mut Buffers, +) -> Result<(&'a [u8], HashMap>)> { + let mut script_language_word_count = HashMap::new(); - extract_tokens_from_document( - &obkv, - searchable_fields, - &tokenizer, - max_positions_per_attributes, - &mut buffers, - &mut script_language_word_count, - &mut docid_word_positions_sorter, - )?; + tokens_from_document( + &obkv, + searchable_fields, + &tokenizer, + max_positions_per_attributes, + del_add, + buffers, + &mut script_language_word_count, + )?; - // if we detect a potetial mistake in the language detection, - // we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages. - // context: https://github.com/meilisearch/meilisearch/issues/3565 - if script_language_word_count - .values() - .map(Vec::as_slice) - .any(potential_language_detection_error) - { - // build an allow list with the most frequent detected languages in the document. - let script_language: HashMap<_, _> = - script_language_word_count.iter().filter_map(most_frequent_languages).collect(); + // if we detect a potetial mistake in the language detection, + // we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages. + // context: https://github.com/meilisearch/meilisearch/issues/3565 + if script_language_word_count + .values() + .map(Vec::as_slice) + .any(potential_language_detection_error) + { + // build an allow list with the most frequent detected languages in the document. + let script_language: HashMap<_, _> = + script_language_word_count.iter().filter_map(most_frequent_languages).collect(); - // if the allow list is empty, meaning that no Language is considered frequent, - // then we don't rerun the extraction. - if !script_language.is_empty() { - // build a new temporary tokenizer including the allow list. - let mut tokenizer_builder = TokenizerBuilder::new(); - if let Some(stop_words) = stop_words { - tokenizer_builder.stop_words(stop_words); - } - if let Some(dictionary) = dictionary { - tokenizer_builder.words_dict(dictionary); - } - if let Some(separators) = allowed_separators { - tokenizer_builder.separators(separators); - } - tokenizer_builder.allow_list(&script_language); - let tokenizer = tokenizer_builder.build(); + // if the allow list is empty, meaning that no Language is considered frequent, + // then we don't rerun the extraction. + if !script_language.is_empty() { + // build a new temporary tokenizer including the allow list. + let mut builder = tokenizer_builder( + stop_words, + dictionary, + allowed_separators, + Some(&script_language), + ); + let tokenizer = builder.build(); - script_language_word_count.clear(); + script_language_word_count.clear(); - // rerun the extraction. - extract_tokens_from_document( - &obkv, - searchable_fields, - &tokenizer, - max_positions_per_attributes, - &mut buffers, - &mut script_language_word_count, - &mut docid_word_positions_sorter, - )?; - } - } - - for (script, languages_frequency) in script_language_word_count { - for (language, _) in languages_frequency { - let entry = script_language_docids - .entry((script, language)) - .or_insert_with(RoaringBitmap::new); - entry.push(document_id); - } + // rerun the extraction. + tokens_from_document( + &obkv, + searchable_fields, + &tokenizer, + max_positions_per_attributes, + del_add, + buffers, + &mut script_language_word_count, + )?; } } - sorter_into_reader(docid_word_positions_sorter, indexer) - .map(|reader| (documents_ids, reader, script_language_docids)) + Ok((&buffers.obkv_buffer, script_language_word_count)) } -fn extract_tokens_from_document( +/// Extract words maped with their positions of a document. +fn tokens_from_document<'a>( obkv: &KvReader, searchable_fields: &Option>, tokenizer: &Tokenizer, max_positions_per_attributes: u32, - buffers: &mut Buffers, + del_add: DelAdd, + buffers: &'a mut Buffers, script_language_word_count: &mut HashMap>, - docid_word_positions_sorter: &mut grenad::Sorter, -) -> Result<()> { +) -> Result<&'a [u8]> { + buffers.obkv_buffer.clear(); + let mut document_writer = KvWriterU16::new(&mut buffers.obkv_buffer); for (field_id, field_bytes) in obkv.iter() { + // if field is searchable. if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) { - let value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; - buffers.field_buffer.clear(); - if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) { - let tokens = process_tokens(tokenizer.tokenize(field)) - .take_while(|(p, _)| (*p as u32) < max_positions_per_attributes); + // extract deletion or addition only. + if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) { + // parse json. + let value = + serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; - buffers.obkv_buffer.clear(); - let mut writer = KvWriterU16::new(&mut buffers.obkv_buffer); - for (index, token) in tokens { - // if a language has been detected for the token, we update the counter. - if let Some(language) = token.language { - let script = token.script; - let entry = - script_language_word_count.entry(script).or_insert_with(Vec::new); - match entry.iter_mut().find(|(l, _)| *l == language) { - Some((_, n)) => *n += 1, - None => entry.push((language, 1)), + // prepare writting destination. + buffers.obkv_positions_buffer.clear(); + let mut writer = KvWriterU16::new(&mut buffers.obkv_positions_buffer); + + // convert json into an unique string. + buffers.field_buffer.clear(); + if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) { + // create an iterator of token with their positions. + let tokens = process_tokens(tokenizer.tokenize(field)) + .take_while(|(p, _)| (*p as u32) < max_positions_per_attributes); + + for (index, token) in tokens { + // if a language has been detected for the token, we update the counter. + if let Some(language) = token.language { + let script = token.script; + let entry = + script_language_word_count.entry(script).or_insert_with(Vec::new); + match entry.iter_mut().find(|(l, _)| *l == language) { + Some((_, n)) => *n += 1, + None => entry.push((language, 1)), + } + } + + // keep a word only if it is not empty and fit in a LMDB key. + let token = token.lemma().trim(); + if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { + let position: u16 = index + .try_into() + .map_err(|_| SerializationError::InvalidNumberSerialization)?; + writer.insert(position, token.as_bytes())?; } } - let token = token.lemma().trim(); - if !token.is_empty() && token.len() <= MAX_WORD_LENGTH { - let position: u16 = index - .try_into() - .map_err(|_| SerializationError::InvalidNumberSerialization)?; - writer.insert(position, token.as_bytes())?; - } - } - let positions = writer.into_inner()?; - buffers.key_buffer.truncate(mem::size_of::()); - buffers.key_buffer.extend_from_slice(&field_id.to_be_bytes()); - docid_word_positions_sorter.insert(&buffers.key_buffer, positions)?; + // write positions into document. + let positions = writer.into_inner()?; + document_writer.insert(field_id, positions)?; + } } } } - Ok(()) + Ok(document_writer.into_inner().map(|v| v.as_slice())?) } /// Transform a JSON value into a string that can be indexed. @@ -293,12 +440,10 @@ fn compute_language_frequency_threshold(languages_frequency: &[(Language, usize) #[derive(Default)] struct Buffers { - // the key buffer is the concatenation of the internal document id with the field id. - // The buffer has to be completelly cleared between documents, - // and the field id part must be cleared between each field. - key_buffer: Vec, // the field buffer for each fields desserialization, and must be cleared between each field. field_buffer: String, // buffer used to store the value data containing an obkv. obkv_buffer: Vec, + // buffer used to store the value data containing an obkv of tokens with their positions. + obkv_positions_buffer: Vec, } diff --git a/milli/src/update/index_documents/helpers/merge_functions.rs b/milli/src/update/index_documents/helpers/merge_functions.rs index 6317b5610..dee200b21 100644 --- a/milli/src/update/index_documents/helpers/merge_functions.rs +++ b/milli/src/update/index_documents/helpers/merge_functions.rs @@ -117,8 +117,9 @@ pub fn merge_two_del_add_obkvs( let update_reader = KvReaderDelAdd::new(update); // keep newest deletion. - if let Some(deletion) = - update_reader.get(DelAdd::Deletion).or(base_reader.get(DelAdd::Deletion)) + if let Some(deletion) = update_reader + .get(DelAdd::Deletion) + .or_else(|| base_reader.get(DelAdd::Deletion)) { value_writer.insert(DelAdd::Deletion, deletion).unwrap(); } @@ -127,6 +128,7 @@ pub fn merge_two_del_add_obkvs( let base_addition = merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten(); // keep newest addition. + // TODO use or_else if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) { value_writer.insert(DelAdd::Addition, addition).unwrap(); } diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index a45a6ee3c..2b77768cb 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -805,7 +805,7 @@ impl<'a, 'i> Transform<'a, 'i> { let buffer = obkv_writer.into_inner()?; document_sorter_buffer.clear(); - into_del_add_obkv(KvReaderU16::new(buffer), true, true, &mut document_sorter_buffer)?; + into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?; original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; // Once we have the document. We're going to flatten it @@ -842,7 +842,7 @@ impl<'a, 'i> Transform<'a, 'i> { writer.insert(fid, &value)?; } document_sorter_buffer.clear(); - into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut document_sorter_buffer)?; + into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?; flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; } diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index a94bcf581..f2dc7d336 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -43,7 +43,9 @@ pub(crate) enum TypedChunk { FieldIdFacetIsEmptyDocids(grenad::Reader>), GeoPoints(grenad::Reader>), VectorPoints(grenad::Reader>), - ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>), + ScriptLanguageDocids( + (HashMap<(Script, Language), RoaringBitmap>, HashMap<(Script, Language), RoaringBitmap>), + ), } impl TypedChunk { @@ -101,8 +103,8 @@ impl TypedChunk { TypedChunk::VectorPoints(grenad) => { format!("VectorPoints {{ number_of_entries: {} }}", grenad.len()) } - TypedChunk::ScriptLanguageDocids(grenad) => { - format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len()) + TypedChunk::ScriptLanguageDocids((_, addition)) => { + format!("ScriptLanguageDocids {{ number_of_entries: {} }}", addition.len()) } } } @@ -344,19 +346,21 @@ pub(crate) fn write_typed_chunk_into_index( log::debug!("There are {} entries in the HNSW so far", hnsw_length); index.put_vector_hnsw(wtxn, &new_hnsw)?; } - TypedChunk::ScriptLanguageDocids(hash_pair) => { - let mut buffer = Vec::new(); - for (key, value) in hash_pair { - buffer.clear(); - let final_value = match index.script_language_docids.get(wtxn, &key)? { - Some(db_values) => { - let mut db_value_buffer = Vec::new(); - serialize_roaring_bitmap(&db_values, &mut db_value_buffer)?; - let mut new_value_buffer = Vec::new(); - serialize_roaring_bitmap(&value, &mut new_value_buffer)?; - merge_roaring_bitmaps(&new_value_buffer, &db_value_buffer, &mut buffer)?; - RoaringBitmap::deserialize_from(&buffer[..])? + TypedChunk::ScriptLanguageDocids((deletion, addition)) => { + for (key, value) in deletion { + if let Some(mut db_values) = index.script_language_docids.get(wtxn, &key)? { + db_values -= value; + if db_values.is_empty() { + index.script_language_docids.delete(wtxn, &key)?; + } else { + index.script_language_docids.put(wtxn, &key, &db_values)?; } + } + } + + for (key, value) in addition { + let final_value = match index.script_language_docids.get(wtxn, &key)? { + Some(mut db_values) => db_values | value, None => value, }; index.script_language_docids.put(wtxn, &key, &final_value)?;