mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 21:04:27 +01:00
Merge #4680
4680: Speedup additional searchables r=Kerollmops a=ManyTheFish Fixes #4492. ## To Do - [x] Do not call the `InnerSettingsDiff::only_additional_fields` function too many times Co-authored-by: Clément Renault <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com>
This commit is contained in:
commit
40b2345394
@ -40,11 +40,26 @@ pub fn into_del_add_obkv<K: obkv::Key + PartialOrd>(
|
|||||||
operation: DelAddOperation,
|
operation: DelAddOperation,
|
||||||
buffer: &mut Vec<u8>,
|
buffer: &mut Vec<u8>,
|
||||||
) -> Result<(), std::io::Error> {
|
) -> Result<(), std::io::Error> {
|
||||||
|
into_del_add_obkv_conditional_operation(reader, buffer, |_| operation)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Akin to the [into_del_add_obkv] function but lets you
|
||||||
|
/// conditionally define the `DelAdd` variant based on the obkv key.
|
||||||
|
pub fn into_del_add_obkv_conditional_operation<K, F>(
|
||||||
|
reader: obkv::KvReader<K>,
|
||||||
|
buffer: &mut Vec<u8>,
|
||||||
|
operation: F,
|
||||||
|
) -> std::io::Result<()>
|
||||||
|
where
|
||||||
|
K: obkv::Key + PartialOrd,
|
||||||
|
F: Fn(K) -> DelAddOperation,
|
||||||
|
{
|
||||||
let mut writer = obkv::KvWriter::new(buffer);
|
let mut writer = obkv::KvWriter::new(buffer);
|
||||||
let mut value_buffer = Vec::new();
|
let mut value_buffer = Vec::new();
|
||||||
for (key, value) in reader.iter() {
|
for (key, value) in reader.iter() {
|
||||||
value_buffer.clear();
|
value_buffer.clear();
|
||||||
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
let mut value_writer = KvWriterDelAdd::new(&mut value_buffer);
|
||||||
|
let operation = operation(key);
|
||||||
if matches!(operation, DelAddOperation::Deletion | DelAddOperation::DeletionAndAddition) {
|
if matches!(operation, DelAddOperation::Deletion | DelAddOperation::DeletionAndAddition) {
|
||||||
value_writer.insert(DelAdd::Deletion, value)?;
|
value_writer.insert(DelAdd::Deletion, value)?;
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
use std::collections::BTreeMap;
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, BufReader};
|
use std::io::{self, BufReader};
|
||||||
@ -9,7 +9,7 @@ use std::result::Result as StdResult;
|
|||||||
use bytemuck::bytes_of;
|
use bytemuck::bytes_of;
|
||||||
use grenad::Sorter;
|
use grenad::Sorter;
|
||||||
use heed::BytesEncode;
|
use heed::BytesEncode;
|
||||||
use itertools::EitherOrBoth;
|
use itertools::{merge_join_by, EitherOrBoth};
|
||||||
use ordered_float::OrderedFloat;
|
use ordered_float::OrderedFloat;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
use serde_json::{from_slice, Value};
|
use serde_json::{from_slice, Value};
|
||||||
@ -18,7 +18,7 @@ use FilterableValues::{Empty, Null, Values};
|
|||||||
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
use super::helpers::{create_sorter, keep_first, sorter_into_reader, GrenadParameters};
|
||||||
use crate::error::InternalError;
|
use crate::error::InternalError;
|
||||||
use crate::facet::value_encoding::f64_into_bytes;
|
use crate::facet::value_encoding::f64_into_bytes;
|
||||||
use crate::update::del_add::{DelAdd, KvWriterDelAdd};
|
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
|
||||||
use crate::update::index_documents::{create_writer, writer_into_reader};
|
use crate::update::index_documents::{create_writer, writer_into_reader};
|
||||||
use crate::update::settings::InnerIndexSettingsDiff;
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH};
|
use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH};
|
||||||
@ -75,149 +75,181 @@ pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
|
|||||||
let mut numbers_key_buffer = Vec::new();
|
let mut numbers_key_buffer = Vec::new();
|
||||||
let mut strings_key_buffer = Vec::new();
|
let mut strings_key_buffer = Vec::new();
|
||||||
|
|
||||||
let mut cursor = obkv_documents.into_cursor()?;
|
let old_faceted_fids: BTreeSet<_> =
|
||||||
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
settings_diff.old.faceted_fields_ids.iter().copied().collect();
|
||||||
let obkv = obkv::KvReader::new(value);
|
let new_faceted_fids: BTreeSet<_> =
|
||||||
|
settings_diff.new.faceted_fields_ids.iter().copied().collect();
|
||||||
|
|
||||||
for (field_id, field_bytes) in obkv.iter() {
|
if !settings_diff.settings_update_only || old_faceted_fids != new_faceted_fids {
|
||||||
let delete_faceted = settings_diff.old.faceted_fields_ids.contains(&field_id);
|
let mut cursor = obkv_documents.into_cursor()?;
|
||||||
let add_faceted = settings_diff.new.faceted_fields_ids.contains(&field_id);
|
while let Some((docid_bytes, value)) = cursor.move_on_next()? {
|
||||||
if delete_faceted || add_faceted {
|
let obkv = obkv::KvReader::new(value);
|
||||||
numbers_key_buffer.clear();
|
let get_document_json_value = move |field_id, side| {
|
||||||
strings_key_buffer.clear();
|
obkv.get(field_id)
|
||||||
|
.map(KvReaderDelAdd::new)
|
||||||
|
.and_then(|kv| kv.get(side))
|
||||||
|
.map(from_slice)
|
||||||
|
.transpose()
|
||||||
|
.map_err(InternalError::SerdeJson)
|
||||||
|
};
|
||||||
|
// iterate over the faceted fields instead of over the whole document.
|
||||||
|
for eob in
|
||||||
|
merge_join_by(old_faceted_fids.iter(), new_faceted_fids.iter(), |old, new| {
|
||||||
|
old.cmp(new)
|
||||||
|
})
|
||||||
|
{
|
||||||
|
let (field_id, del_value, add_value) = match eob {
|
||||||
|
EitherOrBoth::Left(&field_id) => {
|
||||||
|
let del_value = get_document_json_value(field_id, DelAdd::Deletion)?;
|
||||||
|
|
||||||
// Set key to the field_id
|
// deletion only
|
||||||
// Note: this encoding is consistent with FieldIdCodec
|
(field_id, del_value, None)
|
||||||
numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
}
|
||||||
strings_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
EitherOrBoth::Right(&field_id) => {
|
||||||
|
let add_value = get_document_json_value(field_id, DelAdd::Addition)?;
|
||||||
|
|
||||||
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
// addition only
|
||||||
let document = DocumentId::from_be_bytes(document);
|
(field_id, None, add_value)
|
||||||
|
}
|
||||||
|
EitherOrBoth::Both(&field_id, _) => {
|
||||||
|
// during settings update, recompute the changing settings only.
|
||||||
|
if settings_diff.settings_update_only {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
let del_value = get_document_json_value(field_id, DelAdd::Deletion)?;
|
||||||
numbers_key_buffer.extend_from_slice(docid_bytes);
|
let add_value = get_document_json_value(field_id, DelAdd::Addition)?;
|
||||||
strings_key_buffer.extend_from_slice(docid_bytes);
|
|
||||||
|
|
||||||
let del_add_obkv = obkv::KvReader::new(field_bytes);
|
(field_id, del_value, add_value)
|
||||||
let del_value = match del_add_obkv.get(DelAdd::Deletion).filter(|_| delete_faceted)
|
}
|
||||||
{
|
|
||||||
Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
let add_value = match del_add_obkv.get(DelAdd::Addition).filter(|_| add_faceted) {
|
|
||||||
Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
|
|
||||||
None => None,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// We insert the document id on the Del and the Add side if the field exists.
|
if del_value.is_some() || add_value.is_some() {
|
||||||
let (ref mut del_exists, ref mut add_exists) =
|
numbers_key_buffer.clear();
|
||||||
facet_exists_docids.entry(field_id).or_default();
|
strings_key_buffer.clear();
|
||||||
let (ref mut del_is_null, ref mut add_is_null) =
|
|
||||||
facet_is_null_docids.entry(field_id).or_default();
|
|
||||||
let (ref mut del_is_empty, ref mut add_is_empty) =
|
|
||||||
facet_is_empty_docids.entry(field_id).or_default();
|
|
||||||
|
|
||||||
if del_value.is_some() {
|
// Set key to the field_id
|
||||||
del_exists.insert(document);
|
// Note: this encoding is consistent with FieldIdCodec
|
||||||
}
|
numbers_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||||
if add_value.is_some() {
|
strings_key_buffer.extend_from_slice(&field_id.to_be_bytes());
|
||||||
add_exists.insert(document);
|
|
||||||
}
|
|
||||||
|
|
||||||
let del_geo_support = settings_diff
|
let document: [u8; 4] = docid_bytes[..4].try_into().ok().unwrap();
|
||||||
.old
|
let document = DocumentId::from_be_bytes(document);
|
||||||
.geo_fields_ids
|
|
||||||
.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
|
||||||
let add_geo_support = settings_diff
|
|
||||||
.new
|
|
||||||
.geo_fields_ids
|
|
||||||
.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
|
||||||
let del_filterable_values =
|
|
||||||
del_value.map(|value| extract_facet_values(&value, del_geo_support));
|
|
||||||
let add_filterable_values =
|
|
||||||
add_value.map(|value| extract_facet_values(&value, add_geo_support));
|
|
||||||
|
|
||||||
// Those closures are just here to simplify things a bit.
|
// For the other extraction tasks, prefix the key with the field_id and the document_id
|
||||||
let mut insert_numbers_diff = |del_numbers, add_numbers| {
|
numbers_key_buffer.extend_from_slice(docid_bytes);
|
||||||
insert_numbers_diff(
|
strings_key_buffer.extend_from_slice(docid_bytes);
|
||||||
&mut fid_docid_facet_numbers_sorter,
|
|
||||||
&mut numbers_key_buffer,
|
|
||||||
del_numbers,
|
|
||||||
add_numbers,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
let mut insert_strings_diff = |del_strings, add_strings| {
|
|
||||||
insert_strings_diff(
|
|
||||||
&mut fid_docid_facet_strings_sorter,
|
|
||||||
&mut strings_key_buffer,
|
|
||||||
del_strings,
|
|
||||||
add_strings,
|
|
||||||
)
|
|
||||||
};
|
|
||||||
|
|
||||||
match (del_filterable_values, add_filterable_values) {
|
// We insert the document id on the Del and the Add side if the field exists.
|
||||||
(None, None) => (),
|
let (ref mut del_exists, ref mut add_exists) =
|
||||||
(Some(del_filterable_values), None) => match del_filterable_values {
|
facet_exists_docids.entry(field_id).or_default();
|
||||||
Null => {
|
let (ref mut del_is_null, ref mut add_is_null) =
|
||||||
del_is_null.insert(document);
|
facet_is_null_docids.entry(field_id).or_default();
|
||||||
}
|
let (ref mut del_is_empty, ref mut add_is_empty) =
|
||||||
Empty => {
|
facet_is_empty_docids.entry(field_id).or_default();
|
||||||
del_is_empty.insert(document);
|
|
||||||
}
|
if del_value.is_some() {
|
||||||
Values { numbers, strings } => {
|
del_exists.insert(document);
|
||||||
insert_numbers_diff(numbers, vec![])?;
|
}
|
||||||
insert_strings_diff(strings, vec![])?;
|
if add_value.is_some() {
|
||||||
}
|
add_exists.insert(document);
|
||||||
},
|
}
|
||||||
(None, Some(add_filterable_values)) => match add_filterable_values {
|
|
||||||
Null => {
|
let del_geo_support = settings_diff
|
||||||
add_is_null.insert(document);
|
.old
|
||||||
}
|
.geo_fields_ids
|
||||||
Empty => {
|
.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
||||||
add_is_empty.insert(document);
|
let add_geo_support = settings_diff
|
||||||
}
|
.new
|
||||||
Values { numbers, strings } => {
|
.geo_fields_ids
|
||||||
insert_numbers_diff(vec![], numbers)?;
|
.map_or(false, |(lat, lng)| field_id == lat || field_id == lng);
|
||||||
insert_strings_diff(vec![], strings)?;
|
let del_filterable_values =
|
||||||
}
|
del_value.map(|value| extract_facet_values(&value, del_geo_support));
|
||||||
},
|
let add_filterable_values =
|
||||||
(Some(del_filterable_values), Some(add_filterable_values)) => {
|
add_value.map(|value| extract_facet_values(&value, add_geo_support));
|
||||||
match (del_filterable_values, add_filterable_values) {
|
|
||||||
(Null, Null) | (Empty, Empty) => (),
|
// Those closures are just here to simplify things a bit.
|
||||||
(Null, Empty) => {
|
let mut insert_numbers_diff = |del_numbers, add_numbers| {
|
||||||
del_is_null.insert(document);
|
insert_numbers_diff(
|
||||||
add_is_empty.insert(document);
|
&mut fid_docid_facet_numbers_sorter,
|
||||||
}
|
&mut numbers_key_buffer,
|
||||||
(Empty, Null) => {
|
del_numbers,
|
||||||
del_is_empty.insert(document);
|
add_numbers,
|
||||||
add_is_null.insert(document);
|
)
|
||||||
}
|
};
|
||||||
(Null, Values { numbers, strings }) => {
|
let mut insert_strings_diff = |del_strings, add_strings| {
|
||||||
insert_numbers_diff(vec![], numbers)?;
|
insert_strings_diff(
|
||||||
insert_strings_diff(vec![], strings)?;
|
&mut fid_docid_facet_strings_sorter,
|
||||||
|
&mut strings_key_buffer,
|
||||||
|
del_strings,
|
||||||
|
add_strings,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
match (del_filterable_values, add_filterable_values) {
|
||||||
|
(None, None) => (),
|
||||||
|
(Some(del_filterable_values), None) => match del_filterable_values {
|
||||||
|
Null => {
|
||||||
del_is_null.insert(document);
|
del_is_null.insert(document);
|
||||||
}
|
}
|
||||||
(Empty, Values { numbers, strings }) => {
|
Empty => {
|
||||||
insert_numbers_diff(vec![], numbers)?;
|
|
||||||
insert_strings_diff(vec![], strings)?;
|
|
||||||
del_is_empty.insert(document);
|
del_is_empty.insert(document);
|
||||||
}
|
}
|
||||||
(Values { numbers, strings }, Null) => {
|
Values { numbers, strings } => {
|
||||||
add_is_null.insert(document);
|
|
||||||
insert_numbers_diff(numbers, vec![])?;
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
insert_strings_diff(strings, vec![])?;
|
insert_strings_diff(strings, vec![])?;
|
||||||
}
|
}
|
||||||
(Values { numbers, strings }, Empty) => {
|
},
|
||||||
add_is_empty.insert(document);
|
(None, Some(add_filterable_values)) => match add_filterable_values {
|
||||||
insert_numbers_diff(numbers, vec![])?;
|
Null => {
|
||||||
insert_strings_diff(strings, vec![])?;
|
add_is_null.insert(document);
|
||||||
}
|
}
|
||||||
(
|
Empty => {
|
||||||
Values { numbers: del_numbers, strings: del_strings },
|
add_is_empty.insert(document);
|
||||||
Values { numbers: add_numbers, strings: add_strings },
|
}
|
||||||
) => {
|
Values { numbers, strings } => {
|
||||||
insert_numbers_diff(del_numbers, add_numbers)?;
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
insert_strings_diff(del_strings, add_strings)?;
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
(Some(del_filterable_values), Some(add_filterable_values)) => {
|
||||||
|
match (del_filterable_values, add_filterable_values) {
|
||||||
|
(Null, Null) | (Empty, Empty) => (),
|
||||||
|
(Null, Empty) => {
|
||||||
|
del_is_null.insert(document);
|
||||||
|
add_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
(Empty, Null) => {
|
||||||
|
del_is_empty.insert(document);
|
||||||
|
add_is_null.insert(document);
|
||||||
|
}
|
||||||
|
(Null, Values { numbers, strings }) => {
|
||||||
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
del_is_null.insert(document);
|
||||||
|
}
|
||||||
|
(Empty, Values { numbers, strings }) => {
|
||||||
|
insert_numbers_diff(vec![], numbers)?;
|
||||||
|
insert_strings_diff(vec![], strings)?;
|
||||||
|
del_is_empty.insert(document);
|
||||||
|
}
|
||||||
|
(Values { numbers, strings }, Null) => {
|
||||||
|
add_is_null.insert(document);
|
||||||
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
|
insert_strings_diff(strings, vec![])?;
|
||||||
|
}
|
||||||
|
(Values { numbers, strings }, Empty) => {
|
||||||
|
add_is_empty.insert(document);
|
||||||
|
insert_numbers_diff(numbers, vec![])?;
|
||||||
|
insert_strings_diff(strings, vec![])?;
|
||||||
|
}
|
||||||
|
(
|
||||||
|
Values { numbers: del_numbers, strings: del_strings },
|
||||||
|
Values { numbers: add_numbers, strings: add_strings },
|
||||||
|
) => {
|
||||||
|
insert_numbers_diff(del_numbers, add_numbers)?;
|
||||||
|
insert_strings_diff(del_strings, add_strings)?;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,11 +26,8 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
indexer: GrenadParameters,
|
indexer: GrenadParameters,
|
||||||
settings_diff: &InnerIndexSettingsDiff,
|
settings_diff: &InnerIndexSettingsDiff,
|
||||||
) -> Result<grenad::Reader<BufReader<File>>> {
|
) -> Result<grenad::Reader<BufReader<File>>> {
|
||||||
let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord;
|
|
||||||
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
|
|
||||||
|
|
||||||
// early return if the data shouldn't be deleted nor created.
|
// early return if the data shouldn't be deleted nor created.
|
||||||
if !any_deletion && !any_addition {
|
if settings_diff.settings_update_only && !settings_diff.reindex_proximities() {
|
||||||
let writer = create_writer(
|
let writer = create_writer(
|
||||||
indexer.chunk_compression_type,
|
indexer.chunk_compression_type,
|
||||||
indexer.chunk_compression_level,
|
indexer.chunk_compression_level,
|
||||||
@ -39,8 +36,10 @@ pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
|
|||||||
return writer_into_reader(writer);
|
return writer_into_reader(writer);
|
||||||
}
|
}
|
||||||
|
|
||||||
let max_memory = indexer.max_memory_by_thread();
|
let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord;
|
||||||
|
let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
|
||||||
|
|
||||||
|
let max_memory = indexer.max_memory_by_thread();
|
||||||
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
|
let mut word_pair_proximity_docids_sorters: Vec<_> = (1..MAX_DISTANCE)
|
||||||
.map(|_| {
|
.map(|_| {
|
||||||
create_sorter(
|
create_sorter(
|
||||||
|
@ -369,6 +369,7 @@ where
|
|||||||
|
|
||||||
// Run extraction pipeline in parallel.
|
// Run extraction pipeline in parallel.
|
||||||
pool.install(|| {
|
pool.install(|| {
|
||||||
|
let settings_diff_cloned = settings_diff.clone();
|
||||||
rayon::spawn(move || {
|
rayon::spawn(move || {
|
||||||
let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks");
|
let child_span = tracing::trace_span!(target: "indexing::details", parent: ¤t_span, "extract_and_send_grenad_chunks");
|
||||||
let _enter = child_span.enter();
|
let _enter = child_span.enter();
|
||||||
@ -398,7 +399,7 @@ where
|
|||||||
pool_params,
|
pool_params,
|
||||||
lmdb_writer_sx.clone(),
|
lmdb_writer_sx.clone(),
|
||||||
primary_key_id,
|
primary_key_id,
|
||||||
settings_diff.clone(),
|
settings_diff_cloned,
|
||||||
max_positions_per_attributes,
|
max_positions_per_attributes,
|
||||||
)
|
)
|
||||||
});
|
});
|
||||||
@ -425,7 +426,7 @@ where
|
|||||||
Err(status) => {
|
Err(status) => {
|
||||||
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
|
if let Some(typed_chunks) = chunk_accumulator.pop_longest() {
|
||||||
let (docids, is_merged_database) =
|
let (docids, is_merged_database) =
|
||||||
write_typed_chunk_into_index(typed_chunks, self.index, self.wtxn)?;
|
write_typed_chunk_into_index(self.wtxn, self.index, &settings_diff, typed_chunks)?;
|
||||||
if !docids.is_empty() {
|
if !docids.is_empty() {
|
||||||
final_documents_ids |= docids;
|
final_documents_ids |= docids;
|
||||||
let documents_seen_count = final_documents_ids.len();
|
let documents_seen_count = final_documents_ids.len();
|
||||||
|
@ -20,7 +20,10 @@ use super::{IndexDocumentsMethod, IndexerConfig};
|
|||||||
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
|
||||||
use crate::error::{Error, InternalError, UserError};
|
use crate::error::{Error, InternalError, UserError};
|
||||||
use crate::index::{db_name, main_key};
|
use crate::index::{db_name, main_key};
|
||||||
use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
|
use crate::update::del_add::{
|
||||||
|
into_del_add_obkv, into_del_add_obkv_conditional_operation, DelAdd, DelAddOperation,
|
||||||
|
KvReaderDelAdd,
|
||||||
|
};
|
||||||
use crate::update::index_documents::GrenadParameters;
|
use crate::update::index_documents::GrenadParameters;
|
||||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
||||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
||||||
@ -805,13 +808,15 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let mut new_inner_settings = old_inner_settings.clone();
|
let mut new_inner_settings = old_inner_settings.clone();
|
||||||
new_inner_settings.fields_ids_map = fields_ids_map;
|
new_inner_settings.fields_ids_map = fields_ids_map;
|
||||||
|
|
||||||
let settings_diff = InnerIndexSettingsDiff {
|
let embedding_configs_updated = false;
|
||||||
old: old_inner_settings,
|
let settings_update_only = false;
|
||||||
new: new_inner_settings,
|
let settings_diff = InnerIndexSettingsDiff::new(
|
||||||
|
old_inner_settings,
|
||||||
|
new_inner_settings,
|
||||||
primary_key_id,
|
primary_key_id,
|
||||||
embedding_configs_updated: false,
|
embedding_configs_updated,
|
||||||
settings_update_only: false,
|
settings_update_only,
|
||||||
};
|
);
|
||||||
|
|
||||||
Ok(TransformOutput {
|
Ok(TransformOutput {
|
||||||
primary_key,
|
primary_key,
|
||||||
@ -840,14 +845,6 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// Always keep the primary key.
|
// Always keep the primary key.
|
||||||
let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
|
let is_primary_key = |id: FieldId| -> bool { settings_diff.primary_key_id == Some(id) };
|
||||||
|
|
||||||
// If only the `searchableAttributes` has been changed, keep only the searchable fields.
|
|
||||||
let must_reindex_searchables = settings_diff.reindex_searchable();
|
|
||||||
let necessary_searchable_field = |id: FieldId| -> bool {
|
|
||||||
must_reindex_searchables
|
|
||||||
&& (settings_diff.old.searchable_fields_ids.contains(&id)
|
|
||||||
|| settings_diff.new.searchable_fields_ids.contains(&id))
|
|
||||||
};
|
|
||||||
|
|
||||||
// If only a faceted field has been added, keep only this field.
|
// If only a faceted field has been added, keep only this field.
|
||||||
let must_reindex_facets = settings_diff.reindex_facets();
|
let must_reindex_facets = settings_diff.reindex_facets();
|
||||||
let necessary_faceted_field = |id: FieldId| -> bool {
|
let necessary_faceted_field = |id: FieldId| -> bool {
|
||||||
@ -862,13 +859,16 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
// we need the fields for the prompt/templating.
|
// we need the fields for the prompt/templating.
|
||||||
let reindex_vectors = settings_diff.reindex_vectors();
|
let reindex_vectors = settings_diff.reindex_vectors();
|
||||||
|
|
||||||
|
// The operations that we must perform on the different fields.
|
||||||
|
let mut operations = HashMap::new();
|
||||||
|
|
||||||
let mut obkv_writer = KvWriter::<_, FieldId>::memory();
|
let mut obkv_writer = KvWriter::<_, FieldId>::memory();
|
||||||
for (id, val) in old_obkv.iter() {
|
for (id, val) in old_obkv.iter() {
|
||||||
if is_primary_key(id)
|
if is_primary_key(id) || necessary_faceted_field(id) || reindex_vectors {
|
||||||
|| necessary_searchable_field(id)
|
operations.insert(id, DelAddOperation::DeletionAndAddition);
|
||||||
|| necessary_faceted_field(id)
|
obkv_writer.insert(id, val)?;
|
||||||
|| reindex_vectors
|
} else if let Some(operation) = settings_diff.reindex_searchable_id(id) {
|
||||||
{
|
operations.insert(id, operation);
|
||||||
obkv_writer.insert(id, val)?;
|
obkv_writer.insert(id, val)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -887,11 +887,9 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
let flattened = flattened.as_deref().map_or(obkv, KvReader::new);
|
let flattened = flattened.as_deref().map_or(obkv, KvReader::new);
|
||||||
|
|
||||||
flattened_obkv_buffer.clear();
|
flattened_obkv_buffer.clear();
|
||||||
into_del_add_obkv(
|
into_del_add_obkv_conditional_operation(flattened, flattened_obkv_buffer, |id| {
|
||||||
flattened,
|
operations.get(&id).copied().unwrap_or(DelAddOperation::DeletionAndAddition)
|
||||||
DelAddOperation::DeletionAndAddition,
|
})?;
|
||||||
flattened_obkv_buffer,
|
|
||||||
)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
@ -901,6 +899,11 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
/// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
|
/// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
|
||||||
///
|
///
|
||||||
// TODO this can be done in parallel by using the rayon `ThreadPool`.
|
// TODO this can be done in parallel by using the rayon `ThreadPool`.
|
||||||
|
#[tracing::instrument(
|
||||||
|
level = "trace"
|
||||||
|
skip(self, wtxn, settings_diff),
|
||||||
|
target = "indexing::documents"
|
||||||
|
)]
|
||||||
pub fn prepare_for_documents_reindexing(
|
pub fn prepare_for_documents_reindexing(
|
||||||
self,
|
self,
|
||||||
wtxn: &mut heed::RwTxn<'i>,
|
wtxn: &mut heed::RwTxn<'i>,
|
||||||
|
@ -7,7 +7,7 @@ use bytemuck::allocation::pod_collect_to_vec;
|
|||||||
use charabia::{Language, Script};
|
use charabia::{Language, Script};
|
||||||
use grenad::{Merger, MergerBuilder};
|
use grenad::{Merger, MergerBuilder};
|
||||||
use heed::types::Bytes;
|
use heed::types::Bytes;
|
||||||
use heed::RwTxn;
|
use heed::{BytesDecode, RwTxn};
|
||||||
use obkv::{KvReader, KvWriter};
|
use obkv::{KvReader, KvWriter};
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
@ -20,13 +20,16 @@ use super::MergeFn;
|
|||||||
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
|
use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
|
||||||
use crate::facet::FacetType;
|
use crate::facet::FacetType;
|
||||||
use crate::index::db_name::DOCUMENTS;
|
use crate::index::db_name::DOCUMENTS;
|
||||||
|
use crate::proximity::MAX_DISTANCE;
|
||||||
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
|
use crate::update::del_add::{deladd_serialize_add_side, DelAdd, KvReaderDelAdd};
|
||||||
use crate::update::facet::FacetsUpdate;
|
use crate::update::facet::FacetsUpdate;
|
||||||
use crate::update::index_documents::helpers::{
|
use crate::update::index_documents::helpers::{
|
||||||
as_cloneable_grenad, keep_latest_obkv, try_split_array_at,
|
as_cloneable_grenad, keep_latest_obkv, try_split_array_at,
|
||||||
};
|
};
|
||||||
|
use crate::update::settings::InnerIndexSettingsDiff;
|
||||||
use crate::{
|
use crate::{
|
||||||
lat_lng_to_xyz, DocumentId, FieldId, GeoPoint, Index, InternalError, Result, SerializationError,
|
lat_lng_to_xyz, CboRoaringBitmapCodec, DocumentId, FieldId, GeoPoint, Index, InternalError,
|
||||||
|
Result, SerializationError, U8StrStrCodec,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// This struct accumulates and group the TypedChunks
|
/// This struct accumulates and group the TypedChunks
|
||||||
@ -122,9 +125,10 @@ impl TypedChunk {
|
|||||||
/// Return new documents seen.
|
/// Return new documents seen.
|
||||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
|
||||||
pub(crate) fn write_typed_chunk_into_index(
|
pub(crate) fn write_typed_chunk_into_index(
|
||||||
typed_chunks: Vec<TypedChunk>,
|
|
||||||
index: &Index,
|
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
|
index: &Index,
|
||||||
|
settings_diff: &InnerIndexSettingsDiff,
|
||||||
|
typed_chunks: Vec<TypedChunk>,
|
||||||
) -> Result<(RoaringBitmap, bool)> {
|
) -> Result<(RoaringBitmap, bool)> {
|
||||||
let mut is_merged_database = false;
|
let mut is_merged_database = false;
|
||||||
match typed_chunks[0] {
|
match typed_chunks[0] {
|
||||||
@ -485,13 +489,22 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
}
|
}
|
||||||
let merger = builder.build();
|
let merger = builder.build();
|
||||||
|
|
||||||
write_entries_into_database(
|
if settings_diff.only_additional_fields.is_some() {
|
||||||
merger,
|
write_proximity_entries_into_database_additional_searchables(
|
||||||
&index.word_pair_proximity_docids,
|
merger,
|
||||||
wtxn,
|
&index.word_pair_proximity_docids,
|
||||||
deladd_serialize_add_side,
|
wtxn,
|
||||||
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
|
)?;
|
||||||
)?;
|
} else {
|
||||||
|
write_entries_into_database(
|
||||||
|
merger,
|
||||||
|
&index.word_pair_proximity_docids,
|
||||||
|
wtxn,
|
||||||
|
deladd_serialize_add_side,
|
||||||
|
merge_deladd_cbo_roaring_bitmaps_into_cbo_roaring_bitmap,
|
||||||
|
)?;
|
||||||
|
}
|
||||||
|
|
||||||
is_merged_database = true;
|
is_merged_database = true;
|
||||||
}
|
}
|
||||||
TypedChunk::FieldIdDocidFacetNumbers(_) => {
|
TypedChunk::FieldIdDocidFacetNumbers(_) => {
|
||||||
@ -830,3 +843,51 @@ where
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Akin to the `write_entries_into_database` function but specialized
|
||||||
|
/// for the case when we only index additional searchable fields only.
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::write_db")]
|
||||||
|
fn write_proximity_entries_into_database_additional_searchables<R>(
|
||||||
|
merger: Merger<R, MergeFn>,
|
||||||
|
database: &heed::Database<U8StrStrCodec, CboRoaringBitmapCodec>,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
R: io::Read + io::Seek,
|
||||||
|
{
|
||||||
|
let mut iter = merger.into_stream_merger_iter()?;
|
||||||
|
while let Some((key, value)) = iter.next()? {
|
||||||
|
if valid_lmdb_key(key) {
|
||||||
|
let (proximity_to_insert, word1, word2) =
|
||||||
|
U8StrStrCodec::bytes_decode(key).map_err(heed::Error::Decoding)?;
|
||||||
|
let data_to_insert = match KvReaderDelAdd::new(value).get(DelAdd::Addition) {
|
||||||
|
Some(value) => {
|
||||||
|
CboRoaringBitmapCodec::bytes_decode(value).map_err(heed::Error::Decoding)?
|
||||||
|
}
|
||||||
|
None => continue,
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut data_to_remove = RoaringBitmap::new();
|
||||||
|
for prox in 1..(MAX_DISTANCE as u8) {
|
||||||
|
let key = (prox, word1, word2);
|
||||||
|
let database_value = database.get(wtxn, &key)?.unwrap_or_default();
|
||||||
|
let value = if prox == proximity_to_insert {
|
||||||
|
// Proximity that should be changed.
|
||||||
|
// Union values and remove lower proximity data
|
||||||
|
(&database_value | &data_to_insert) - &data_to_remove
|
||||||
|
} else {
|
||||||
|
// Remove lower proximity data
|
||||||
|
&database_value - &data_to_remove
|
||||||
|
};
|
||||||
|
|
||||||
|
// add the current data in data_to_remove for the next proximities
|
||||||
|
data_to_remove |= &value;
|
||||||
|
|
||||||
|
if database_value != value {
|
||||||
|
database.put(wtxn, &key, &value)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
@ -9,6 +9,7 @@ use itertools::{EitherOrBoth, Itertools};
|
|||||||
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
|
use super::del_add::DelAddOperation;
|
||||||
use super::index_documents::{IndexDocumentsConfig, Transform};
|
use super::index_documents::{IndexDocumentsConfig, Transform};
|
||||||
use super::IndexerConfig;
|
use super::IndexerConfig;
|
||||||
use crate::criterion::Criterion;
|
use crate::criterion::Criterion;
|
||||||
@ -1072,13 +1073,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
|
|||||||
.index
|
.index
|
||||||
.primary_key(self.wtxn)?
|
.primary_key(self.wtxn)?
|
||||||
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
|
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
|
||||||
let inner_settings_diff = InnerIndexSettingsDiff {
|
let settings_update_only = true;
|
||||||
old: old_inner_settings,
|
let inner_settings_diff = InnerIndexSettingsDiff::new(
|
||||||
new: new_inner_settings,
|
old_inner_settings,
|
||||||
|
new_inner_settings,
|
||||||
primary_key_id,
|
primary_key_id,
|
||||||
embedding_configs_updated,
|
embedding_configs_updated,
|
||||||
settings_update_only: true,
|
settings_update_only,
|
||||||
};
|
);
|
||||||
|
|
||||||
if inner_settings_diff.any_reindexing_needed() {
|
if inner_settings_diff.any_reindexing_needed() {
|
||||||
self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
|
self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
|
||||||
@ -1095,21 +1097,104 @@ pub struct InnerIndexSettingsDiff {
|
|||||||
// TODO: compare directly the embedders.
|
// TODO: compare directly the embedders.
|
||||||
pub(crate) embedding_configs_updated: bool,
|
pub(crate) embedding_configs_updated: bool,
|
||||||
pub(crate) settings_update_only: bool,
|
pub(crate) settings_update_only: bool,
|
||||||
|
/// The set of only the additional searchable fields.
|
||||||
|
/// If any other searchable field has been modified, is set to None.
|
||||||
|
pub(crate) only_additional_fields: Option<HashSet<String>>,
|
||||||
|
|
||||||
|
// Cache the check to see if all the stop_words, allowed_separators, dictionary,
|
||||||
|
// exact_attributes, proximity_precision are different.
|
||||||
|
pub(crate) cache_reindex_searchable_without_user_defined: bool,
|
||||||
|
// Cache the check to see if the user_defined_searchables are different.
|
||||||
|
pub(crate) cache_user_defined_searchables: bool,
|
||||||
|
// Cache the check to see if the exact_attributes are different.
|
||||||
|
pub(crate) cache_exact_attributes: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InnerIndexSettingsDiff {
|
impl InnerIndexSettingsDiff {
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::settings")]
|
||||||
|
pub(crate) fn new(
|
||||||
|
old_settings: InnerIndexSettings,
|
||||||
|
new_settings: InnerIndexSettings,
|
||||||
|
primary_key_id: Option<FieldId>,
|
||||||
|
embedding_configs_updated: bool,
|
||||||
|
settings_update_only: bool,
|
||||||
|
) -> Self {
|
||||||
|
let only_additional_fields = match (
|
||||||
|
&old_settings.user_defined_searchable_fields,
|
||||||
|
&new_settings.user_defined_searchable_fields,
|
||||||
|
) {
|
||||||
|
(None, None) | (Some(_), None) | (None, Some(_)) => None, // None means *
|
||||||
|
(Some(old), Some(new)) => {
|
||||||
|
let old: HashSet<_> = old.iter().cloned().collect();
|
||||||
|
let new: HashSet<_> = new.iter().cloned().collect();
|
||||||
|
if old.difference(&new).next().is_none() {
|
||||||
|
// if no field has been removed return only the additional ones
|
||||||
|
Some(&new - &old).filter(|x| !x.is_empty())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let cache_reindex_searchable_without_user_defined = {
|
||||||
|
old_settings.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|
||||||
|
!= new_settings.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|
||||||
|
|| old_settings.allowed_separators != new_settings.allowed_separators
|
||||||
|
|| old_settings.dictionary != new_settings.dictionary
|
||||||
|
|| old_settings.proximity_precision != new_settings.proximity_precision
|
||||||
|
};
|
||||||
|
|
||||||
|
let cache_exact_attributes = old_settings.exact_attributes != new_settings.exact_attributes;
|
||||||
|
|
||||||
|
let cache_user_defined_searchables = old_settings.user_defined_searchable_fields
|
||||||
|
!= new_settings.user_defined_searchable_fields;
|
||||||
|
|
||||||
|
InnerIndexSettingsDiff {
|
||||||
|
old: old_settings,
|
||||||
|
new: new_settings,
|
||||||
|
primary_key_id,
|
||||||
|
embedding_configs_updated,
|
||||||
|
settings_update_only,
|
||||||
|
only_additional_fields,
|
||||||
|
cache_reindex_searchable_without_user_defined,
|
||||||
|
cache_user_defined_searchables,
|
||||||
|
cache_exact_attributes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn any_reindexing_needed(&self) -> bool {
|
pub fn any_reindexing_needed(&self) -> bool {
|
||||||
self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
|
self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reindex_searchable(&self) -> bool {
|
pub fn reindex_searchable(&self) -> bool {
|
||||||
self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|
self.cache_reindex_searchable_without_user_defined
|
||||||
!= self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
|
|| self.cache_exact_attributes
|
||||||
|| self.old.allowed_separators != self.new.allowed_separators
|
|| self.cache_user_defined_searchables
|
||||||
|| self.old.dictionary != self.new.dictionary
|
}
|
||||||
|| self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
|
|
||||||
|| self.old.exact_attributes != self.new.exact_attributes
|
pub fn reindex_proximities(&self) -> bool {
|
||||||
|| self.old.proximity_precision != self.new.proximity_precision
|
// if any searchable settings force the reindexing
|
||||||
|
(self.cache_reindex_searchable_without_user_defined || self.cache_user_defined_searchables)
|
||||||
|
// and if any settings needs the proximity database created
|
||||||
|
&& (self.old.proximity_precision == ProximityPrecision::ByAttribute
|
||||||
|
|| self.new.proximity_precision == ProximityPrecision::ByAttribute)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn reindex_searchable_id(&self, id: FieldId) -> Option<DelAddOperation> {
|
||||||
|
if self.cache_reindex_searchable_without_user_defined || self.cache_exact_attributes {
|
||||||
|
Some(DelAddOperation::DeletionAndAddition)
|
||||||
|
} else if let Some(only_additional_fields) = &self.only_additional_fields {
|
||||||
|
let additional_field = self.new.fields_ids_map.name(id).unwrap();
|
||||||
|
if only_additional_fields.contains(additional_field) {
|
||||||
|
Some(DelAddOperation::Addition)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
} else if self.cache_user_defined_searchables {
|
||||||
|
Some(DelAddOperation::DeletionAndAddition)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn reindex_facets(&self) -> bool {
|
pub fn reindex_facets(&self) -> bool {
|
||||||
@ -1580,7 +1665,7 @@ mod tests {
|
|||||||
// When we search for something that is not in
|
// When we search for something that is not in
|
||||||
// the searchable fields it must not return any document.
|
// the searchable fields it must not return any document.
|
||||||
let result = index.search(&rtxn).query("23").execute().unwrap();
|
let result = index.search(&rtxn).query("23").execute().unwrap();
|
||||||
assert!(result.documents_ids.is_empty());
|
assert_eq!(result.documents_ids, Vec::<u32>::new());
|
||||||
|
|
||||||
// When we search for something that is in the searchable fields
|
// When we search for something that is in the searchable fields
|
||||||
// we must find the appropriate document.
|
// we must find the appropriate document.
|
||||||
|
Loading…
Reference in New Issue
Block a user