Support diff indexing on extract_docid_word_positions

This commit is contained in:
ManyTheFish 2023-10-16 14:58:11 +02:00 committed by Louis Dureuil
parent 1dd97578a8
commit 313b16bec2
No known key found for this signature in database
5 changed files with 322 additions and 131 deletions

View File

@ -58,3 +58,43 @@ pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
writer.finish() writer.finish()
} }
/// Creates a Kv<K, Kv<DelAdd, value>> from two Kv<K, value>
///
/// putting each deletion obkv's keys under an DelAdd::Deletion
/// and putting each addition obkv's keys under an DelAdd::Addition
pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd + Ord>(
deletion: obkv::KvReader<K>,
addition: obkv::KvReader<K>,
buffer: &mut Vec<u8>,
) -> Result<(), std::io::Error> {
use itertools::merge_join_by;
use itertools::EitherOrBoth::{Both, Left, Right};
let mut writer = obkv::KvWriter::new(buffer);
let mut value_buffer = Vec::new();
for eob in merge_join_by(deletion.iter(), addition.iter(), |(b, _), (u, _)| b.cmp(u)) {
value_buffer.clear();
match eob {
Left((k, v)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, v).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
Right((k, v)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Addition, v).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
Both((k, deletion), (_, addition)) => {
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
value_writer.insert(DelAdd::Deletion, deletion).unwrap();
value_writer.insert(DelAdd::Addition, addition).unwrap();
writer.insert(k, value_writer.into_inner()?).unwrap();
}
}
}
writer.finish()
}

View File

@ -11,7 +11,7 @@ use serde_json::Value;
use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters}; use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters};
use crate::error::{InternalError, SerializationError}; use crate::error::{InternalError, SerializationError};
use crate::update::index_documents::MergeFn; use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH}; use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};
pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>; pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), RoaringBitmap>;
@ -30,15 +30,21 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
allowed_separators: Option<&[&str]>, allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>, dictionary: Option<&[&str]>,
max_positions_per_attributes: Option<u32>, max_positions_per_attributes: Option<u32>,
) -> Result<(RoaringBitmap, grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> { ) -> Result<(
RoaringBitmap,
grenad::Reader<BufReader<File>>,
(ScriptLanguageDocidsMap, ScriptLanguageDocidsMap),
)> {
puffin::profile_function!(); puffin::profile_function!();
let max_positions_per_attributes = max_positions_per_attributes let max_positions_per_attributes = max_positions_per_attributes
.map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE)); .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
let max_memory = indexer.max_memory_by_thread(); let max_memory = indexer.max_memory_by_thread();
// initialize destination values.
let mut documents_ids = RoaringBitmap::new(); let mut documents_ids = RoaringBitmap::new();
let mut script_language_docids = HashMap::new(); let mut del_script_language_docids = HashMap::new();
let mut add_script_language_docids = HashMap::new();
let mut docid_word_positions_sorter = create_sorter( let mut docid_word_positions_sorter = create_sorter(
grenad::SortAlgorithm::Stable, grenad::SortAlgorithm::Stable,
keep_latest_obkv, keep_latest_obkv,
@ -48,7 +54,142 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
max_memory, max_memory,
); );
let mut buffers = Buffers::default(); // initialize buffers.
let mut del_buffers = Buffers::default();
let mut add_buffers = Buffers::default();
let mut key_buffer = Vec::new();
let mut value_buffer = Vec::new();
// initialize tokenizer.
let mut builder = tokenizer_builder(stop_words, dictionary, allowed_separators, None);
let tokenizer = builder.build();
// iterate over documents.
let mut cursor = obkv_documents.into_cursor()?;
while let Some((key, value)) = cursor.move_on_next()? {
let document_id = key
.try_into()
.map(u32::from_be_bytes)
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
let obkv = KvReader::<FieldId>::new(value);
// if the searchable fields didn't change, skip the searchable indexing for this document.
if !searchable_fields_changed(&KvReader::<FieldId>::new(value), searchable_fields) {
continue;
}
documents_ids.push(document_id);
// Update key buffer prefix.
key_buffer.clear();
key_buffer.extend_from_slice(&document_id.to_be_bytes());
// Tokenize deletions and additions in 2 diffferent threads.
let (del, add): (Result<_>, Result<_>) = rayon::join(
|| {
// deletions
lang_safe_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
stop_words,
allowed_separators,
dictionary,
max_positions_per_attributes,
DelAdd::Deletion,
&mut del_buffers,
)
},
|| {
// additions
lang_safe_tokens_from_document(
&obkv,
searchable_fields,
&tokenizer,
stop_words,
allowed_separators,
dictionary,
max_positions_per_attributes,
DelAdd::Addition,
&mut add_buffers,
)
},
);
let (del_obkv, del_script_language_word_count) = del?;
let (add_obkv, add_script_language_word_count) = add?;
// merge deletions and additions.
value_buffer.clear();
del_add_from_two_obkvs(
KvReader::<FieldId>::new(del_obkv),
KvReader::<FieldId>::new(add_obkv),
&mut value_buffer,
)?;
// write them into the sorter.
let obkv = KvReader::<FieldId>::new(value);
for (field_id, value) in obkv.iter() {
key_buffer.truncate(mem::size_of::<u32>());
key_buffer.extend_from_slice(&field_id.to_be_bytes());
docid_word_positions_sorter.insert(&key_buffer, value)?;
}
// update script_language_docids deletions.
for (script, languages_frequency) in del_script_language_word_count {
for (language, _) in languages_frequency {
let entry = del_script_language_docids
.entry((script, language))
.or_insert_with(RoaringBitmap::new);
entry.push(document_id);
}
}
// update script_language_docids additions.
for (script, languages_frequency) in add_script_language_word_count {
for (language, _) in languages_frequency {
let entry = add_script_language_docids
.entry((script, language))
.or_insert_with(RoaringBitmap::new);
entry.push(document_id);
}
}
}
let script_language_docids = (del_script_language_docids, add_script_language_docids);
sorter_into_reader(docid_word_positions_sorter, indexer)
.map(|reader| (documents_ids, reader, script_language_docids))
}
/// Check if any searchable fields of a document changed.
fn searchable_fields_changed(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
) -> bool {
for (field_id, field_bytes) in obkv.iter() {
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
let del_add = KvReaderDelAdd::new(field_bytes);
match (del_add.get(DelAdd::Deletion), del_add.get(DelAdd::Addition)) {
// if both fields are None, check the next field.
(None, None) => (),
// if both contains a value and values are the same, check the next field.
(Some(del), Some(add)) if del == add => (),
// otherwise the fields are different, return true.
_otherwise => return true,
}
}
}
false
}
/// Factorize tokenizer building.
fn tokenizer_builder<'a>(
stop_words: Option<&'a fst::Set<&[u8]>>,
allowed_separators: Option<&'a [&str]>,
dictionary: Option<&'a [&str]>,
script_language: Option<&'a HashMap<Script, Vec<Language>>>,
) -> TokenizerBuilder<'a, &'a [u8]> {
let mut tokenizer_builder = TokenizerBuilder::new(); let mut tokenizer_builder = TokenizerBuilder::new();
if let Some(stop_words) = stop_words { if let Some(stop_words) = stop_words {
tokenizer_builder.stop_words(stop_words); tokenizer_builder.stop_words(stop_words);
@ -59,138 +200,144 @@ pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
if let Some(separators) = allowed_separators { if let Some(separators) = allowed_separators {
tokenizer_builder.separators(separators); tokenizer_builder.separators(separators);
} }
let tokenizer = tokenizer_builder.build();
let mut cursor = obkv_documents.into_cursor()?; if let Some(script_language) = script_language {
while let Some((key, value)) = cursor.move_on_next()? { tokenizer_builder.allow_list(&script_language);
let document_id = key }
.try_into()
.map(u32::from_be_bytes)
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
let obkv = KvReader::<FieldId>::new(value);
documents_ids.push(document_id); tokenizer_builder
buffers.key_buffer.clear(); }
buffers.key_buffer.extend_from_slice(&document_id.to_be_bytes());
let mut script_language_word_count = HashMap::new(); /// Extract words maped with their positions of a document,
/// ensuring no Language detection mistakes was made.
fn lang_safe_tokens_from_document<'a>(
obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>,
tokenizer: &Tokenizer,
stop_words: Option<&fst::Set<&[u8]>>,
allowed_separators: Option<&[&str]>,
dictionary: Option<&[&str]>,
max_positions_per_attributes: u32,
del_add: DelAdd,
buffers: &'a mut Buffers,
) -> Result<(&'a [u8], HashMap<Script, Vec<(Language, usize)>>)> {
let mut script_language_word_count = HashMap::new();
extract_tokens_from_document( tokens_from_document(
&obkv, &obkv,
searchable_fields, searchable_fields,
&tokenizer, &tokenizer,
max_positions_per_attributes, max_positions_per_attributes,
&mut buffers, del_add,
&mut script_language_word_count, buffers,
&mut docid_word_positions_sorter, &mut script_language_word_count,
)?; )?;
// if we detect a potetial mistake in the language detection, // if we detect a potetial mistake in the language detection,
// we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages. // we rerun the extraction forcing the tokenizer to detect the most frequently detected Languages.
// context: https://github.com/meilisearch/meilisearch/issues/3565 // context: https://github.com/meilisearch/meilisearch/issues/3565
if script_language_word_count if script_language_word_count
.values() .values()
.map(Vec::as_slice) .map(Vec::as_slice)
.any(potential_language_detection_error) .any(potential_language_detection_error)
{ {
// build an allow list with the most frequent detected languages in the document. // build an allow list with the most frequent detected languages in the document.
let script_language: HashMap<_, _> = let script_language: HashMap<_, _> =
script_language_word_count.iter().filter_map(most_frequent_languages).collect(); script_language_word_count.iter().filter_map(most_frequent_languages).collect();
// if the allow list is empty, meaning that no Language is considered frequent, // if the allow list is empty, meaning that no Language is considered frequent,
// then we don't rerun the extraction. // then we don't rerun the extraction.
if !script_language.is_empty() { if !script_language.is_empty() {
// build a new temporary tokenizer including the allow list. // build a new temporary tokenizer including the allow list.
let mut tokenizer_builder = TokenizerBuilder::new(); let mut builder = tokenizer_builder(
if let Some(stop_words) = stop_words { stop_words,
tokenizer_builder.stop_words(stop_words); dictionary,
} allowed_separators,
if let Some(dictionary) = dictionary { Some(&script_language),
tokenizer_builder.words_dict(dictionary); );
} let tokenizer = builder.build();
if let Some(separators) = allowed_separators {
tokenizer_builder.separators(separators);
}
tokenizer_builder.allow_list(&script_language);
let tokenizer = tokenizer_builder.build();
script_language_word_count.clear(); script_language_word_count.clear();
// rerun the extraction. // rerun the extraction.
extract_tokens_from_document( tokens_from_document(
&obkv, &obkv,
searchable_fields, searchable_fields,
&tokenizer, &tokenizer,
max_positions_per_attributes, max_positions_per_attributes,
&mut buffers, del_add,
&mut script_language_word_count, buffers,
&mut docid_word_positions_sorter, &mut script_language_word_count,
)?; )?;
}
}
for (script, languages_frequency) in script_language_word_count {
for (language, _) in languages_frequency {
let entry = script_language_docids
.entry((script, language))
.or_insert_with(RoaringBitmap::new);
entry.push(document_id);
}
} }
} }
sorter_into_reader(docid_word_positions_sorter, indexer) Ok((&buffers.obkv_buffer, script_language_word_count))
.map(|reader| (documents_ids, reader, script_language_docids))
} }
fn extract_tokens_from_document( /// Extract words maped with their positions of a document.
fn tokens_from_document<'a>(
obkv: &KvReader<FieldId>, obkv: &KvReader<FieldId>,
searchable_fields: &Option<HashSet<FieldId>>, searchable_fields: &Option<HashSet<FieldId>>,
tokenizer: &Tokenizer, tokenizer: &Tokenizer,
max_positions_per_attributes: u32, max_positions_per_attributes: u32,
buffers: &mut Buffers, del_add: DelAdd,
buffers: &'a mut Buffers,
script_language_word_count: &mut HashMap<Script, Vec<(Language, usize)>>, script_language_word_count: &mut HashMap<Script, Vec<(Language, usize)>>,
docid_word_positions_sorter: &mut grenad::Sorter<MergeFn>, ) -> Result<&'a [u8]> {
) -> Result<()> { buffers.obkv_buffer.clear();
let mut document_writer = KvWriterU16::new(&mut buffers.obkv_buffer);
for (field_id, field_bytes) in obkv.iter() { for (field_id, field_bytes) in obkv.iter() {
// if field is searchable.
if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) { if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
let value = serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?; // extract deletion or addition only.
buffers.field_buffer.clear(); if let Some(field_bytes) = KvReaderDelAdd::new(field_bytes).get(del_add) {
if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) { // parse json.
let tokens = process_tokens(tokenizer.tokenize(field)) let value =
.take_while(|(p, _)| (*p as u32) < max_positions_per_attributes); serde_json::from_slice(field_bytes).map_err(InternalError::SerdeJson)?;
buffers.obkv_buffer.clear(); // prepare writting destination.
let mut writer = KvWriterU16::new(&mut buffers.obkv_buffer); buffers.obkv_positions_buffer.clear();
for (index, token) in tokens { let mut writer = KvWriterU16::new(&mut buffers.obkv_positions_buffer);
// if a language has been detected for the token, we update the counter.
if let Some(language) = token.language { // convert json into an unique string.
let script = token.script; buffers.field_buffer.clear();
let entry = if let Some(field) = json_to_string(&value, &mut buffers.field_buffer) {
script_language_word_count.entry(script).or_insert_with(Vec::new); // create an iterator of token with their positions.
match entry.iter_mut().find(|(l, _)| *l == language) { let tokens = process_tokens(tokenizer.tokenize(field))
Some((_, n)) => *n += 1, .take_while(|(p, _)| (*p as u32) < max_positions_per_attributes);
None => entry.push((language, 1)),
for (index, token) in tokens {
// if a language has been detected for the token, we update the counter.
if let Some(language) = token.language {
let script = token.script;
let entry =
script_language_word_count.entry(script).or_insert_with(Vec::new);
match entry.iter_mut().find(|(l, _)| *l == language) {
Some((_, n)) => *n += 1,
None => entry.push((language, 1)),
}
}
// keep a word only if it is not empty and fit in a LMDB key.
let token = token.lemma().trim();
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
let position: u16 = index
.try_into()
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
writer.insert(position, token.as_bytes())?;
} }
} }
let token = token.lemma().trim();
if !token.is_empty() && token.len() <= MAX_WORD_LENGTH {
let position: u16 = index
.try_into()
.map_err(|_| SerializationError::InvalidNumberSerialization)?;
writer.insert(position, token.as_bytes())?;
}
}
let positions = writer.into_inner()?; // write positions into document.
buffers.key_buffer.truncate(mem::size_of::<u32>()); let positions = writer.into_inner()?;
buffers.key_buffer.extend_from_slice(&field_id.to_be_bytes()); document_writer.insert(field_id, positions)?;
docid_word_positions_sorter.insert(&buffers.key_buffer, positions)?; }
} }
} }
} }
Ok(()) Ok(document_writer.into_inner().map(|v| v.as_slice())?)
} }
/// Transform a JSON value into a string that can be indexed. /// Transform a JSON value into a string that can be indexed.
@ -293,12 +440,10 @@ fn compute_language_frequency_threshold(languages_frequency: &[(Language, usize)
#[derive(Default)] #[derive(Default)]
struct Buffers { struct Buffers {
// the key buffer is the concatenation of the internal document id with the field id.
// The buffer has to be completelly cleared between documents,
// and the field id part must be cleared between each field.
key_buffer: Vec<u8>,
// the field buffer for each fields desserialization, and must be cleared between each field. // the field buffer for each fields desserialization, and must be cleared between each field.
field_buffer: String, field_buffer: String,
// buffer used to store the value data containing an obkv. // buffer used to store the value data containing an obkv.
obkv_buffer: Vec<u8>, obkv_buffer: Vec<u8>,
// buffer used to store the value data containing an obkv of tokens with their positions.
obkv_positions_buffer: Vec<u8>,
} }

View File

@ -117,8 +117,9 @@ pub fn merge_two_del_add_obkvs(
let update_reader = KvReaderDelAdd::new(update); let update_reader = KvReaderDelAdd::new(update);
// keep newest deletion. // keep newest deletion.
if let Some(deletion) = if let Some(deletion) = update_reader
update_reader.get(DelAdd::Deletion).or(base_reader.get(DelAdd::Deletion)) .get(DelAdd::Deletion)
.or_else(|| base_reader.get(DelAdd::Deletion))
{ {
value_writer.insert(DelAdd::Deletion, deletion).unwrap(); value_writer.insert(DelAdd::Deletion, deletion).unwrap();
} }
@ -127,6 +128,7 @@ pub fn merge_two_del_add_obkvs(
let base_addition = let base_addition =
merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten(); merge_additions.then(|| base_reader.get(DelAdd::Addition)).flatten();
// keep newest addition. // keep newest addition.
// TODO use or_else
if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) { if let Some(addition) = update_reader.get(DelAdd::Addition).or(base_addition) {
value_writer.insert(DelAdd::Addition, addition).unwrap(); value_writer.insert(DelAdd::Addition, addition).unwrap();
} }

View File

@ -805,7 +805,7 @@ impl<'a, 'i> Transform<'a, 'i> {
let buffer = obkv_writer.into_inner()?; let buffer = obkv_writer.into_inner()?;
document_sorter_buffer.clear(); document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(buffer), true, true, &mut document_sorter_buffer)?; into_del_add_obkv(KvReaderU16::new(buffer), false, true, &mut document_sorter_buffer)?;
original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; original_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
// Once we have the document. We're going to flatten it // Once we have the document. We're going to flatten it
@ -842,7 +842,7 @@ impl<'a, 'i> Transform<'a, 'i> {
writer.insert(fid, &value)?; writer.insert(fid, &value)?;
} }
document_sorter_buffer.clear(); document_sorter_buffer.clear();
into_del_add_obkv(KvReaderU16::new(&buffer), true, true, &mut document_sorter_buffer)?; into_del_add_obkv(KvReaderU16::new(&buffer), false, true, &mut document_sorter_buffer)?;
flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?; flattened_writer.insert(docid.to_be_bytes(), &document_sorter_buffer)?;
} }

View File

@ -43,7 +43,9 @@ pub(crate) enum TypedChunk {
FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>), FieldIdFacetIsEmptyDocids(grenad::Reader<BufReader<File>>),
GeoPoints(grenad::Reader<BufReader<File>>), GeoPoints(grenad::Reader<BufReader<File>>),
VectorPoints(grenad::Reader<BufReader<File>>), VectorPoints(grenad::Reader<BufReader<File>>),
ScriptLanguageDocids(HashMap<(Script, Language), RoaringBitmap>), ScriptLanguageDocids(
(HashMap<(Script, Language), RoaringBitmap>, HashMap<(Script, Language), RoaringBitmap>),
),
} }
impl TypedChunk { impl TypedChunk {
@ -101,8 +103,8 @@ impl TypedChunk {
TypedChunk::VectorPoints(grenad) => { TypedChunk::VectorPoints(grenad) => {
format!("VectorPoints {{ number_of_entries: {} }}", grenad.len()) format!("VectorPoints {{ number_of_entries: {} }}", grenad.len())
} }
TypedChunk::ScriptLanguageDocids(grenad) => { TypedChunk::ScriptLanguageDocids((_, addition)) => {
format!("ScriptLanguageDocids {{ number_of_entries: {} }}", grenad.len()) format!("ScriptLanguageDocids {{ number_of_entries: {} }}", addition.len())
} }
} }
} }
@ -344,19 +346,21 @@ pub(crate) fn write_typed_chunk_into_index(
log::debug!("There are {} entries in the HNSW so far", hnsw_length); log::debug!("There are {} entries in the HNSW so far", hnsw_length);
index.put_vector_hnsw(wtxn, &new_hnsw)?; index.put_vector_hnsw(wtxn, &new_hnsw)?;
} }
TypedChunk::ScriptLanguageDocids(hash_pair) => { TypedChunk::ScriptLanguageDocids((deletion, addition)) => {
let mut buffer = Vec::new(); for (key, value) in deletion {
for (key, value) in hash_pair { if let Some(mut db_values) = index.script_language_docids.get(wtxn, &key)? {
buffer.clear(); db_values -= value;
let final_value = match index.script_language_docids.get(wtxn, &key)? { if db_values.is_empty() {
Some(db_values) => { index.script_language_docids.delete(wtxn, &key)?;
let mut db_value_buffer = Vec::new(); } else {
serialize_roaring_bitmap(&db_values, &mut db_value_buffer)?; index.script_language_docids.put(wtxn, &key, &db_values)?;
let mut new_value_buffer = Vec::new();
serialize_roaring_bitmap(&value, &mut new_value_buffer)?;
merge_roaring_bitmaps(&new_value_buffer, &db_value_buffer, &mut buffer)?;
RoaringBitmap::deserialize_from(&buffer[..])?
} }
}
}
for (key, value) in addition {
let final_value = match index.script_language_docids.get(wtxn, &key)? {
Some(mut db_values) => db_values | value,
None => value, None => value,
}; };
index.script_language_docids.put(wtxn, &key, &final_value)?; index.script_language_docids.put(wtxn, &key, &final_value)?;