diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 5901e45f8..5704f5354 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -3041,6 +3041,7 @@ mod tests {
             source: Setting::Set(milli::vector::settings::EmbedderSource::Rest),
             api_key: Setting::Set(S("My super secret")),
             url: Setting::Set(S("http://localhost:7777")),
+            dimensions: Setting::Set(4),
             ..Default::default()
         };
         embedders.insert(S("default"), Setting::Set(embedding_settings));
diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-2.snap b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-2.snap
index 85f0926b9..72a25f915 100644
--- a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-2.snap
+++ b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-2.snap
@@ -7,6 +7,7 @@ expression: task.details
     "default": {
       "source": "rest",
       "apiKey": "MyXXXX...",
+      "dimensions": 4,
       "url": "http://localhost:7777"
     }
   }
diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-3.snap b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-3.snap
index 50a42d678..f7ae1c00a 100644
--- a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-3.snap
+++ b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update-3.snap
@@ -6,7 +6,7 @@ expression: embedding_config.embedder_options
 "Rest": {
   "api_key": "My super secret",
   "distribution": null,
-  "dimensions": null,
+  "dimensions": 4,
   "url": "http://localhost:7777",
   "query": null,
   "input_field": [
diff --git a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update.snap b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update.snap
index 85f0926b9..72a25f915 100644
--- a/index-scheduler/src/snapshots/index_scheduler__tests__settings_update.snap
+++ b/index-scheduler/src/snapshots/index_scheduler__tests__settings_update.snap
@@ -7,6 +7,7 @@ expression: task.details
     "default": {
       "source": "rest",
       "apiKey": "MyXXXX...",
+      "dimensions": 4,
       "url": "http://localhost:7777"
     }
   }
diff --git a/index-scheduler/src/snapshots/lib.rs/test_settings_update/after_registering_settings_task.snap b/index-scheduler/src/snapshots/lib.rs/test_settings_update/after_registering_settings_task.snap
index 8c081b84b..f3b94fb3c 100644
--- a/index-scheduler/src/snapshots/lib.rs/test_settings_update/after_registering_settings_task.snap
+++ b/index-scheduler/src/snapshots/lib.rs/test_settings_update/after_registering_settings_task.snap
@@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
[]
----------------------------------------------------------------------
### All Tasks:
-0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: NotSet, document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: NotSet, document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }}
+0 {uid: 0, status: enqueued, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,]
diff --git a/index-scheduler/src/snapshots/lib.rs/test_settings_update/settings_update_processed.snap b/index-scheduler/src/snapshots/lib.rs/test_settings_update/settings_update_processed.snap
index f6fb6a186..830331f61 100644
--- a/index-scheduler/src/snapshots/lib.rs/test_settings_update/settings_update_processed.snap
+++ b/index-scheduler/src/snapshots/lib.rs/test_settings_update/settings_update_processed.snap
@@ -6,7 +6,7 @@ source: index-scheduler/src/lib.rs
[]
----------------------------------------------------------------------
### All Tasks:
-0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: NotSet, document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: NotSet, document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }}
+0 {uid: 0, status: succeeded, details: { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData } }, kind: SettingsUpdate { index_uid: "doggos", new_settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, non_separator_tokens: NotSet, separator_tokens: NotSet, dictionary: NotSet, synonyms: NotSet, distinct_attribute: NotSet, proximity_precision: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, embedders: Set({"default": Set(EmbeddingSettings { source: Set(Rest), model: NotSet, revision: NotSet, api_key: Set("My super secret"), dimensions: Set(4), document_template: NotSet, url: Set("http://localhost:7777"), query: NotSet, input_field: NotSet, path_to_embeddings: NotSet, embedding_object: NotSet, input_type: NotSet, distribution: NotSet })}), search_cutoff_ms: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued []
diff --git a/meilisearch/tests/settings/get_settings.rs b/meilisearch/tests/settings/get_settings.rs
index 980ef3064..cd31d4959 100644
--- a/meilisearch/tests/settings/get_settings.rs
+++ b/meilisearch/tests/settings/get_settings.rs
@@ -113,7 +113,8 @@ async fn secrets_are_hidden_in_settings() {
           "default": {
             "source": "rest",
             "url": "https://localhost:7777",
-            "apiKey": "My super secret value you will never guess"
+            "apiKey": "My super secret value you will never guess",
+            "dimensions": 4,
           }
         }
       }))
@@ -184,6 +185,7 @@ async fn secrets_are_hidden_in_settings() {
         "default": {
           "source": "rest",
           "apiKey": "My suXXXXXX...",
+          "dimensions": 4,
           "documentTemplate": "{% for field in fields %} {{ field.name }}: {{ field.value }}\n{% endfor %}",
           "url": "https://localhost:7777",
           "query": null,
@@ -211,6 +213,7 @@ async fn secrets_are_hidden_in_settings() {
         "default": {
           "source": "rest",
          "apiKey": "My suXXXXXX...",
+          "dimensions": 4,
           "url": "https://localhost:7777"
         }
       }
diff --git a/milli/src/index.rs b/milli/src/index.rs
index db31c953a..27b273393 100644
--- a/milli/src/index.rs
+++ b/milli/src/index.rs
@@ -678,6 +678,23 @@ impl Index {
             .get(rtxn, main_key::USER_DEFINED_SEARCHABLE_FIELDS_KEY)
     }

+    /// Identical to `user_defined_searchable_fields`, but returns ids instead.
+    pub fn user_defined_searchable_fields_ids(&self, rtxn: &RoTxn) -> Result<Option<Vec<FieldId>>> {
+        match self.user_defined_searchable_fields(rtxn)? {
+            Some(fields) => {
+                let fields_ids_map = self.fields_ids_map(rtxn)?;
+                let mut fields_ids = Vec::new();
+                for name in fields {
+                    if let Some(field_id) = fields_ids_map.id(name) {
+                        fields_ids.push(field_id);
+                    }
+                }
+                Ok(Some(fields_ids))
+            }
+            None => Ok(None),
+        }
+    }
+
     /* filterable fields */

     /// Writes the filterable fields names in the database.
@@ -824,11 +841,11 @@ impl Index {

     /// Identical to `user_defined_faceted_fields`, but returns ids instead.
     pub fn user_defined_faceted_fields_ids(&self, rtxn: &RoTxn) -> Result<HashSet<FieldId>> {
-        let fields = self.faceted_fields(rtxn)?;
+        let fields = self.user_defined_faceted_fields(rtxn)?;
         let fields_ids_map = self.fields_ids_map(rtxn)?;
         let mut fields_ids = HashSet::new();
-        for name in fields.into_iter() {
+        for name in fields {
             if let Some(field_id) = fields_ids_map.id(&name) {
                 fields_ids.insert(field_id);
             }
diff --git a/milli/src/update/del_add.rs b/milli/src/update/del_add.rs
index 794beb5df..0288858ed 100644
--- a/milli/src/update/del_add.rs
+++ b/milli/src/update/del_add.rs
@@ -71,8 +71,8 @@ pub enum DelAddOperation {
 /// putting each deletion obkv's keys under a DelAdd::Deletion
 /// and putting each addition obkv's keys under a DelAdd::Addition
 pub fn del_add_from_two_obkvs<K: obkv::Key + PartialOrd>(
-    deletion: obkv::KvReader<K>,
-    addition: obkv::KvReader<K>,
+    deletion: &obkv::KvReader<K>,
+    addition: &obkv::KvReader<K>,
     buffer: &mut Vec<u8>,
 ) -> Result<(), std::io::Error> {
     use itertools::merge_join_by;
diff --git a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
index dc4886f00..6af5bba6d 100644
--- a/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
+++ b/milli/src/update/index_documents/extract/extract_docid_word_positions.rs
@@ -1,4 +1,4 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::convert::TryInto;
 use std::fs::File;
 use std::io::BufReader;
@@ -12,6 +12,7 @@ use serde_json::Value;
 use super::helpers::{create_sorter, keep_latest_obkv, sorter_into_reader, GrenadParameters};
 use crate::error::{InternalError, SerializationError};
 use crate::update::del_add::{del_add_from_two_obkvs, DelAdd, KvReaderDelAdd};
+use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
 use crate::{FieldId, Result, MAX_POSITION_PER_ATTRIBUTE, MAX_WORD_LENGTH};

 pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, RoaringBitmap)>;
@@ -25,10 +26,7 @@ pub type ScriptLanguageDocidsMap = HashMap<(Script, Language), (RoaringBitmap, R
 pub fn extract_docid_word_positions<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
-    searchable_fields: &Option<HashSet<FieldId>>,
-    stop_words: Option<&fst::Set<Vec<u8>>>,
-    allowed_separators: Option<&[&str]>,
-    dictionary: Option<&[&str]>,
+    settings_diff: &InnerIndexSettingsDiff,
     max_positions_per_attributes: Option<u32>,
 ) -> Result<(grenad::Reader<BufReader<File>>, ScriptLanguageDocidsMap)> {
     puffin::profile_function!();
@@ -36,6 +34,7 @@ pub fn extract_docid_word_positions(
     let max_positions_per_attributes = max_positions_per_attributes
         .map_or(MAX_POSITION_PER_ATTRIBUTE, |max| max.min(MAX_POSITION_PER_ATTRIBUTE));
     let max_memory = indexer.max_memory_by_thread();
+    let force_reindexing = settings_diff.reindex_searchable();

     // initialize destination values.
     let mut documents_ids = RoaringBitmap::new();
@@ -56,8 +55,37 @@
     let mut value_buffer = Vec::new();

     // initialize tokenizer.
-    let mut builder = tokenizer_builder(stop_words, allowed_separators, dictionary, None);
-    let tokenizer = builder.build();
+    let old_stop_words = settings_diff.old.stop_words.as_ref();
+    let old_separators: Option<Vec<_>> = settings_diff
+        .old
+        .allowed_separators
+        .as_ref()
+        .map(|s| s.iter().map(String::as_str).collect());
+    let old_dictionary: Option<Vec<_>> =
+        settings_diff.old.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
+    let mut del_builder = tokenizer_builder(
+        old_stop_words,
+        old_separators.as_deref(),
+        old_dictionary.as_deref(),
+        None,
+    );
+    let del_tokenizer = del_builder.build();
+
+    let new_stop_words = settings_diff.new.stop_words.as_ref();
+    let new_separators: Option<Vec<_>> = settings_diff
+        .new
+        .allowed_separators
+        .as_ref()
+        .map(|s| s.iter().map(String::as_str).collect());
+    let new_dictionary: Option<Vec<_>> =
+        settings_diff.new.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
+    let mut add_builder = tokenizer_builder(
+        new_stop_words,
+        new_separators.as_deref(),
+        new_dictionary.as_deref(),
+        None,
+    );
+    let add_tokenizer = add_builder.build();

     // iterate over documents.
     let mut cursor = obkv_documents.into_cursor()?;
@@ -69,7 +97,7 @@
         let obkv = KvReader::<FieldId>::new(value);

         // if the searchable fields didn't change, skip the searchable indexing for this document.
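Review note (illustrative, not part of the patch): the hunk above builds two tokenizers because the del side must be re-tokenized with the old settings — the ones the words were indexed under — while the add side uses the new ones. A minimal toy sketch of that split, with `split_whitespace` standing in for the real charabia tokenizer and hypothetical stop-word sets:

    use std::collections::HashSet;

    // toy stand-in for the del/add tokenizer pair
    fn tokenize<'a>(text: &'a str, stop_words: &HashSet<&str>) -> Vec<&'a str> {
        text.split_whitespace().filter(|w| !stop_words.contains(w)).collect()
    }

    fn main() {
        let old_stop_words: HashSet<&str> = ["the"].into_iter().collect(); // old settings
        let new_stop_words: HashSet<&str> = HashSet::new();                // new settings
        let doc = "the quick fox";
        // what must leave the index vs. what must enter it
        assert_eq!(tokenize(doc, &old_stop_words), ["quick", "fox"]);
        assert_eq!(tokenize(doc, &new_stop_words), ["the", "quick", "fox"]);
    }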
-        if !searchable_fields_changed(&KvReader::<FieldId>::new(value), searchable_fields) {
+        if !force_reindexing && !searchable_fields_changed(&obkv, settings_diff) {
             continue;
         }
@@ -85,11 +113,8 @@
             // deletions
             lang_safe_tokens_from_document(
                 &obkv,
-                searchable_fields,
-                &tokenizer,
-                stop_words,
-                allowed_separators,
-                dictionary,
+                &settings_diff.old,
+                &del_tokenizer,
                 max_positions_per_attributes,
                 DelAdd::Deletion,
                 &mut del_buffers,
@@ -99,11 +124,8 @@
             // additions
             lang_safe_tokens_from_document(
                 &obkv,
-                searchable_fields,
-                &tokenizer,
-                stop_words,
-                allowed_separators,
-                dictionary,
+                &settings_diff.new,
+                &add_tokenizer,
                 max_positions_per_attributes,
                 DelAdd::Addition,
                 &mut add_buffers,
@@ -118,8 +140,8 @@
             // transforming two KV<FieldId, KV<u16, String>> into one KV<FieldId, KV<DelAdd, KV<u16, String>>>
             value_buffer.clear();
             del_add_from_two_obkvs(
-                KvReader::<FieldId>::new(del_obkv),
-                KvReader::<FieldId>::new(add_obkv),
+                &KvReader::<FieldId>::new(del_obkv),
+                &KvReader::<FieldId>::new(add_obkv),
                 &mut value_buffer,
             )?;
@@ -160,8 +182,9 @@
 /// Check if any searchable fields of a document changed.
 fn searchable_fields_changed(
     obkv: &KvReader<FieldId>,
-    searchable_fields: &Option<HashSet<FieldId>>,
+    settings_diff: &InnerIndexSettingsDiff,
 ) -> bool {
+    let searchable_fields = &settings_diff.new.searchable_fields_ids;
     for (field_id, field_bytes) in obkv.iter() {
         if searchable_fields.as_ref().map_or(true, |sf| sf.contains(&field_id)) {
             let del_add = KvReaderDelAdd::new(field_bytes);
@@ -206,14 +229,10 @@ fn tokenizer_builder<'a>(
 /// Extract words mapped with their positions of a document,
 /// ensuring no Language detection mistakes were made.
-#[allow(clippy::too_many_arguments)] // FIXME: consider grouping arguments in a struct
 fn lang_safe_tokens_from_document<'a>(
     obkv: &KvReader<FieldId>,
-    searchable_fields: &Option<HashSet<FieldId>>,
+    settings: &InnerIndexSettings,
     tokenizer: &Tokenizer,
-    stop_words: Option<&fst::Set<Vec<u8>>>,
-    allowed_separators: Option<&[&str]>,
-    dictionary: Option<&[&str]>,
     max_positions_per_attributes: u32,
     del_add: DelAdd,
     buffers: &'a mut Buffers,
@@ -222,7 +241,7 @@
     tokens_from_document(
         obkv,
-        searchable_fields,
+        &settings.searchable_fields_ids,
         tokenizer,
         max_positions_per_attributes,
         del_add,
@@ -246,12 +265,15 @@
         // then we don't rerun the extraction.
         if !script_language.is_empty() {
             // build a new temporary tokenizer including the allow list.
-            let mut builder = tokenizer_builder(
-                stop_words,
-                allowed_separators,
-                dictionary,
-                Some(&script_language),
-            );
+            let stop_words = settings.stop_words.as_ref();
+            let separators: Option<Vec<_>> = settings
+                .allowed_separators
+                .as_ref()
+                .map(|s| s.iter().map(String::as_str).collect());
+            let dictionary: Option<Vec<_>> =
+                settings.dictionary.as_ref().map(|s| s.iter().map(String::as_str).collect());
+            let mut builder =
+                tokenizer_builder(stop_words, separators.as_deref(), dictionary.as_deref(), None);
             let tokenizer = builder.build();

             script_language_word_count.clear();
@@ -259,7 +281,7 @@
             // rerun the extraction.
             tokens_from_document(
                 obkv,
-                searchable_fields,
+                &settings.searchable_fields_ids,
                 &tokenizer,
                 max_positions_per_attributes,
                 del_add,
@@ -276,7 +298,7 @@
 /// Extract words mapped with their positions of a document.
 fn tokens_from_document<'a>(
     obkv: &KvReader<FieldId>,
-    searchable_fields: &Option<HashSet<FieldId>>,
+    searchable_fields: &Option<Vec<FieldId>>,
     tokenizer: &Tokenizer,
     max_positions_per_attributes: u32,
     del_add: DelAdd,
diff --git a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
index 33def5abd..1848a085f 100644
--- a/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_number_docids.rs
@@ -10,6 +10,7 @@ use crate::heed_codec::facet::{
 FacetGroupKey, FacetGroupKeyCodec, FieldDocIdFacetF64Codec, OrderedF64Codec,
 };
 use crate::update::del_add::{KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::Result;

 /// Extracts the facet number and the documents ids where this facet number appears.
@@ -20,6 +21,7 @@ use crate::Result;
 pub fn extract_facet_number_docids<R: io::Read + io::Seek>(
     fid_docid_facet_number: grenad::Reader<R>,
     indexer: GrenadParameters,
+    _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
diff --git a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
index 8fdd11ee7..abffe17ab 100644
--- a/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_facet_string_docids.rs
@@ -15,6 +15,7 @@ use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::index_documents::helpers::{
     merge_deladd_btreeset_string, merge_deladd_cbo_roaring_bitmaps,
 };
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};

 /// Extracts the facet string and the documents ids where this facet string appears.
@@ -25,6 +26,7 @@ use crate::{FieldId, Result, MAX_FACET_VALUE_LENGTH};
 pub fn extract_facet_string_docids<R: io::Read + io::Seek>(
     docid_fid_facet_string: grenad::Reader<R>,
     indexer: GrenadParameters,
+    _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<(grenad::Reader<BufReader<File>>, grenad::Reader<BufReader<File>>)> {
     puffin::profile_function!();
diff --git a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
index 1f8af372d..030303cd9 100644
--- a/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_docid_facet_values.rs
@@ -1,5 +1,5 @@
 use std::borrow::Cow;
-use std::collections::{BTreeMap, HashSet};
+use std::collections::BTreeMap;
 use std::convert::TryInto;
 use std::fs::File;
 use std::io::{self, BufReader};
@@ -20,6 +20,7 @@ use crate::error::InternalError;
 use crate::facet::value_encoding::f64_into_bytes;
 use crate::update::del_add::{DelAdd, KvWriterDelAdd};
 use crate::update::index_documents::{create_writer, writer_into_reader};
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::{CboRoaringBitmapCodec, DocumentId, Error, FieldId, Result, MAX_FACET_VALUE_LENGTH};

 /// The length of the elements that are always in the buffer when inserting new values.
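Review note (sketch under assumptions, not the actual milli API): the next hunk keeps a field's Deletion value only if the field was faceted under the old settings, and its Addition value only if it is faceted under the new ones. Reduced to plain Rust:

    use std::collections::HashSet;

    fn facet_del_add<'a>(
        field_id: u16,
        del: Option<&'a str>,
        add: Option<&'a str>,
        old_faceted: &HashSet<u16>,
        new_faceted: &HashSet<u16>,
    ) -> (Option<&'a str>, Option<&'a str>) {
        // keep the Deletion side only if the field was faceted before,
        // and the Addition side only if it is faceted now.
        (
            del.filter(|_| old_faceted.contains(&field_id)),
            add.filter(|_| new_faceted.contains(&field_id)),
        )
    }

    fn main() {
        let old: HashSet<u16> = [1].into_iter().collect();
        let new: HashSet<u16> = [1, 2].into_iter().collect();
        // field 2 just became faceted: nothing to delete, one value to add.
        assert_eq!(facet_del_add(2, Some("red"), Some("red"), &old, &new), (None, Some("red")));
    }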
@@ -43,7 +44,7 @@ pub struct ExtractedFacetValues {
 pub fn extract_fid_docid_facet_values<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
-    faceted_fields: &HashSet<FieldId>,
+    settings_diff: &InnerIndexSettingsDiff,
     geo_fields_ids: Option<(FieldId, FieldId)>,
 ) -> Result<ExtractedFacetValues> {
     puffin::profile_function!();
@@ -82,7 +83,9 @@
         let obkv = obkv::KvReader::new(value);

         for (field_id, field_bytes) in obkv.iter() {
-            if faceted_fields.contains(&field_id) {
+            let delete_faceted = settings_diff.old.faceted_fields_ids.contains(&field_id);
+            let add_faceted = settings_diff.new.faceted_fields_ids.contains(&field_id);
+            if delete_faceted || add_faceted {
                 numbers_key_buffer.clear();
                 strings_key_buffer.clear();
@@ -99,11 +102,12 @@
                 strings_key_buffer.extend_from_slice(docid_bytes);

                 let del_add_obkv = obkv::KvReader::new(field_bytes);
-                let del_value = match del_add_obkv.get(DelAdd::Deletion) {
+                let del_value = match del_add_obkv.get(DelAdd::Deletion).filter(|_| delete_faceted)
+                {
                     Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
                     None => None,
                 };
-                let add_value = match del_add_obkv.get(DelAdd::Addition) {
+                let add_value = match del_add_obkv.get(DelAdd::Addition).filter(|_| add_faceted) {
                     Some(bytes) => Some(from_slice(bytes).map_err(InternalError::SerdeJson)?),
                     None => None,
                 };
diff --git a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
index 305af3630..51e0642da 100644
--- a/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_fid_word_count_docids.rs
@@ -10,6 +10,7 @@ use super::helpers::{
 use crate::error::SerializationError;
 use crate::index::db_name::DOCID_WORD_POSITIONS;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::Result;

 const MAX_COUNTED_WORDS: usize = 30;
@@ -23,6 +24,7 @@ const MAX_COUNTED_WORDS: usize = 30;
 pub fn extract_fid_word_count_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
+    _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
diff --git a/milli/src/update/index_documents/extract/extract_vector_points.rs b/milli/src/update/index_documents/extract/extract_vector_points.rs
index 40b32bf9c..23f945c7a 100644
--- a/milli/src/update/index_documents/extract/extract_vector_points.rs
+++ b/milli/src/update/index_documents/extract/extract_vector_points.rs
@@ -17,8 +17,9 @@ use crate::error::UserError;
 use crate::prompt::Prompt;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::index_documents::helpers::try_split_at;
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::vector::Embedder;
-use crate::{DocumentId, FieldsIdsMap, InternalError, Result, VectorOrArrayOfVectors};
+use crate::{DocumentId, InternalError, Result, VectorOrArrayOfVectors};

 /// The length of the elements that are always in the buffer when inserting new values.
 const TRUNCATE_SIZE: usize = size_of::<DocumentId>();
@@ -71,12 +72,15 @@ impl VectorStateDelta {
 pub fn extract_vector_points<R: io::Read + io::Seek>(
     obkv_documents: grenad::Reader<R>,
     indexer: GrenadParameters,
-    field_id_map: &FieldsIdsMap,
+    settings_diff: &InnerIndexSettingsDiff,
     prompt: &Prompt,
     embedder_name: &str,
 ) -> Result<ExtractedVectorPoints> {
     puffin::profile_function!();

+    let old_fields_ids_map = &settings_diff.old.fields_ids_map;
+    let new_fields_ids_map = &settings_diff.new.fields_ids_map;
+
     // (docid, _index) -> KvWriterDelAdd -> Vector
     let mut manual_vectors_writer = create_writer(
         indexer.chunk_compression_type,
@@ -98,8 +102,6 @@
         tempfile::tempfile()?,
     );

-    let vectors_fid = field_id_map.id("_vectors");
-
     let mut key_buffer = Vec::new();
     let mut cursor = obkv_documents.into_cursor()?;
     while let Some((key, value)) = cursor.move_on_next()? {
@@ -116,15 +118,29 @@
         // lazily get it when needed
         let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() };

-        let vectors_field = vectors_fid
-            .and_then(|vectors_fid| obkv.get(vectors_fid))
-            .map(KvReaderDelAdd::new)
-            .map(|obkv| to_vector_maps(obkv, document_id))
-            .transpose()?;
+        // the vector field id may have changed
+        let old_vectors_fid = old_fields_ids_map.id("_vectors");
+        // filter out the old vector fid if the settings have changed, forcing reindexing.
+        let old_vectors_fid = old_vectors_fid.filter(|_| !settings_diff.reindex_vectors());

-        let (del_map, add_map) = vectors_field.unzip();
-        let del_map = del_map.flatten();
-        let add_map = add_map.flatten();
+        let new_vectors_fid = new_fields_ids_map.id("_vectors");
+        let vectors_field = {
+            let del = old_vectors_fid
+                .and_then(|vectors_fid| obkv.get(vectors_fid))
+                .map(KvReaderDelAdd::new)
+                .map(|obkv| to_vector_map(obkv, DelAdd::Deletion, &document_id))
+                .transpose()?
+                .flatten();
+            let add = new_vectors_fid
+                .and_then(|vectors_fid| obkv.get(vectors_fid))
+                .map(KvReaderDelAdd::new)
+                .map(|obkv| to_vector_map(obkv, DelAdd::Addition, &document_id))
+                .transpose()?
+                .flatten();
+            (del, add)
+        };
+
+        let (del_map, add_map) = vectors_field;

         let del_value = del_map.and_then(|mut map| map.remove(embedder_name));
         let add_value = add_map.and_then(|mut map| map.remove(embedder_name));
@@ -155,7 +171,7 @@
                     VectorStateDelta::NowGenerated(prompt.render(
                         obkv,
                         DelAdd::Addition,
-                        field_id_map,
+                        new_fields_ids_map,
                     )?)
                 } else {
                     VectorStateDelta::NowRemoved
@@ -182,9 +198,10 @@
                 if document_is_kept {
                     // Don't give up if the old prompt was failing
-                    let old_prompt =
-                        prompt.render(obkv, DelAdd::Deletion, field_id_map).unwrap_or_default();
-                    let new_prompt = prompt.render(obkv, DelAdd::Addition, field_id_map)?;
+                    let old_prompt = prompt
+                        .render(obkv, DelAdd::Deletion, old_fields_ids_map)
+                        .unwrap_or_default();
+                    let new_prompt = prompt.render(obkv, DelAdd::Addition, new_fields_ids_map)?;
                     if old_prompt != new_prompt {
                         tracing::trace!(
                             "🚀 Changing prompt from\n{old_prompt}\n===to===\n{new_prompt}"
@@ -220,15 +237,6 @@
     })
 }

-fn to_vector_maps(
-    obkv: KvReaderDelAdd,
-    document_id: impl Fn() -> Value,
-) -> Result<(Option<serde_json::Map<String, Value>>, Option<serde_json::Map<String, Value>>)> {
-    let del = to_vector_map(obkv, DelAdd::Deletion, &document_id)?;
-    let add = to_vector_map(obkv, DelAdd::Addition, &document_id)?;
-    Ok((del, add))
-}
-
 fn to_vector_map(
     obkv: KvReaderDelAdd,
     side: DelAdd,
diff --git a/milli/src/update/index_documents/extract/extract_word_docids.rs b/milli/src/update/index_documents/extract/extract_word_docids.rs
index f38701dac..5699f2fb6 100644
--- a/milli/src/update/index_documents/extract/extract_word_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_docids.rs
@@ -1,20 +1,23 @@
-use std::collections::{BTreeSet, HashSet};
+use std::collections::BTreeSet;
 use std::fs::File;
 use std::io::{self, BufReader};

-use heed::BytesDecode;
+use heed::{BytesDecode, BytesEncode};
 use obkv::KvReaderU16;
+use roaring::RoaringBitmap;

 use super::helpers::{
-    create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, sorter_into_reader,
-    try_split_array_at, writer_into_reader, GrenadParameters,
+    create_sorter, create_writer, merge_deladd_cbo_roaring_bitmaps, try_split_array_at,
+    writer_into_reader, GrenadParameters,
 };
 use crate::error::SerializationError;
 use crate::heed_codec::StrBEU16Codec;
 use crate::index::db_name::DOCID_WORD_POSITIONS;
 use crate::update::del_add::{is_noop_del_add_obkv, DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::index_documents::helpers::sorter_into_reader;
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::update::MergeFn;
-use crate::{DocumentId, FieldId, Result};
+use crate::{CboRoaringBitmapCodec, DocumentId, FieldId, Result};

 /// Extracts the word and the documents ids where this word appears.
///
@@ -27,7 +30,7 @@ use crate::{DocumentId, FieldId, Result};
 pub fn extract_word_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
-    exact_attributes: &HashSet<FieldId>,
+    settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<(
     grenad::Reader<BufReader<File>>,
     grenad::Reader<BufReader<File>>,
@@ -43,7 +46,7 @@
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
         indexer.max_nb_chunks,
-        max_memory.map(|x| x / 3),
+        max_memory.map(|m| m / 3),
     );
     let mut key_buffer = Vec::new();
     let mut del_words = BTreeSet::new();
@@ -85,13 +88,19 @@
         add_words.clear();
     }

+    let mut word_fid_docids_writer = create_writer(
+        indexer.chunk_compression_type,
+        indexer.chunk_compression_level,
+        tempfile::tempfile()?,
+    );
+
     let mut word_docids_sorter = create_sorter(
         grenad::SortAlgorithm::Unstable,
         merge_deladd_cbo_roaring_bitmaps,
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
         indexer.max_nb_chunks,
-        max_memory.map(|x| x / 3),
+        max_memory.map(|m| m / 3),
     );

     let mut exact_word_docids_sorter = create_sorter(
@@ -100,31 +109,45 @@
         indexer.chunk_compression_type,
         indexer.chunk_compression_level,
         indexer.max_nb_chunks,
-        max_memory.map(|x| x / 3),
-    );
-
-    let mut word_fid_docids_writer = create_writer(
-        indexer.chunk_compression_type,
-        indexer.chunk_compression_level,
-        tempfile::tempfile()?,
+        max_memory.map(|m| m / 3),
     );

     let mut iter = word_fid_docids_sorter.into_stream_merger_iter()?;
-    // TODO: replace sorters by writers by accumulating values into a buffer before inserting them.
+    let mut buffer = Vec::new();
+    // NOTE: replacing the sorters with bitmap merging is less efficient, so we keep the sorters.
     while let Some((key, value)) = iter.next()? {
         // only keep the value if there is a change to apply in the DB.
         if !is_noop_del_add_obkv(KvReaderDelAdd::new(value)) {
             word_fid_docids_writer.insert(key, value)?;
         }

         let (w, fid) = StrBEU16Codec::bytes_decode(key)
             .map_err(|_| SerializationError::Decoding { db_name: Some(DOCID_WORD_POSITIONS) })?;

-        // every word contained in an attribute set to exact must be pushed in the exact_words list.
-        if exact_attributes.contains(&fid) {
-            exact_word_docids_sorter.insert(word.as_bytes(), value)?;
-        } else {
-            word_docids_sorter.insert(word.as_bytes(), value)?;
+        // merge all deletions
+        let obkv = KvReaderDelAdd::new(value);
+        if let Some(value) = obkv.get(DelAdd::Deletion) {
+            let delete_from_exact = settings_diff.old.exact_attributes.contains(&fid);
+            buffer.clear();
+            let mut obkv = KvWriterDelAdd::new(&mut buffer);
+            obkv.insert(DelAdd::Deletion, value)?;
+            if delete_from_exact {
+                exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
+            } else {
+                word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
+            }
+        }
+        // merge all additions
+        if let Some(value) = obkv.get(DelAdd::Addition) {
+            let add_in_exact = settings_diff.new.exact_attributes.contains(&fid);
+            buffer.clear();
+            let mut obkv = KvWriterDelAdd::new(&mut buffer);
+            obkv.insert(DelAdd::Addition, value)?;
+            if add_in_exact {
+                exact_word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
+            } else {
+                word_docids_sorter.insert(w, obkv.into_inner().unwrap())?;
+            }
         }
     }
@@ -178,3 +201,45 @@ fn words_into_sorter(

     Ok(())
 }
+
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
+fn docids_into_writers<W>(
+    word: &str,
+    deletions: &RoaringBitmap,
+    additions: &RoaringBitmap,
+    writer: &mut grenad::Writer<W>,
+) -> Result<()>
+where
+    W: std::io::Write,
+{
+    if deletions == additions {
+        // if the same value is deleted and added, do nothing.
+        return Ok(());
+    }
+
+    // Write each value in the same KvDelAdd before inserting it in the final writer.
+    let mut obkv = KvWriterDelAdd::memory();
+    // deletions:
+    if !deletions.is_empty() && !deletions.is_subset(additions) {
+        obkv.insert(
+            DelAdd::Deletion,
+            CboRoaringBitmapCodec::bytes_encode(deletions).map_err(|_| {
+                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
+            })?,
+        )?;
+    }
+    // additions:
+    if !additions.is_empty() {
+        obkv.insert(
+            DelAdd::Addition,
+            CboRoaringBitmapCodec::bytes_encode(additions).map_err(|_| {
+                SerializationError::Encoding { db_name: Some(DOCID_WORD_POSITIONS) }
+            })?,
+        )?;
+    }
+
+    // insert everything in the same writer.
+    writer.insert(word.as_bytes(), obkv.into_inner().unwrap())?;
+
+    Ok(())
+}
diff --git a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
index 82a94ce00..23f70ccd2 100644
--- a/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_pair_proximity_docids.rs
@@ -11,8 +11,9 @@ use super::helpers::{
 };
 use crate::error::SerializationError;
 use crate::index::db_name::DOCID_WORD_POSITIONS;
-use crate::proximity::{index_proximity, MAX_DISTANCE};
+use crate::proximity::{index_proximity, ProximityPrecision, MAX_DISTANCE};
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::{DocumentId, Result};

 /// Extracts the best proximity between pairs of words and the documents ids where this pair appears.
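Review note (illustrative sketch, not part of the patch): the following hunk derives two flags from the settings diff and skips proximity extraction entirely when neither side needs it. `ByWord`/`ByAttribute` below mirror milli's ProximityPrecision variants; everything else is hypothetical:

    #[derive(PartialEq)]
    enum ProximityPrecision { ByWord, ByAttribute }

    fn pair_extraction_sides(old: &ProximityPrecision, new: &ProximityPrecision) -> (bool, bool) {
        let any_deletion = *old == ProximityPrecision::ByWord; // old pairs may need removal
        let any_addition = *new == ProximityPrecision::ByWord; // new pairs may need insertion
        (any_deletion, any_addition)
    }

    fn main() {
        let (del, add) = pair_extraction_sides(
            &ProximityPrecision::ByAttribute,
            &ProximityPrecision::ByAttribute,
        );
        assert!(!del && !add); // neither side needed: return an empty reader immediately
    }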
@@ -23,8 +24,21 @@
 pub fn extract_word_pair_proximity_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
+    settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
+
+    let any_deletion = settings_diff.old.proximity_precision == ProximityPrecision::ByWord;
+    let any_addition = settings_diff.new.proximity_precision == ProximityPrecision::ByWord;
+
+    // early return if the data should be neither deleted nor created.
+    if !any_deletion && !any_addition {
+        let writer = create_writer(
+            indexer.chunk_compression_type,
+            indexer.chunk_compression_level,
+            tempfile::tempfile()?,
+        );
+        return writer_into_reader(writer);
+    }

     let max_memory = indexer.max_memory_by_thread();
@@ -77,6 +91,10 @@
         let (del, add): (Result<_>, Result<_>) = rayon::join(
             || {
+                if !any_deletion {
+                    return Ok(());
+                }
+
                 // deletions
                 if let Some(deletion) = KvReaderDelAdd::new(value).get(DelAdd::Deletion) {
                     for (position, word) in KvReaderU16::new(deletion).iter() {
@@ -106,6 +124,10 @@
                 Ok(())
             },
             || {
+                if !any_addition {
+                    return Ok(());
+                }
+
                 // additions
                 if let Some(addition) = KvReaderDelAdd::new(value).get(DelAdd::Addition) {
                     for (position, word) in KvReaderU16::new(addition).iter() {
diff --git a/milli/src/update/index_documents/extract/extract_word_position_docids.rs b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
index 4bc553d9a..45a05b0d0 100644
--- a/milli/src/update/index_documents/extract/extract_word_position_docids.rs
+++ b/milli/src/update/index_documents/extract/extract_word_position_docids.rs
@@ -11,6 +11,7 @@ use super::helpers::{
 use crate::error::SerializationError;
 use crate::index::db_name::DOCID_WORD_POSITIONS;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
+use crate::update::settings::InnerIndexSettingsDiff;
 use crate::update::MergeFn;
 use crate::{bucketed_position, DocumentId, Result};

@@ -22,6 +23,7 @@ use crate::{bucketed_position, DocumentId, Result};
 pub fn extract_word_position_docids<R: io::Read + io::Seek>(
     docid_word_positions: grenad::Reader<R>,
     indexer: GrenadParameters,
+    _settings_diff: &InnerIndexSettingsDiff,
 ) -> Result<grenad::Reader<BufReader<File>>> {
     puffin::profile_function!();
diff --git a/milli/src/update/index_documents/extract/mod.rs b/milli/src/update/index_documents/extract/mod.rs
index 82486f3a8..bc6fe2aff 100644
--- a/milli/src/update/index_documents/extract/mod.rs
+++ b/milli/src/update/index_documents/extract/mod.rs
@@ -9,9 +9,9 @@ mod extract_word_docids;
 mod extract_word_pair_proximity_docids;
 mod extract_word_position_docids;

-use std::collections::HashSet;
 use std::fs::File;
 use std::io::BufReader;
+use std::sync::Arc;

 use crossbeam_channel::Sender;
 use rayon::prelude::*;
@@ -30,9 +30,8 @@ use self::extract_word_pair_proximity_docids::extract_word_pair_proximity_docids
 use self::extract_word_position_docids::extract_word_position_docids;
 use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
 use super::{helpers, TypedChunk};
-use crate::proximity::ProximityPrecision;
-use crate::vector::EmbeddingConfigs;
-use crate::{FieldId, FieldsIdsMap, Result};
+use crate::update::settings::InnerIndexSettingsDiff;
+use crate::{FieldId, Result};

 /// Extract data for each database from obkv documents in parallel.
 /// Send data in grenad file over provided Sender.
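Review note (sketch, not part of the patch): from here on the patch threads one `Arc<InnerIndexSettingsDiff>` through every extractor instead of a dozen scalar parameters. The sharing pattern, reduced to the standard library with a hypothetical `SettingsDiff` stand-in:

    use std::sync::Arc;
    use std::thread;

    // hypothetical reduced shape of milli's InnerIndexSettingsDiff
    struct SettingsDiff {
        reindex_searchable: bool,
    }

    fn main() {
        let diff = Arc::new(SettingsDiff { reindex_searchable: true });
        let handles: Vec<_> = (0..3)
            .map(|task| {
                let diff = Arc::clone(&diff); // one cheap pointer clone per extraction task
                thread::spawn(move || {
                    // every extractor reads the same immutable settings diff
                    println!("extractor {task}: reindex_searchable = {}", diff.reindex_searchable);
                })
            })
            .collect();
        for handle in handles {
            handle.join().unwrap();
        }
    }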
@@ -43,18 +42,10 @@ pub(crate) fn data_from_obkv_documents(
     flattened_obkv_chunks: impl Iterator<Item = Result<grenad::Reader<BufReader<File>>>> + Send,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
-    searchable_fields: Option<HashSet<FieldId>>,
-    faceted_fields: HashSet<FieldId>,
     primary_key_id: FieldId,
     geo_fields_ids: Option<(FieldId, FieldId)>,
-    field_id_map: FieldsIdsMap,
-    stop_words: Option<fst::Set<Vec<u8>>>,
-    allowed_separators: Option<&[&str]>,
-    dictionary: Option<&[&str]>,
+    settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
-    exact_attributes: HashSet<FieldId>,
-    proximity_precision: ProximityPrecision,
-    embedders: EmbeddingConfigs,
 ) -> Result<()> {
     puffin::profile_function!();
@@ -67,8 +58,7 @@
                     original_documents_chunk,
                     indexer,
                     lmdb_writer_sx.clone(),
-                    field_id_map.clone(),
-                    embedders.clone(),
+                    settings_diff.clone(),
                 )
             })
             .collect::<Result<()>>()
@@ -81,13 +71,9 @@
                     flattened_obkv_chunks,
                     indexer,
                     lmdb_writer_sx.clone(),
-                    &searchable_fields,
-                    &faceted_fields,
                     primary_key_id,
                     geo_fields_ids,
-                    &stop_words,
-                    &allowed_separators,
-                    &dictionary,
+                    settings_diff.clone(),
                     max_positions_per_attributes,
                 )
             })
@@ -100,13 +86,12 @@
         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
             docid_word_positions_chunk.clone(),
             indexer,
+            settings_diff.clone(),
             lmdb_writer_sx.clone(),
             extract_fid_word_count_docids,
             TypedChunk::FieldIdWordCountDocids,
             "field-id-wordcount-docids",
         );
-
-        let exact_attributes = exact_attributes.clone();
         run_extraction_task::<
             _,
             _,
             (
                 grenad::Reader<BufReader<File>>,
                 grenad::Reader<BufReader<File>>,
                 grenad::Reader<BufReader<File>>,
             ),
         >(
             docid_word_positions_chunk.clone(),
             indexer,
+            settings_diff.clone(),
             lmdb_writer_sx.clone(),
-            move |doc_word_pos, indexer| {
-                extract_word_docids(doc_word_pos, indexer, &exact_attributes)
-            },
+            extract_word_docids,
             |(
                 word_docids_reader,
                 exact_word_docids_reader,
                 word_fid_docids_reader,
             )| TypedChunk::WordDocids {
                 word_docids_reader,
                 exact_word_docids_reader,
                 word_fid_docids_reader,
             },
             "word-docids",
         );
@@ -139,6 +123,7 @@
         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
             docid_word_positions_chunk.clone(),
             indexer,
+            settings_diff.clone(),
             lmdb_writer_sx.clone(),
             extract_word_position_docids,
             TypedChunk::WordPositionDocids,
@@ -152,6 +137,7 @@
         >(
             fid_docid_facet_strings_chunk.clone(),
             indexer,
+            settings_diff.clone(),
             lmdb_writer_sx.clone(),
             extract_facet_string_docids,
             TypedChunk::FieldIdFacetStringDocids,
@@ -161,22 +147,22 @@
         run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
             fid_docid_facet_numbers_chunk.clone(),
             indexer,
+            settings_diff.clone(),
             lmdb_writer_sx.clone(),
             extract_facet_number_docids,
             TypedChunk::FieldIdFacetNumberDocids,
             "field-id-facet-number-docids",
         );

-        if proximity_precision == ProximityPrecision::ByWord {
-            run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
-                docid_word_positions_chunk.clone(),
-                indexer,
-                lmdb_writer_sx.clone(),
-                extract_word_pair_proximity_docids,
-                TypedChunk::WordPairProximityDocids,
-                "word-pair-proximity-docids",
-            );
-        }
+        run_extraction_task::<_, _, grenad::Reader<BufReader<File>>>(
+            docid_word_positions_chunk.clone(),
+            indexer,
+            settings_diff.clone(),
+            lmdb_writer_sx.clone(),
+            extract_word_pair_proximity_docids,
+            TypedChunk::WordPairProximityDocids,
+            "word-pair-proximity-docids",
+        );
     }

     Ok(())
@@ -195,12 +181,17 @@
 fn run_extraction_task<FE, FS, M>(
     chunk: grenad::Reader<CursorClonableMmap>,
     indexer: GrenadParameters,
+    settings_diff: Arc<InnerIndexSettingsDiff>,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
     extract_fn: FE,
     serialize_fn: FS,
     name: &'static str,
 ) where
-    FE: Fn(grenad::Reader<CursorClonableMmap>, GrenadParameters) -> Result<M>
+    FE: Fn(
+        grenad::Reader<CursorClonableMmap>,
+        GrenadParameters,
+        &InnerIndexSettingsDiff,
+    ) -> Result<M>
         + Sync
         + Send
         + 'static,
@@ -213,7 +204,7 @@
         let child_span = tracing::trace_span!(target: "indexing::extract::details", parent: &current_span, "extract_multiple_chunks");
         let _entered = child_span.enter();
         puffin::profile_scope!("extract_multiple_chunks", name);
-        match extract_fn(chunk, indexer) {
+        match extract_fn(chunk, indexer, &settings_diff) {
             Ok(chunk) => {
                 let _ = lmdb_writer_sx.send(Ok(serialize_fn(chunk)));
             }
@@ -230,8 +221,7 @@
 fn send_original_documents_data(
     original_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
-    field_id_map: FieldsIdsMap,
-    embedders: EmbeddingConfigs,
+    settings_diff: Arc<InnerIndexSettingsDiff>,
 ) -> Result<()> {
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@@ -244,50 +234,53 @@
         .thread_name(|index| format!("embedding-request-{index}"))
         .build()?;

-    rayon::spawn(move || {
-        for (name, (embedder, prompt)) in embedders {
-            let result = extract_vector_points(
-                documents_chunk_cloned.clone(),
-                indexer,
-                &field_id_map,
-                &prompt,
-                &name,
-            );
-            match result {
-                Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => {
-                    let embeddings = match extract_embeddings(
-                        prompts,
-                        indexer,
-                        embedder.clone(),
-                        &request_threads,
-                    ) {
-                        Ok(results) => Some(results),
-                        Err(error) => {
-                            let _ = lmdb_writer_sx_cloned.send(Err(error));
-                            None
-                        }
-                    };
+    if settings_diff.reindex_vectors() || !settings_diff.settings_update_only() {
+        let settings_diff = settings_diff.clone();
+        rayon::spawn(move || {
+            for (name, (embedder, prompt)) in settings_diff.new.embedding_configs.clone() {
+                let result = extract_vector_points(
+                    documents_chunk_cloned.clone(),
+                    indexer,
+                    &settings_diff,
+                    &prompt,
+                    &name,
+                );
+                match result {
+                    Ok(ExtractedVectorPoints { manual_vectors, remove_vectors, prompts }) => {
+                        let embeddings = match extract_embeddings(
+                            prompts,
+                            indexer,
+                            embedder.clone(),
+                            &request_threads,
+                        ) {
+                            Ok(results) => Some(results),
+                            Err(error) => {
+                                let _ = lmdb_writer_sx_cloned.send(Err(error));
+                                None
+                            }
+                        };

-                    if !(remove_vectors.is_empty()
-                        && manual_vectors.is_empty()
-                        && embeddings.as_ref().map_or(true, |e| e.is_empty()))
-                    {
-                        let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints {
-                            remove_vectors,
-                            embeddings,
-                            expected_dimension: embedder.dimensions(),
-                            manual_vectors,
-                            embedder_name: name,
-                        }));
+                        if !(remove_vectors.is_empty()
+                            && manual_vectors.is_empty()
+                            && embeddings.as_ref().map_or(true, |e| e.is_empty()))
+                        {
+                            let _ = lmdb_writer_sx_cloned.send(Ok(TypedChunk::VectorPoints {
+                                remove_vectors,
+                                embeddings,
+                                expected_dimension: embedder.dimensions(),
+                                manual_vectors,
+                                embedder_name: name,
+                            }));
+                        }
+                    }
+
+                    Err(error) => {
+                        let _ = lmdb_writer_sx_cloned.send(Err(error));
                     }
                 }
-
-                Err(error) => {
-                    let _ = lmdb_writer_sx_cloned.send(Err(error));
-                }
             }
-        }
-    });
+        });
+    }

     // TODO: create a custom internal error
     let _ = lmdb_writer_sx.send(Ok(TypedChunk::Documents(original_documents_chunk)));
@@ -306,13 +299,9 @@
 fn send_and_extract_flattened_documents_data(
     flattened_documents_chunk: Result<grenad::Reader<BufReader<File>>>,
     indexer: GrenadParameters,
     lmdb_writer_sx: Sender<Result<TypedChunk>>,
-    searchable_fields: &Option<HashSet<FieldId>>,
-    faceted_fields: &HashSet<FieldId>,
     primary_key_id: FieldId,
     geo_fields_ids: Option<(FieldId, FieldId)>,
-    stop_words: &Option<fst::Set<Vec<u8>>>,
-    allowed_separators: &Option<&[&str]>,
-    dictionary: &Option<&[&str]>,
+    settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
 ) -> Result<(
     grenad::Reader<CursorClonableMmap>,
@@ -341,10 +330,7 @@
                 extract_docid_word_positions(
                     flattened_documents_chunk.clone(),
                     indexer,
-                    searchable_fields,
-                    stop_words.as_ref(),
-                    *allowed_separators,
-                    *dictionary,
+                    &settings_diff,
                     max_positions_per_attributes,
                 )?;
@@ -367,7 +353,7 @@
             } = extract_fid_docid_facet_values(
                 flattened_documents_chunk.clone(),
                 indexer,
-                faceted_fields,
+                &settings_diff,
                 geo_fields_ids,
             )?;
diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs
index d534661da..aa9789a1a 100644
--- a/milli/src/update/index_documents/mod.rs
+++ b/milli/src/update/index_documents/mod.rs
@@ -6,9 +6,9 @@ mod typed_chunk;
 use std::collections::{HashMap, HashSet};
 use std::io::{Read, Seek};
-use std::iter::FromIterator;
 use std::num::NonZeroU32;
 use std::result::Result as StdResult;
+use std::sync::Arc;

 use crossbeam_channel::{Receiver, Sender};
 use grenad::{Merger, MergerBuilder};
@@ -259,21 +259,6 @@
             .expect("Invalid document addition state")
             .output_from_sorter(self.wtxn, &self.progress)?;

-        let new_facets = output.compute_real_facets(self.wtxn, self.index)?;
-        self.index.put_faceted_fields(self.wtxn, &new_facets)?;
-
-        // in case new fields were introduced we're going to recreate the searchable fields.
-        if let Some(faceted_fields) = self.index.user_defined_searchable_fields(self.wtxn)? {
-            // we can't keep references on the faceted fields while we update the index thus we need to own it.
-            let faceted_fields: Vec<String> =
-                faceted_fields.into_iter().map(str::to_string).collect();
-            self.index.put_all_searchable_fields_from_fields_ids_map(
-                self.wtxn,
-                &faceted_fields.iter().map(String::as_ref).collect::<Vec<_>>(),
-                &output.fields_ids_map,
-            )?;
-        }
-
         let indexed_documents = output.documents_count as u64;
         let number_of_documents = self.execute_raw(output)?;
@@ -296,16 +281,19 @@
         let TransformOutput {
             primary_key,
-            fields_ids_map,
+            mut settings_diff,
             field_distribution,
             documents_count,
             original_documents,
             flattened_documents,
         } = output;

-        // The fields_ids_map is put back to the store now so the rest of the transaction sees an
-        // up to date field map.
-        self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
+        // update the internal facet and searchable lists,
+        // because they might have changed due to the nested documents flattening.
+        settings_diff.new.recompute_facets(self.wtxn, self.index)?;
+        settings_diff.new.recompute_searchables(self.wtxn, self.index)?;
+
+        let settings_diff = Arc::new(settings_diff);

         let backup_pool;
         let pool = match self.indexer_config.thread_pool {
@@ -333,13 +321,8 @@
         ) = crossbeam_channel::unbounded();

         // get the primary key field id
-        let primary_key_id = fields_ids_map.id(&primary_key).unwrap();
+        let primary_key_id = settings_diff.new.fields_ids_map.id(&primary_key).unwrap();

-        // get searchable fields for word databases
-        let searchable_fields =
-            self.index.searchable_fields_ids(self.wtxn)?.map(HashSet::from_iter);
-        // get filterable fields for facet databases
-        let faceted_fields = self.index.faceted_fields_ids(self.wtxn)?;
         // get the fid of the `_geo.lat` and `_geo.lng` fields.
         let mut field_id_map = self.index.fields_ids_map(self.wtxn)?;
@@ -362,12 +345,6 @@
             None => None,
         };

-        let stop_words = self.index.stop_words(self.wtxn)?;
-        let separators = self.index.allowed_separators(self.wtxn)?;
-        let dictionary = self.index.dictionary(self.wtxn)?;
-        let exact_attributes = self.index.exact_attributes_ids(self.wtxn)?;
-        let proximity_precision = self.index.proximity_precision(self.wtxn)?.unwrap_or_default();
-
         let pool_params = GrenadParameters {
             chunk_compression_type: self.indexer_config.chunk_compression_type,
             chunk_compression_level: self.indexer_config.chunk_compression_level,
@@ -400,8 +377,6 @@
         let max_positions_per_attributes = self.indexer_config.max_positions_per_attributes;

-        let cloned_embedder = self.embedders.clone();
-
         let mut final_documents_ids = RoaringBitmap::new();
         let mut databases_seen = 0;
         let mut word_position_docids = None;
@@ -410,7 +385,6 @@
         let mut exact_word_docids = None;
         let mut chunk_accumulator = ChunkAccumulator::default();
         let mut dimension = HashMap::new();
-        let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());

         let current_span = tracing::Span::current();
@@ -428,10 +402,6 @@
             let flattened_chunk_iter =
                 grenad_obkv_into_chunks(flattened_documents, pool_params, documents_chunk_size);

-            let separators: Option<Vec<_>> =
-                separators.as_ref().map(|x| x.iter().map(String::as_str).collect());
-            let dictionary: Option<Vec<_>> =
-                dictionary.as_ref().map(|x| x.iter().map(String::as_str).collect());
             let result = original_chunk_iter.and_then(|original_chunk| {
                 let flattened_chunk = flattened_chunk_iter?;
                 // extract all databases from the chunked obkv documents
                 data_from_obkv_documents(
                     original_chunk,
                     flattened_chunk,
                     pool_params,
                     lmdb_writer_sx.clone(),
-                    searchable_fields,
-                    faceted_fields,
                     primary_key_id,
                     geo_fields_ids,
-                    field_id_map,
-                    stop_words,
-                    separators.as_deref(),
-                    dictionary.as_deref(),
+                    settings_diff.clone(),
                     max_positions_per_attributes,
-                    exact_attributes,
-                    proximity_precision,
-                    cloned_embedder,
                 )
             });
diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs
index e5392092f..8a3463e6f 100644
--- a/milli/src/update/index_documents/transform.rs
+++ b/milli/src/update/index_documents/transform.rs
@@ -1,12 +1,11 @@
 use std::borrow::Cow;
 use std::collections::btree_map::Entry as BEntry;
 use std::collections::hash_map::Entry as HEntry;
-use std::collections::{HashMap, HashSet};
+use std::collections::HashMap;
 use std::fs::File;
 use std::io::{Read, Seek};

 use fxhash::FxHashMap;
-use heed::RoTxn;
 use itertools::Itertools;
 use obkv::{KvReader, KvReaderU16, KvWriter};
 use roaring::RoaringBitmap;
@@ -21,14 +20,17 @@ use super::{IndexDocumentsMethod, IndexerConfig};
 use crate::documents::{DocumentsBatchIndex, EnrichedDocument, EnrichedDocumentsBatchReader};
 use crate::error::{Error, InternalError, UserError};
 use crate::index::{db_name, main_key};
-use crate::update::del_add::{into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd};
+use crate::update::del_add::{
+    del_add_from_two_obkvs, into_del_add_obkv, DelAdd, DelAddOperation, KvReaderDelAdd,
+};
 use crate::update::index_documents::GrenadParameters;
-use crate::update::{AvailableDocumentsIds, ClearDocuments, UpdateIndexingStep};
+use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
+use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
 use crate::{FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result};

 pub struct TransformOutput {
     pub primary_key: String,
-    pub fields_ids_map: FieldsIdsMap,
+    pub settings_diff: InnerIndexSettingsDiff,
     pub field_distribution: FieldDistribution,
     pub documents_count: usize,
     pub original_documents: File,
@@ -282,7 +284,9 @@
             self.original_sorter
                 .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
             let base_obkv = KvReader::new(base_obkv);
-            if let Some(flattened_obkv) = self.flatten_from_fields_ids_map(base_obkv)? {
+            if let Some(flattened_obkv) =
+                Self::flatten_from_fields_ids_map(&base_obkv, &mut self.fields_ids_map)?
+            {
                 // we recreate our buffer with the flattened documents
                 document_sorter_value_buffer.clear();
                 document_sorter_value_buffer.push(Operation::Addition as u8);
@@ -315,7 +319,9 @@
                 .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;

             let flattened_obkv = KvReader::new(&obkv_buffer);
-            if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
+            if let Some(obkv) =
+                Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
+            {
                 document_sorter_value_buffer.clear();
                 document_sorter_value_buffer.push(Operation::Addition as u8);
                 into_del_add_obkv(
@@ -524,7 +530,9 @@
             // flatten it and push it as to delete in the flattened_sorter
             let flattened_obkv = KvReader::new(base_obkv);
-            if let Some(obkv) = self.flatten_from_fields_ids_map(flattened_obkv)? {
+            if let Some(obkv) =
+                Self::flatten_from_fields_ids_map(&flattened_obkv, &mut self.fields_ids_map)?
+            {
                 // we recreate our buffer with the flattened documents
                 document_sorter_value_buffer.clear();
                 document_sorter_value_buffer.push(Operation::Deletion as u8);
@@ -541,8 +549,15 @@
     // Flatten a document from the fields ids map contained in self and insert the new
     // created fields. Returns `None` if the document doesn't need to be flattened.
-    #[tracing::instrument(level = "trace", skip(self, obkv), target = "indexing::transform")]
-    fn flatten_from_fields_ids_map(&mut self, obkv: KvReader<FieldId>) -> Result<Option<Vec<u8>>> {
+    #[tracing::instrument(
+        level = "trace",
+        skip(obkv, fields_ids_map),
+        target = "indexing::transform"
+    )]
+    fn flatten_from_fields_ids_map(
+        obkv: &KvReader<FieldId>,
+        fields_ids_map: &mut FieldsIdsMap,
+    ) -> Result<Option<Vec<u8>>> {
         if obkv
             .iter()
             .all(|(_, value)| !json_depth_checker::should_flatten_from_unchecked_slice(value))
@@ -563,7 +578,7 @@
         // all the raw values get inserted directly in the `key_value` vec.
         for (key, value) in obkv.iter() {
             if json_depth_checker::should_flatten_from_unchecked_slice(value) {
-                let key = self.fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
+                let key = fields_ids_map.name(key).ok_or(FieldIdMapMissingEntry::FieldId {
                     field_id: key,
                     process: "Flatten from fields ids map.",
                 })?;
@@ -581,7 +596,7 @@
         // Once we have the flattened version we insert all the new generated fields_ids
         // (if any) in the fields ids map and serialize the value.
         for (key, value) in flattened.into_iter() {
-            let fid = self.fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
+            let fid = fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
             let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
             key_value.push((fid, value.into()));
         }
@@ -792,9 +807,19 @@
             fst_new_external_documents_ids_builder.insert(key, value)
         })?;

+        let old_inner_settings = InnerIndexSettings::from_index(self.index, wtxn)?;
+        let mut new_inner_settings = old_inner_settings.clone();
+        new_inner_settings.fields_ids_map = self.fields_ids_map;
+        let settings_diff = InnerIndexSettingsDiff {
+            old: old_inner_settings,
+            new: new_inner_settings,
+            embedding_configs_updated: false,
+            settings_update_only: false,
+        };
+
         Ok(TransformOutput {
             primary_key,
-            fields_ids_map: self.fields_ids_map,
+            settings_diff,
             field_distribution,
             documents_count: self.documents_count,
             original_documents: original_documents.into_inner().map_err(|err| err.into_error())?,
@@ -804,6 +829,44 @@
         })
     }

+    /// Rebind the field_ids of the provided document to their values
+    /// based on the field_ids_maps difference between the old and the new settings,
+    /// then fill the provided buffers with delta documents using KvWriterDelAdd.
+    fn rebind_existing_document(
+        old_obkv: KvReader<FieldId>,
+        settings_diff: &InnerIndexSettingsDiff,
+        original_obkv_buffer: &mut Vec<u8>,
+        flattened_obkv_buffer: &mut Vec<u8>,
+    ) -> Result<()> {
+        let mut old_fields_ids_map = settings_diff.old.fields_ids_map.clone();
+        let mut new_fields_ids_map = settings_diff.new.fields_ids_map.clone();
+        let mut obkv_writer = KvWriter::<_, FieldId>::memory();
+        // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
+        for (id, name) in new_fields_ids_map.iter() {
+            if let Some(val) = old_fields_ids_map.id(name).and_then(|id| old_obkv.get(id)) {
+                obkv_writer.insert(id, val)?;
+            }
+        }
+        let data = obkv_writer.into_inner()?;
+        let new_obkv = KvReader::<FieldId>::new(&data);
+
+        // take the non-flattened version if flatten_from_fields_ids_map returns None.
+        let old_flattened = Self::flatten_from_fields_ids_map(&old_obkv, &mut old_fields_ids_map)?;
+        let old_flattened =
+            old_flattened.as_deref().map_or_else(|| old_obkv, KvReader::<FieldId>::new);
+        let new_flattened = Self::flatten_from_fields_ids_map(&new_obkv, &mut new_fields_ids_map)?;
+        let new_flattened =
+            new_flattened.as_deref().map_or_else(|| new_obkv, KvReader::<FieldId>::new);
+
+        original_obkv_buffer.clear();
+        flattened_obkv_buffer.clear();
+
+        del_add_from_two_obkvs(&old_obkv, &new_obkv, original_obkv_buffer)?;
+        del_add_from_two_obkvs(&old_flattened, &new_flattened, flattened_obkv_buffer)?;
+
+        Ok(())
+    }
+
     /// Clear all databases. Returns a `TransformOutput` with a file that contains the documents
     /// of the index with the attributes reordered accordingly to the `FieldsIdsMap` given as argument.
     ///
@@ -811,8 +874,7 @@
     pub fn prepare_for_documents_reindexing(
         self,
         wtxn: &mut heed::RwTxn<'i>,
-        old_fields_ids_map: FieldsIdsMap,
-        mut new_fields_ids_map: FieldsIdsMap,
+        settings_diff: InnerIndexSettingsDiff,
     ) -> Result<TransformOutput> {
         // There already has been a document addition, the primary key should be set by now.
@@ -811,8 +874,7 @@ impl<'a, 'i> Transform<'a, 'i> {
     pub fn prepare_for_documents_reindexing(
         self,
         wtxn: &mut heed::RwTxn<'i>,
-        old_fields_ids_map: FieldsIdsMap,
-        mut new_fields_ids_map: FieldsIdsMap,
+        settings_diff: InnerIndexSettingsDiff,
     ) -> Result<TransformOutput> {
         // There already has been a document addition, the primary key should be set by now.
         let primary_key = self
@@ -848,78 +910,27 @@ impl<'a, 'i> Transform<'a, 'i> {
             self.indexer_settings.max_memory.map(|mem| mem / 2),
         );
 
-        let mut obkv_buffer = Vec::new();
+        let mut original_obkv_buffer = Vec::new();
+        let mut flattened_obkv_buffer = Vec::new();
         let mut document_sorter_key_buffer = Vec::new();
-        let mut document_sorter_value_buffer = Vec::new();
         for result in self.index.external_documents_ids().iter(wtxn)? {
             let (external_id, docid) = result?;
-            let obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
+            let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
                 InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
             )?;
 
-            obkv_buffer.clear();
-            let mut obkv_writer = KvWriter::<_, FieldId>::new(&mut obkv_buffer);
-
-            // We iterate over the new `FieldsIdsMap` ids in order and construct the new obkv.
-            for (id, name) in new_fields_ids_map.iter() {
-                if let Some(val) = old_fields_ids_map.id(name).and_then(|id| obkv.get(id)) {
-                    obkv_writer.insert(id, val)?;
-                }
-            }
-
-            let buffer = obkv_writer.into_inner()?;
+            Self::rebind_existing_document(
+                old_obkv,
+                &settings_diff,
+                &mut original_obkv_buffer,
+                &mut flattened_obkv_buffer,
+            )?;
 
             document_sorter_key_buffer.clear();
             document_sorter_key_buffer.extend_from_slice(&docid.to_be_bytes());
             document_sorter_key_buffer.extend_from_slice(external_id.as_bytes());
-            document_sorter_value_buffer.clear();
-            into_del_add_obkv(
-                KvReaderU16::new(buffer),
-                DelAddOperation::Addition,
-                &mut document_sorter_value_buffer,
-            )?;
-            original_sorter.insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
-
-            // Once we have the document. We're going to flatten it
-            // and insert it in the flattened sorter.
-            let mut doc = serde_json::Map::new();
-
-            let reader = obkv::KvReader::new(buffer);
-            for (k, v) in reader.iter() {
-                let key = new_fields_ids_map.name(k).ok_or(FieldIdMapMissingEntry::FieldId {
-                    field_id: k,
-                    process: "Accessing field distribution in transform.",
-                })?;
-                let value = serde_json::from_slice::<serde_json::Value>(v)
-                    .map_err(InternalError::SerdeJson)?;
-                doc.insert(key.to_string(), value);
-            }
-
-            let flattened = flatten_serde_json::flatten(&doc);
-
-            // Once we have the flattened version we can convert it back to obkv and
-            // insert all the new generated fields_ids (if any) in the fields ids map.
-            let mut buffer: Vec<u8> = Vec::new();
-            let mut writer = KvWriter::new(&mut buffer);
-            let mut flattened: Vec<_> = flattened.into_iter().collect();
-            // we reorder the field to get all the known field first
-            flattened.sort_unstable_by_key(|(key, _)| {
-                new_fields_ids_map.id(key).unwrap_or(FieldId::MAX)
-            });
-
-            for (key, value) in flattened {
-                let fid =
-                    new_fields_ids_map.insert(&key).ok_or(UserError::AttributeLimitReached)?;
-                let value = serde_json::to_vec(&value).map_err(InternalError::SerdeJson)?;
-                writer.insert(fid, &value)?;
-            }
-            document_sorter_value_buffer.clear();
-            into_del_add_obkv(
-                KvReaderU16::new(&buffer),
-                DelAddOperation::Addition,
-                &mut document_sorter_value_buffer,
-            )?;
-            flattened_sorter.insert(docid.to_be_bytes(), &document_sorter_value_buffer)?;
+            original_sorter.insert(&document_sorter_key_buffer, &original_obkv_buffer)?;
+            flattened_sorter.insert(docid.to_be_bytes(), &flattened_obkv_buffer)?;
         }
 
         let grenad_params = GrenadParameters {
@@ -934,22 +945,14 @@ impl<'a, 'i> Transform<'a, 'i> {
         let flattened_documents = sorter_into_reader(flattened_sorter, grenad_params)?;
 
-        let output = TransformOutput {
+        Ok(TransformOutput {
             primary_key,
-            fields_ids_map: new_fields_ids_map,
             field_distribution,
+            settings_diff,
             documents_count,
             original_documents: original_documents.into_inner().into_inner(),
             flattened_documents: flattened_documents.into_inner().into_inner(),
-        };
-
-        let new_facets = output.compute_real_facets(wtxn, self.index)?;
-        self.index.put_faceted_fields(wtxn, &new_facets)?;
-
-        // We clear the full database (words-fst, documents ids and documents content).
-        ClearDocuments::new(wtxn, self.index).execute()?;
-
-        Ok(output)
+        })
     }
 }
 
@@ -964,20 +967,6 @@ fn drop_and_reuse<U, T>(mut vec: Vec<U>) -> Vec<T> {
     vec.into_iter().map(|_| unreachable!()).collect()
 }
 
-impl TransformOutput {
-    // find and insert the new field ids
-    pub fn compute_real_facets(&self, rtxn: &RoTxn, index: &Index) -> Result<HashSet<String>> {
-        let user_defined_facets = index.user_defined_faceted_fields(rtxn)?;
-
-        Ok(self
-            .fields_ids_map
-            .names()
-            .filter(|&field| crate::is_faceted(field, &user_defined_facets))
-            .map(|field| field.to_string())
-            .collect())
-    }
-}
-
 #[cfg(test)]
 mod test {
     use super::*;
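That closes the `Transform` side: `prepare_for_documents_reindexing` no longer clears the index and blindly re-adds every document, it emits delta documents and leaves facet recomputation to the settings diff. The del/add encoding produced by `del_add_from_two_obkvs` can be modeled roughly like this (simplified types, not milli's actual `KvWriterDelAdd` API):

```rust
// For every field id, keep the old value under a Deletion slot and the new
// value under an Addition slot, so downstream extractors can compute what to
// remove and what to insert in a single pass over the delta.
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
struct DelAdd {
    del: Option<Vec<u8>>,
    add: Option<Vec<u8>>,
}

fn del_add_from_two_docs(
    old: &BTreeMap<u16, Vec<u8>>,
    new: &BTreeMap<u16, Vec<u8>>,
) -> BTreeMap<u16, DelAdd> {
    let mut out = BTreeMap::new();
    for (&fid, val) in old {
        out.insert(fid, DelAdd { del: Some(val.clone()), add: None });
    }
    for (&fid, val) in new {
        out.entry(fid).or_insert(DelAdd { del: None, add: None }).add = Some(val.clone());
    }
    out
}

fn main() {
    let old = BTreeMap::from([(0u16, b"a".to_vec()), (1, b"b".to_vec())]);
    let new = BTreeMap::from([(1u16, b"B".to_vec()), (2, b"c".to_vec())]);
    let delta = del_add_from_two_docs(&old, &new);
    // field 0 only deleted, field 1 replaced, field 2 only added
    assert_eq!(delta[&0], DelAdd { del: Some(b"a".to_vec()), add: None });
    assert_eq!(delta[&1], DelAdd { del: Some(b"b".to_vec()), add: Some(b"B".to_vec()) });
    assert_eq!(delta[&2], DelAdd { del: None, add: Some(b"c".to_vec()) });
}
```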
diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs
index beca4fe51..1997e966e 100644
--- a/milli/src/update/settings.rs
+++ b/milli/src/update/settings.rs
@@ -20,7 +20,7 @@ use crate::update::index_documents::IndexDocumentsMethod;
 use crate::update::{IndexDocuments, UpdateIndexingStep};
 use crate::vector::settings::{check_set, check_unset, EmbedderSource, EmbeddingSettings};
 use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs};
-use crate::{FieldsIdsMap, Index, Result};
+use crate::{FieldId, FieldsIdsMap, Index, Result};
 
 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
 pub enum Setting<T> {
@@ -385,14 +385,14 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
     #[tracing::instrument(
         level = "trace",
-        skip(self, progress_callback, should_abort, old_fields_ids_map),
+        skip(self, progress_callback, should_abort, settings_diff),
         target = "indexing::documents"
     )]
     fn reindex<FP, FA>(
         &mut self,
         progress_callback: &FP,
         should_abort: &FA,
-        old_fields_ids_map: FieldsIdsMap,
+        settings_diff: InnerIndexSettingsDiff,
     ) -> Result<()>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
@@ -400,7 +400,6 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
     {
         puffin::profile_function!();
 
-        let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
         // if the settings are set before any document update, we don't need to do anything, and
         // will set the primary key during the first document addition.
         if self.index.number_of_documents(self.wtxn)? == 0 {
@@ -416,14 +415,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         )?;
 
         // We clear the databases and remap the documents fields based on the new `FieldsIdsMap`.
-        let output = transform.prepare_for_documents_reindexing(
-            self.wtxn,
-            old_fields_ids_map,
-            fields_ids_map,
-        )?;
-
-        let embedder_configs = self.index.embedding_configs(self.wtxn)?;
-        let embedders = self.embedders(embedder_configs)?;
+        let output = transform.prepare_for_documents_reindexing(self.wtxn, settings_diff)?;
 
         // We index the generated `TransformOutput` which must contain
         // all the documents with fields in the newly defined searchable order.
@@ -436,32 +428,11 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
             &should_abort,
         )?;
 
-        let indexing_builder = indexing_builder.with_embedders(embedders);
         indexing_builder.execute_raw(output)?;
 
         Ok(())
     }
 
-    fn embedders(
-        &self,
-        embedding_configs: Vec<(String, EmbeddingConfig)>,
-    ) -> Result<EmbeddingConfigs> {
-        let res: Result<_> = embedding_configs
-            .into_iter()
-            .map(|(name, EmbeddingConfig { embedder_options, prompt })| {
-                let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);
-
-                let embedder = Arc::new(
-                    Embedder::new(embedder_options.clone())
-                        .map_err(crate::vector::Error::from)
-                        .map_err(crate::Error::from)?,
-                );
-                Ok((name, (embedder, prompt)))
-            })
-            .collect();
-        res.map(EmbeddingConfigs::new)
-    }
-
     fn update_displayed(&mut self) -> Result<bool> {
         match self.displayed_fields {
             Setting::Set(ref fields) => {
@@ -1038,6 +1009,13 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
             }
             Setting::NotSet => false,
         };
+
+        // if any change forces a reindexing,
+        // clear the vector database.
+        if update {
+            self.index.vector_arroy.clear(self.wtxn)?;
+        }
+
         Ok(update)
     }
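One detail worth flagging in the hunk above: when an embedder change forces a reindex, the arroy vector store is cleared up front, so embeddings produced by the previous configuration (possibly with a different dimension count) can never leak into the rebuilt index. Schematically (hypothetical `VectorStore` type, not milli's `vector_arroy` database):

```rust
// Hypothetical stand-in for the arroy database holding document embeddings.
struct VectorStore {
    vectors: Vec<Vec<f32>>,
}

impl VectorStore {
    fn clear(&mut self) {
        self.vectors.clear();
    }
}

// Mirrors the boolean computed by `update_embedding_configs`: true whenever
// the new embedder configuration differs from the stored one.
fn apply_embedder_update(config_changed: bool, store: &mut VectorStore) {
    if config_changed {
        // Stale vectors must not survive a configuration change.
        store.clear();
    }
}

fn main() {
    let mut store = VectorStore { vectors: vec![vec![0.1, 0.2, 0.3]] };
    apply_embedder_update(true, &mut store);
    assert!(store.vectors.is_empty());
}
```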
@@ -1066,20 +1044,10 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
     {
         self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
 
-        // Note: this MUST be before `update_sortable` so that we can get the old value to compare with the updated value afterwards
-
-        let existing_fields: HashSet<_> = self
-            .index
-            .field_distribution(self.wtxn)?
-            .into_iter()
-            .filter_map(|(field, count)| (count != 0).then_some(field))
-            .collect();
-        let old_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
-        let old_fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
+        let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
 
+        // never trigger re-indexing
         self.update_displayed()?;
-        self.update_filterable()?;
-        self.update_sortable()?;
         self.update_distinct_field()?;
         self.update_criteria()?;
         self.update_primary_key()?;
@@ -1089,16 +1057,19 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         self.update_max_values_per_facet()?;
         self.update_sort_facet_values_by()?;
         self.update_pagination_max_total_hits()?;
+        self.update_search_cutoff()?;
 
-        let faceted_updated = self.update_faceted(existing_fields, old_faceted_fields)?;
-        let stop_words_updated = self.update_stop_words()?;
-        let non_separator_tokens_updated = self.update_non_separator_tokens()?;
-        let separator_tokens_updated = self.update_separator_tokens()?;
-        let dictionary_updated = self.update_dictionary()?;
-        let synonyms_updated = self.update_synonyms()?;
-        let searchable_updated = self.update_searchable()?;
-        let exact_attributes_updated = self.update_exact_attributes()?;
-        let proximity_precision = self.update_proximity_precision()?;
+        // could trigger re-indexing
+        self.update_filterable()?;
+        self.update_sortable()?;
+        self.update_stop_words()?;
+        self.update_non_separator_tokens()?;
+        self.update_separator_tokens()?;
+        self.update_dictionary()?;
+        self.update_synonyms()?;
+        self.update_searchable()?;
+        self.update_exact_attributes()?;
+        self.update_proximity_precision()?;
         // TODO: very rough approximation of the needs for reindexing where any change will result in
         // a full reindexing.
         // What can be done instead:
@@ -1107,53 +1078,193 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         // 3. Keep the old vectors but reattempt indexing on a prompt change: only actually changed prompt will need embedding + storage
         let embedding_configs_updated = self.update_embedding_configs()?;
 
-        // never trigger re-indexing
-        self.update_search_cutoff()?;
+        let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn)?;
+        let inner_settings_diff = InnerIndexSettingsDiff {
+            old: old_inner_settings,
+            new: new_inner_settings,
+            embedding_configs_updated,
+            settings_update_only: true,
+        };
 
-        if stop_words_updated
-            || non_separator_tokens_updated
-            || separator_tokens_updated
-            || dictionary_updated
-            || faceted_updated
-            || synonyms_updated
-            || searchable_updated
-            || exact_attributes_updated
-            || proximity_precision
-            || embedding_configs_updated
-        {
-            self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?;
+        if inner_settings_diff.any_reindexing_needed() {
+            self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
         }
 
         Ok(())
     }
+}
 
-    fn update_faceted(
-        &self,
-        existing_fields: HashSet<String>,
-        old_faceted_fields: HashSet<String>,
-    ) -> Result<bool> {
+pub struct InnerIndexSettingsDiff {
+    pub(crate) old: InnerIndexSettings,
+    pub(crate) new: InnerIndexSettings,
+
+    // TODO: compare directly the embedders.
+    pub(crate) embedding_configs_updated: bool,
+
+    pub(crate) settings_update_only: bool,
+}
+
+impl InnerIndexSettingsDiff {
+    pub fn any_reindexing_needed(&self) -> bool {
+        self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
+    }
+
+    pub fn reindex_searchable(&self) -> bool {
+        self.old
+            .fields_ids_map
+            .iter()
+            .zip(self.new.fields_ids_map.iter())
+            .any(|(old, new)| old != new)
+            || self.old.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
+                != self.new.stop_words.as_ref().map(|set| set.as_fst().as_bytes())
+            || self.old.allowed_separators != self.new.allowed_separators
+            || self.old.dictionary != self.new.dictionary
+            || self.old.user_defined_searchable_fields != self.new.user_defined_searchable_fields
+            || self.old.exact_attributes != self.new.exact_attributes
+            || self.old.proximity_precision != self.new.proximity_precision
+    }
+
+    pub fn reindex_facets(&self) -> bool {
+        let existing_fields = &self.new.existing_fields;
         if existing_fields.iter().any(|field| field.contains('.')) {
-            return Ok(true);
+            return true;
         }
 
+        let old_faceted_fields = &self.old.user_defined_faceted_fields;
         if old_faceted_fields.iter().any(|field| field.contains('.')) {
-            return Ok(true);
+            return true;
         }
 
         // If there are new faceted fields, we indicate that we must reindex, as we must
         // index these new fields as facets. It means that the distinct attribute,
         // an Asc/Desc criterion or a filtered attribute has been added or removed.
-        let new_faceted_fields = self.index.user_defined_faceted_fields(self.wtxn)?;
-
+        let new_faceted_fields = &self.new.user_defined_faceted_fields;
         if new_faceted_fields.iter().any(|field| field.contains('.')) {
-            return Ok(true);
+            return true;
         }
 
         let faceted_updated =
-            (&existing_fields - &old_faceted_fields) != (&existing_fields - &new_faceted_fields);
+            (existing_fields - old_faceted_fields) != (existing_fields - new_faceted_fields);
 
-        Ok(faceted_updated)
+        self.old
+            .fields_ids_map
+            .iter()
+            .zip(self.new.fields_ids_map.iter())
+            .any(|(old, new)| old != new)
+            || faceted_updated
     }
+
+    pub fn reindex_vectors(&self) -> bool {
+        self.embedding_configs_updated
+    }
+
+    pub fn settings_update_only(&self) -> bool {
+        self.settings_update_only
+    }
+}
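These three predicates are the heart of the refactor: the reindexing decision stops being an accumulation of `*_updated` booleans threaded through `execute` and becomes a pure function of two settings snapshots. A simplified model of that decision (plain structs, not milli's `InnerIndexSettings`):

```rust
// Simplified model of the snapshot-diff decision made by InnerIndexSettingsDiff.
use std::collections::HashSet;

#[derive(Clone, PartialEq)]
struct Snapshot {
    searchable: Option<Vec<String>>,
    faceted: HashSet<String>,
    stop_words: Vec<String>,
    embedders_fingerprint: u64, // hypothetical stand-in for the embedder configs
}

struct Diff {
    old: Snapshot,
    new: Snapshot,
}

impl Diff {
    fn reindex_searchable(&self) -> bool {
        self.old.searchable != self.new.searchable || self.old.stop_words != self.new.stop_words
    }
    fn reindex_facets(&self) -> bool {
        self.old.faceted != self.new.faceted
    }
    fn reindex_vectors(&self) -> bool {
        self.old.embedders_fingerprint != self.new.embedders_fingerprint
    }
    // Reindex only if at least one sub-diff demands it: a no-op settings update
    // (e.g. only `searchCutoffMs`) now skips document reindexing entirely.
    fn any_reindexing_needed(&self) -> bool {
        self.reindex_searchable() || self.reindex_facets() || self.reindex_vectors()
    }
}

fn main() {
    let old = Snapshot {
        searchable: None,
        faceted: HashSet::new(),
        stop_words: Vec::new(),
        embedders_fingerprint: 0,
    };
    let mut new = old.clone();
    assert!(!(Diff { old: old.clone(), new: new.clone() }.any_reindexing_needed()));
    new.stop_words.push("the".into());
    assert!(Diff { old, new }.any_reindexing_needed());
}
```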
+#[derive(Clone)]
+pub(crate) struct InnerIndexSettings {
+    pub stop_words: Option<fst::Set<Vec<u8>>>,
+    pub allowed_separators: Option<BTreeSet<String>>,
+    pub dictionary: Option<BTreeSet<String>>,
+    pub fields_ids_map: FieldsIdsMap,
+    pub user_defined_faceted_fields: HashSet<String>,
+    pub user_defined_searchable_fields: Option<Vec<String>>,
+    pub faceted_fields_ids: HashSet<FieldId>,
+    pub searchable_fields_ids: Option<Vec<FieldId>>,
+    pub exact_attributes: HashSet<FieldId>,
+    pub proximity_precision: ProximityPrecision,
+    pub embedding_configs: EmbeddingConfigs,
+    pub existing_fields: HashSet<String>,
+}
+
+impl InnerIndexSettings {
+    pub fn from_index(index: &Index, rtxn: &heed::RoTxn) -> Result<Self> {
+        let stop_words = index.stop_words(rtxn)?;
+        let stop_words = stop_words.map(|sw| sw.map_data(Vec::from).unwrap());
+        let allowed_separators = index.allowed_separators(rtxn)?;
+        let dictionary = index.dictionary(rtxn)?;
+        let fields_ids_map = index.fields_ids_map(rtxn)?;
+        let user_defined_searchable_fields = index.user_defined_searchable_fields(rtxn)?;
+        let user_defined_searchable_fields =
+            user_defined_searchable_fields.map(|sf| sf.into_iter().map(String::from).collect());
+        let user_defined_faceted_fields = index.user_defined_faceted_fields(rtxn)?;
+        let searchable_fields_ids = index.searchable_fields_ids(rtxn)?;
+        let faceted_fields_ids = index.faceted_fields_ids(rtxn)?;
+        let exact_attributes = index.exact_attributes_ids(rtxn)?;
+        let proximity_precision = index.proximity_precision(rtxn)?.unwrap_or_default();
+        let embedding_configs = embedders(index.embedding_configs(rtxn)?)?;
+        let existing_fields: HashSet<_> = index
+            .field_distribution(rtxn)?
+            .into_iter()
+            .filter_map(|(field, count)| (count != 0).then_some(field))
+            .collect();
+
+        Ok(Self {
+            stop_words,
+            allowed_separators,
+            dictionary,
+            fields_ids_map,
+            user_defined_faceted_fields,
+            user_defined_searchable_fields,
+            faceted_fields_ids,
+            searchable_fields_ids,
+            exact_attributes,
+            proximity_precision,
+            embedding_configs,
+            existing_fields,
+        })
+    }
+
+    // find and insert the new field ids
+    pub fn recompute_facets(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
+        let new_facets = self
+            .fields_ids_map
+            .names()
+            .filter(|&field| crate::is_faceted(field, &self.user_defined_faceted_fields))
+            .map(|field| field.to_string())
+            .collect();
+        index.put_faceted_fields(wtxn, &new_facets)?;
+
+        self.faceted_fields_ids = index.faceted_fields_ids(wtxn)?;
+        Ok(())
+    }
+
+    // find and insert the new field ids
+    pub fn recompute_searchables(&mut self, wtxn: &mut heed::RwTxn, index: &Index) -> Result<()> {
+        // in case new fields were introduced we're going to recreate the searchable fields.
+        if let Some(searchable_fields) = self.user_defined_searchable_fields.as_ref() {
+            let searchable_fields =
+                searchable_fields.iter().map(String::as_ref).collect::<Vec<_>>();
+            index.put_all_searchable_fields_from_fields_ids_map(
+                wtxn,
+                &searchable_fields,
+                &self.fields_ids_map,
+            )?;
+            let searchable_fields_ids = index.searchable_fields_ids(wtxn)?;
+            self.searchable_fields_ids = searchable_fields_ids;
+        }
+
+        Ok(())
+    }
+}
+
+fn embedders(embedding_configs: Vec<(String, EmbeddingConfig)>) -> Result<EmbeddingConfigs> {
+    let res: Result<_> = embedding_configs
+        .into_iter()
+        .map(|(name, EmbeddingConfig { embedder_options, prompt })| {
+            let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);
+
+            let embedder = Arc::new(
+                Embedder::new(embedder_options.clone())
+                    .map_err(crate::vector::Error::from)
+                    .map_err(crate::Error::from)?,
+            );
+            Ok((name, (embedder, prompt)))
+        })
+        .collect();
+    res.map(EmbeddingConfigs::new)
 }
 
 fn validate_prompt(
@@ -1643,6 +1754,70 @@ mod tests {
             .unwrap()
             .count();
         assert_eq!(count, 4);
+
+        // Set the filterable fields to be the age and the name.
+        index
+            .update_settings(|settings| {
+                settings.set_filterable_fields(hashset! { S("age"), S("name") });
+            })
+            .unwrap();
+
+        // Check that the filterable fields are correctly set.
+        let rtxn = index.read_txn().unwrap();
+        let fields_ids = index.filterable_fields(&rtxn).unwrap();
+        assert_eq!(fields_ids, hashset! { S("age"), S("name") });
+
+        let rtxn = index.read_txn().unwrap();
+        // Only count the field_id 0 and level 0 facet values.
+        let count = index
+            .facet_id_f64_docids
+            .remap_key_type::<Bytes>()
+            .prefix_iter(&rtxn, &[0, 1, 0])
+            .unwrap()
+            .count();
+        assert_eq!(count, 4);
+
+        let rtxn = index.read_txn().unwrap();
+        // Only count the field_id 0 and level 0 facet values.
+        let count = index
+            .facet_id_string_docids
+            .remap_key_type::<Bytes>()
+            .prefix_iter(&rtxn, &[0, 0])
+            .unwrap()
+            .count();
+        assert_eq!(count, 5);
+
+        // Remove the age from the filterable fields.
+        index
+            .update_settings(|settings| {
+                settings.set_filterable_fields(hashset! { S("name") });
+            })
+            .unwrap();
+
+        // Check that the filterable fields are correctly set.
+        let rtxn = index.read_txn().unwrap();
+        let fields_ids = index.filterable_fields(&rtxn).unwrap();
+        assert_eq!(fields_ids, hashset! { S("name") });
+
+        let rtxn = index.read_txn().unwrap();
+        // Only count the field_id 0 and level 0 facet values.
+        let count = index
+            .facet_id_f64_docids
+            .remap_key_type::<Bytes>()
+            .prefix_iter(&rtxn, &[0, 1, 0])
+            .unwrap()
+            .count();
+        assert_eq!(count, 0);
+
+        let rtxn = index.read_txn().unwrap();
+        // Only count the field_id 0 and level 0 facet values.
+        let count = index
+            .facet_id_string_docids
+            .remap_key_type::<Bytes>()
+            .prefix_iter(&rtxn, &[0, 0])
+            .unwrap()
+            .count();
+        assert_eq!(count, 5);
     }
 
     #[test]
diff --git a/workloads/settings-add-remove-filters.json b/workloads/settings-add-remove-filters.json
index 04a57c707..12493a8fc 100644
--- a/workloads/settings-add-remove-filters.json
+++ b/workloads/settings-add-remove-filters.json
@@ -1,6 +1,6 @@
 {
     "name": "settings-add-remove-filters.json",
-    "run_count": 2,
+    "run_count": 5,
     "extra_cli_args": [
         "--max-indexing-threads=4"
     ],
diff --git a/workloads/settings-proximity-precision.json b/workloads/settings-proximity-precision.json
index 48cfad49d..384f99e37 100644
--- a/workloads/settings-proximity-precision.json
+++ b/workloads/settings-proximity-precision.json
@@ -1,6 +1,6 @@
 {
     "name": "settings-proximity-precision.json",
-    "run_count": 2,
+    "run_count": 5,
     "extra_cli_args": [
         "--max-indexing-threads=4"
     ],
diff --git a/workloads/settings-remove-add-swap-searchable.json b/workloads/settings-remove-add-swap-searchable.json
index ba315680f..61db8822e 100644
--- a/workloads/settings-remove-add-swap-searchable.json
+++ b/workloads/settings-remove-add-swap-searchable.json
@@ -1,6 +1,6 @@
 {
     "name": "settings-remove-add-swap-searchable.json",
-    "run_count": 2,
+    "run_count": 5,
     "extra_cli_args": [
         "--max-indexing-threads=4"
     ],
diff --git a/workloads/settings-typo.json b/workloads/settings-typo.json
index a272e6d1f..45163bc98 100644
--- a/workloads/settings-typo.json
+++ b/workloads/settings-typo.json
@@ -1,6 +1,6 @@
 {
     "name": "settings-typo.json",
-    "run_count": 2,
+    "run_count": 5,
     "extra_cli_args": [
         "--max-indexing-threads=4"
     ],
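Lastly, the workload files raise `run_count` from 2 to 5: with several of these scenarios no longer triggering a full reindex, more runs help the benchmarks separate real wins from noise. The shape that `Settings::execute` ends up with after this patch can be summarized as follows (a structural sketch under hypothetical helper types, not the actual milli code):

```rust
// Structural sketch of the new settings-update flow: snapshot, mutate,
// snapshot again, and reindex only when the diff requires it.
#[derive(Clone, PartialEq)]
struct Snapshot; // stands in for InnerIndexSettings::from_index(...)

struct SettingsDiff {
    old: Snapshot,
    new: Snapshot,
    embedding_configs_updated: bool,
}

impl SettingsDiff {
    fn any_reindexing_needed(&self) -> bool {
        // milli also compares searchable/facet state here; simplified.
        self.old != self.new || self.embedding_configs_updated
    }
}

fn execute(apply_updates: impl FnOnce() -> bool) {
    let old = Snapshot; // 1. snapshot the settings before any change
    let embedding_configs_updated = apply_updates(); // 2. write the new settings
    let new = Snapshot; // 3. snapshot again
    let diff = SettingsDiff { old, new, embedding_configs_updated };
    if diff.any_reindexing_needed() {
        // 4. reindex, handing the diff down to Transform
    }
}

fn main() {
    execute(|| /* e.g. update_embedding_configs() */ false);
}
```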