diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs
index 3afad8ee5..16e7a2f81 100644
--- a/crates/benchmarks/benches/indexing.rs
+++ b/crates/benchmarks/benches/indexing.rs
@@ -65,7 +65,7 @@ fn setup_settings<'t>(
     let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect();
     builder.set_sortable_fields(sortable_fields);
 
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 }
 
 fn setup_index_with_settings(
diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs
index 32e844a0b..54bb7e51b 100644
--- a/crates/benchmarks/benches/utils.rs
+++ b/crates/benchmarks/benches/utils.rs
@@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index {
 
     (conf.configure)(&mut builder);
 
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     let config = IndexerConfig::default();
diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs
index 5261692b6..e6bf6f713 100644
--- a/crates/index-scheduler/src/scheduler/process_batch.rs
+++ b/crates/index-scheduler/src/scheduler/process_batch.rs
@@ -246,8 +246,8 @@ impl IndexScheduler {
 
                 builder
                     .execute(
-                        |indexing_step| tracing::debug!(update = ?indexing_step),
-                        || must_stop_processing.get(),
+                        &|| must_stop_processing.get(),
+                        &progress,
                         current_batch.embedder_stats.clone(),
                     )
                     .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs
index 4c0db9ce4..04aaf9a84 100644
--- a/crates/index-scheduler/src/scheduler/process_index_operation.rs
+++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs
@@ -474,15 +474,11 @@ impl IndexScheduler {
                 }
 
                 progress.update_progress(SettingsProgress::ApplyTheSettings);
-                builder
-                    .execute(
-                        |indexing_step| tracing::debug!(update = ?indexing_step),
-                        || must_stop_processing.get(),
-                        embedder_stats,
-                    )
+                let congestion = builder
+                    .execute(&|| must_stop_processing.get(), progress, embedder_stats)
                    .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
 
-                Ok((tasks, None))
+                Ok((tasks, congestion))
             }
             IndexOperation::DocumentClearAndSetting {
                 index_uid,
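A minimal sketch of the call-site migration above, assuming milli's `Settings` builder, `Progress`, and `EmbedderStats` types as they appear in these hunks: the per-step logging closure is replaced by a shared `&Progress` handle plus an explicit cancellation callback, and `execute` now reports back any channel congestion.

```rust
// Hypothetical call site for the new `Settings::execute` signature
// (`builder` and `embedder_stats` are assumed from the hunks above).
let must_stop = || false; // cancellation callback, polled during indexing
let progress = Progress::default(); // replaces the `|indexing_step| ...` logger
let congestion = builder.execute(&must_stop, &progress, embedder_stats.clone())?;
tracing::debug!(?congestion, "settings applied");
```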
diff --git a/crates/index-scheduler/src/scheduler/test_embedders.rs b/crates/index-scheduler/src/scheduler/test_embedders.rs
index 772aa1520..305894d0a 100644
--- a/crates/index-scheduler/src/scheduler/test_embedders.rs
+++ b/crates/index-scheduler/src/scheduler/test_embedders.rs
@@ -399,7 +399,7 @@ fn import_vectors_first_and_embedder_later() {
         .collect::<Vec<_>>();
     // the all the vectors linked to the new specified embedder have been removed
     // Only the unknown embedders stays in the document DB
-    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel","_vectors":{"unknown embedder":[1.0,2.0,3.0]}},{"id":2,"doggo":"max","_vectors":{"unknown embedder":[4.0,5.0]}},{"id":3,"doggo":"marcel"},{"id":4,"doggo":"sora"}]"###);
+    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel","_vectors":{"unknown embedder":[1,2,3]}},{"id":2,"doggo":"max","_vectors":{"unknown embedder":[4,5]}},{"id":3,"doggo":"marcel"},{"id":4,"doggo":"sora"}]"###);
     let conf =
         index.embedding_configs(&rtxn).unwrap();
     // even though we specified the vector for the ID 3, it shouldn't be marked
     // as user provided since we explicitely marked it as NOT user provided.
@@ -800,7 +800,7 @@ fn delete_embedder_with_user_provided_vectors() {
         .unwrap()
         .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
         .collect::<Vec<_>>();
-    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###);
+    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"regenerate":false,"embeddings":[[0.0,0.0,0.0]]}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"regenerate":false,"embeddings":[[1.0,1.0,1.0]]}}}]"###);
     }
 
     {
@@ -835,6 +835,6 @@ fn delete_embedder_with_user_provided_vectors() {
         .collect::<Vec<_>>();
 
     // FIXME: redaction
-    snapshot!(json_string!(serde_json::to_string(&documents).unwrap(), { "[]._vectors.doggo_embedder.embeddings" => "[vector]" }), @r###""[{\"id\":0,\"doggo\":\"kefir\",\"_vectors\":{\"manual\":{\"embeddings\":[[0.0,0.0,0.0]],\"regenerate\":false},\"my_doggo_embedder\":{\"embeddings\":[[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]],\"regenerate\":false}}},{\"id\":1,\"doggo\":\"intel\",\"_vectors\":{\"manual\":{\"embeddings\":[[1.0,1.0,1.0]],\"regenerate\":false}}}]""###);
+    snapshot!(json_string!(serde_json::to_string(&documents).unwrap(), { "[]._vectors.doggo_embedder.embeddings" => "[vector]" }),
+    @r###""[{\"id\":0,\"doggo\":\"kefir\",\"_vectors\":{\"manual\":{\"regenerate\":false,\"embeddings\":[[0.0,0.0,0.0]]},\"my_doggo_embedder\":{\"regenerate\":false,\"embeddings\":[[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]]}}},{\"id\":1,\"doggo\":\"intel\",\"_vectors\":{\"manual\":{\"regenerate\":false,\"embeddings\":[[1.0,1.0,1.0]]}}}]""###);
     }
 }
diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs
index c7e0634f4..668a7fded 100644
--- a/crates/meilisearch/src/analytics/segment_analytics.rs
+++ b/crates/meilisearch/src/analytics/segment_analytics.rs
@@ -202,6 +202,7 @@ struct Infos {
     experimental_composite_embedders: bool,
     experimental_embedding_cache_entries: usize,
     experimental_no_snapshot_compaction: bool,
+    experimental_no_edition_2024_for_settings: bool,
     gpu_enabled: bool,
     db_path: bool,
     import_dump: bool,
@@ -286,8 +287,12 @@ impl Infos {
             ScheduleSnapshot::Enabled(interval) => Some(interval),
         };
 
-        let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } =
-            indexer_options;
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget: _,
+            experimental_no_edition_2024_for_settings,
+        } = indexer_options;
 
         let RuntimeTogglableFeatures {
             metrics,
@@ -350,6 +355,7 @@ impl Infos {
             ssl_require_auth,
             ssl_resumption,
             ssl_tickets,
+            experimental_no_edition_2024_for_settings,
         }
     }
 }
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index c902f4e60..871bd688e 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::{open_auth_store_env, AuthController};
 use meilisearch_types::milli::constants::VERSION_MAJOR;
 use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
-use meilisearch_types::milli::progress::EmbedderStats;
+use meilisearch_types::milli::progress::{EmbedderStats, Progress};
 use meilisearch_types::milli::update::{
     default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
 };
@@ -464,6 +464,7 @@ fn import_dump(
     index_scheduler: &mut IndexScheduler,
     auth: &mut AuthController,
 ) -> Result<(), anyhow::Error> {
+    let progress = Progress::default();
     let reader = File::open(dump_path)?;
     let mut dump_reader = dump::DumpReader::open(reader)?;
 
@@ -544,11 +545,7 @@ fn import_dump(
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);
         let embedder_stats: Arc<EmbedderStats> = Default::default();
-        builder.execute(
-            |indexing_step| tracing::debug!("update: {:?}", indexing_step),
-            || false,
-            embedder_stats.clone(),
-        )?;
+        builder.execute(&|| false, &progress, embedder_stats.clone())?;
 
         // 4.3 Import the documents.
         // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index 5b7d1e52f..9658352c8 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -53,6 +53,8 @@ const MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE: &str = "MEILI_EXPERIMENTAL_DUMPLESS_U
 const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
 const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
 const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER";
+const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS: &str =
+    "MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS";
 const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
 const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
 const MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER: &str = "MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER";
@@ -749,12 +751,25 @@ pub struct IndexerOpts {
     #[clap(skip)]
     #[serde(skip)]
     pub skip_index_budget: bool,
+
+    /// Experimental no edition 2024 for settings feature. For more information,
+    /// see:
+    ///
+    /// Enables the experimental no edition 2024 for settings feature.
+    #[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS)]
+    #[serde(default)]
+    pub experimental_no_edition_2024_for_settings: bool,
 }
 
 impl IndexerOpts {
     /// Exports the values to their corresponding env vars if they are not set.
     pub fn export_to_env(self) {
-        let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } = self;
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget: _,
+            experimental_no_edition_2024_for_settings,
+        } = self;
         if let Some(max_indexing_memory) = max_indexing_memory.0 {
             export_to_env_if_not_present(
                 MEILI_MAX_INDEXING_MEMORY,
@@ -767,6 +782,12 @@ impl IndexerOpts {
             max_indexing_threads.to_string(),
         );
         }
+        if experimental_no_edition_2024_for_settings {
+            export_to_env_if_not_present(
+                MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS,
+                experimental_no_edition_2024_for_settings.to_string(),
+            );
+        }
     }
 }
 
@@ -785,7 +806,12 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
             max_threads: *other.max_indexing_threads,
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
-            ..Default::default()
+            experimental_no_edition_2024_for_settings: other
+                .experimental_no_edition_2024_for_settings,
+            chunk_compression_type: Default::default(),
+            chunk_compression_level: Default::default(),
+            documents_chunk_size: Default::default(),
+            max_nb_chunks: Default::default(),
         })
     }
 }
diff --git a/crates/meilisearch/tests/common/server.rs b/crates/meilisearch/tests/common/server.rs
index 1f5688a02..4367650c5 100644
--- a/crates/meilisearch/tests/common/server.rs
+++ b/crates/meilisearch/tests/common/server.rs
@@ -464,6 +464,7 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
             skip_index_budget: true,
             // Having 2 threads makes the tests way faster
             max_indexing_threads: MaxThreads::from_str("2").unwrap(),
+            experimental_no_edition_2024_for_settings: false,
         },
         experimental_enable_metrics: false,
         ..Parser::parse_from(None as Option<&str>)
diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs
index 36917c10e..9e2afca97 100644
--- a/crates/milli/src/search/new/tests/integration.rs
+++ b/crates/milli/src/search/new/tests/integration.rs
@@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     // index documents
diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs
index d218bb3a6..f2e34c615 100644
--- a/crates/milli/src/test_index.rs
+++ b/crates/milli/src/test_index.rs
@@ -135,7 +135,7 @@ impl TempIndex {
     ) -> Result<(), crate::error::Error> {
         let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
         update(&mut builder);
-        builder.execute(drop, || false, Default::default())?;
+        builder.execute(&|| false, &Progress::default(), Default::default())?;
         Ok(())
     }
 
diff --git a/crates/milli/src/update/indexer_config.rs b/crates/milli/src/update/indexer_config.rs
index eb7fbd4d5..a0f901818 100644
--- a/crates/milli/src/update/indexer_config.rs
+++ b/crates/milli/src/update/indexer_config.rs
@@ -15,6 +15,7 @@ pub struct IndexerConfig {
     pub thread_pool: ThreadPoolNoAbort,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
+    pub experimental_no_edition_2024_for_settings: bool,
 }
 
 impl IndexerConfig {
@@ -63,6 +64,7 @@ impl Default for IndexerConfig {
             chunk_compression_level: None,
             max_positions_per_attributes: None,
             skip_index_budget: false,
+            experimental_no_edition_2024_for_settings: false,
         }
     }
 }
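A hedged sketch of how the new flag is meant to flow, assuming `IndexerOpts` implements `Default` (suggested by the `..Parser::parse_from(...)` usage above); this is not part of the diff. Note the design choice in the `TryFrom` hunk: replacing `..Default::default()` with an explicit field list means a future field added to `IndexerConfig` must be consciously mapped, instead of being silently defaulted.

```rust
// Hypothetical round-trip: CLI/env options -> IndexerConfig.
let opts = IndexerOpts {
    experimental_no_edition_2024_for_settings: true,
    ..Default::default()
};
// With the explicit field list, forgetting the new flag in `TryFrom`
// would now be a compile error rather than a silently-defaulted value.
let config = IndexerConfig::try_from(&opts).unwrap();
assert!(config.experimental_no_edition_2024_for_settings);
```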
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 1ef44fc8d..b07cc0298 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -1,7 +1,10 @@
+use std::cell::{Cell, RefCell};
 use std::collections::{BTreeMap, BTreeSet};
+use std::sync::RwLock;
 
+use bumpalo::Bump;
 use bumparaw_collections::RawMap;
-use heed::RoTxn;
+use heed::{RoTxn, WithoutTls};
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
 
@@ -9,7 +12,13 @@ use super::vector_document::VectorDocument;
 use super::{KvReaderFieldId, KvWriterFieldId};
 use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
 use crate::documents::FieldIdMapper;
-use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
+use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
+use crate::update::new::vector_document::VectorDocumentFromDb;
+use crate::vector::settings::EmbedderAction;
+use crate::{
+    DocumentId, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError,
+    Result, UserError,
+};
 
 /// A view into a document that can represent either the current version from the DB,
 /// the update data from payload or other means, or the merged updated version.
@@ -309,6 +318,7 @@ where
 pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
     document: &'s impl Document<'s>,
     vector_document: Option<&'s impl VectorDocument<'s>>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
     fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
     mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
 ) -> Result<&'a KvReaderFieldId>
@@ -338,20 +348,39 @@ where
         for res in vector_document.iter_vectors() {
             let (name, entry) = res?;
             if entry.has_configured_embedder {
-                continue; // we don't write vectors with configured embedder in documents
+                if let Some(action) = embedder_actions.get(name) {
+                    if action.write_back().is_some() && !entry.regenerate {
+                        vectors.insert(
+                            name,
+                            serde_json::json!({
+                                "regenerate": entry.regenerate,
+                                // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
+                                "embeddings": entry.embeddings,
+                            }),
+                        );
+                    }
+                }
+            } else {
+                match embedder_actions.get(name) {
+                    Some(action) if action.write_back().is_none() => {
+                        continue;
+                    }
+                    _ => {
+                        vectors.insert(
+                            name,
+                            if entry.implicit {
+                                serde_json::json!(entry.embeddings)
+                            } else {
+                                serde_json::json!({
+                                    "regenerate": entry.regenerate,
+                                    // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
+                                    "embeddings": entry.embeddings,
+                                })
+                            },
+                        );
+                    }
+                }
             }
-            vectors.insert(
-                name,
-                if entry.implicit {
-                    serde_json::json!(entry.embeddings)
-                } else {
-                    serde_json::json!({
-                        "regenerate": entry.regenerate,
-                        // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
-                        "embeddings": entry.embeddings,
-                    })
-                },
-            );
         }
 
         if vectors.is_empty() {
@@ -439,3 +468,127 @@ impl<'doc> Versions<'doc> {
         self.data.get(k)
     }
 }
+
+pub struct DocumentIdentifiers<'doc> {
+    docid: DocumentId,
+    external_document_id: &'doc str,
+}
+
+impl<'doc> DocumentIdentifiers<'doc> {
+    pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
+        Self { docid, external_document_id }
+    }
+
+    pub fn docid(&self) -> DocumentId {
+        self.docid
+    }
+
+    pub fn external_document_id(&self) -> &'doc str {
+        self.external_document_id
+    }
+
+    pub fn current<'a, Mapper: FieldIdMapper>(
+        &self,
+        rtxn: &'a RoTxn,
+        index: &'a Index,
+        mapper: &'a Mapper,
+    ) -> Result<DocumentFromDb<'a, Mapper>> {
+        Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
+            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
+        )?)
+    }
+
+    pub fn current_vectors<'a, Mapper: FieldIdMapper>(
+        &self,
+        rtxn: &'a RoTxn,
+        index: &'a Index,
+        mapper: &'a Mapper,
+        doc_alloc: &'a Bump,
+    ) -> Result<VectorDocumentFromDb<'a>> {
+        Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
+            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
+        )?)
+    }
+}
+
+pub struct DocumentContext<
+    'doc,             // covariant lifetime of a single `process` call
+    'extractor: 'doc, // invariant lifetime of the extractor_allocs
+    'fid: 'doc,       // invariant lifetime of the new_fields_ids_map
+    'indexer: 'doc,   // covariant lifetime of objects that outlive a single `process` call
+    T: MostlySend,
+> {
+    /// The index we're indexing in
+    pub index: &'indexer Index,
+    /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
+    /// inside of the DB.
+    pub db_fields_ids_map: &'indexer FieldsIdsMap,
+    /// A transaction providing data from the DB before all indexing operations
+    pub rtxn: RoTxn<'indexer, WithoutTls>,
+
+    /// Global field id map that is up to date with the current state of the indexing process.
+    ///
+    /// - Inserting a field will take a lock
+    /// - Retrieving a field may take a lock as well
+    pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
+
+    /// Data allocated in this allocator is cleared between each call to `process`.
+    pub doc_alloc: Bump,
+
+    /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
+    pub extractor_alloc: &'extractor Bump,
+
+    /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
+    pub doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
+
+    /// Extractor-specific data
+    pub data: &'doc T,
+}
+
+impl<
+        'doc,             // covariant lifetime of a single `process` call
+        'data: 'doc,      // invariant on T lifetime of the datastore
+        'extractor: 'doc, // invariant lifetime of extractor_allocs
+        'fid: 'doc,       // invariant lifetime of fields ids map
+        'indexer: 'doc,   // covariant lifetime of objects that survive a `process` call
+        T: MostlySend,
+    > DocumentContext<'doc, 'extractor, 'fid, 'indexer, T>
+{
+    #[allow(clippy::too_many_arguments)]
+    pub fn new<F>(
+        index: &'indexer Index,
+        db_fields_ids_map: &'indexer FieldsIdsMap,
+        new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
+        extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
+        doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
+        datastore: &'data ThreadLocal<T>,
+        fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
+        init_data: F,
+    ) -> Result<Self>
+    where
+        F: FnOnce(&'extractor Bump) -> Result<T>,
+    {
+        let doc_alloc =
+            doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
+        let doc_alloc = doc_alloc.0.take();
+        let fields_ids_map = fields_ids_map_store
+            .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
+
+        let fields_ids_map = &fields_ids_map.0;
+        let extractor_alloc = extractor_allocs.get_or_default();
+
+        let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
+
+        let txn = index.read_txn()?;
+        Ok(DocumentContext {
+            index,
+            rtxn: txn,
+            db_fields_ids_map,
+            new_fields_ids_map: fields_ids_map,
+            doc_alloc,
+            extractor_alloc: &extractor_alloc.0,
+            data,
+            doc_allocs,
+        })
+    }
+}
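The new branching in `write_to_obkv` is the heart of this change. A minimal re-statement of the rule, with `EmbedderAction` stubbed so the sketch stands alone (names simplified; not part of the diff):

```rust
// Stub mirroring `EmbedderAction::write_back()` for this sketch only.
struct EmbedderAction { write_back: bool }
impl EmbedderAction {
    fn write_back(&self) -> Option<()> { self.write_back.then_some(()) }
}

/// Should this embedder's vectors be serialized into the stored document?
fn should_store_in_document(
    has_configured_embedder: bool,
    action: Option<&EmbedderAction>,
    regenerate: bool,
) -> bool {
    match (has_configured_embedder, action) {
        // configured embedder being deleted (write-back): keep only the
        // user-provided vectors (regenerate == false)
        (true, Some(a)) => a.write_back().is_some() && !regenerate,
        // configured embedder, no pending action: vectors live in the
        // vector store, not in the document
        (true, None) => false,
        // `_vectors` entry being promoted to a real embedder (reindex
        // action, no write-back): drop it from the document
        (false, Some(a)) if a.write_back().is_none() => false,
        // plain unknown embedder: keep it in the document
        (false, _) => true,
    }
}

fn main() {
    let delete = EmbedderAction { write_back: true };
    assert!(should_store_in_document(true, Some(&delete), false));
    assert!(!should_store_in_document(true, None, false));
}
```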
diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs
index 8a8ac4bb3..2b9161319 100644
--- a/crates/milli/src/update/new/document_change.rs
+++ b/crates/milli/src/update/new/document_change.rs
@@ -10,20 +10,16 @@ use super::vector_document::{
 };
 use crate::attribute_patterns::PatternMatch;
 use crate::documents::FieldIdMapper;
+use crate::update::new::document::DocumentIdentifiers;
 use crate::vector::EmbeddingConfigs;
 use crate::{DocumentId, Index, InternalError, Result};
 
 pub enum DocumentChange<'doc> {
-    Deletion(Deletion<'doc>),
+    Deletion(DocumentIdentifiers<'doc>),
     Update(Update<'doc>),
     Insertion(Insertion<'doc>),
 }
 
-pub struct Deletion<'doc> {
-    docid: DocumentId,
-    external_document_id: &'doc str,
-}
-
 pub struct Update<'doc> {
     docid: DocumentId,
     external_document_id: &'doc str,
@@ -55,31 +51,6 @@ impl<'doc> DocumentChange<'doc> {
     }
 }
 
-impl<'doc> Deletion<'doc> {
-    pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
-        Self { docid, external_document_id }
-    }
-
-    pub fn docid(&self) -> DocumentId {
-        self.docid
-    }
-
-    pub fn external_document_id(&self) -> &'doc str {
-        self.external_document_id
-    }
-
-    pub fn current<'a, Mapper: FieldIdMapper>(
-        &self,
-        rtxn: &'a RoTxn,
-        index: &'a Index,
-        mapper: &'a Mapper,
-    ) -> Result<DocumentFromDb<'a, Mapper>> {
-        Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
-            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
-        )?)
-    }
-}
-
 impl<'doc> Insertion<'doc> {
     pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self {
         Insertion { docid, external_document_id, new }
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index d1c92919b..5c1a1927a 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -1,16 +1,24 @@
 use std::cell::RefCell;
+use std::collections::BTreeMap;
 
 use bumpalo::Bump;
 use hashbrown::HashMap;
 
 use super::DelAddRoaringBitmap;
 use crate::constants::RESERVED_GEO_FIELD_NAME;
-use crate::update::new::channel::DocumentsSender;
-use crate::update::new::document::{write_to_obkv, Document as _};
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
+use crate::update::new::document::{write_to_obkv, Document};
+use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
+use crate::update::new::indexer::document_changes::{Extractor, IndexingContext};
+use crate::update::new::indexer::settings_changes::{
+    settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor,
+};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
-use crate::update::new::thread_local::FullySend;
+use crate::update::new::thread_local::{FullySend, ThreadLocal};
+use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
+use crate::update::settings::SettingsDelta;
+use crate::vector::settings::EmbedderAction;
 use crate::vector::EmbeddingConfigs;
 use crate::Result;
 
@@ -41,10 +49,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
         let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
+        let embedder_actions = &Default::default();
 
         for change in changes {
             let change = change?;
@@ -121,9 +130,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
                     let content = write_to_obkv(
                         &content,
                         vector_content.as_ref(),
+                        embedder_actions,
                         &mut new_fields_ids_map,
                         &mut document_buffer,
                     )?;
+
                     self.document_sender.uncompressed(docid, external_docid, content).unwrap();
                 }
                 DocumentChange::Insertion(insertion) => {
@@ -146,6 +157,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
                     let content = write_to_obkv(
                         &content,
                         inserted_vectors.as_ref(),
+                        embedder_actions,
                         &mut new_fields_ids_map,
                         &mut document_buffer,
                     )?;
@@ -158,3 +170,144 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
 
         Ok(())
     }
 }
+
+pub struct SettingsChangeDocumentExtractor<'a, 'b> {
+    document_sender: DocumentsSender<'a, 'b>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+}
+
+impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
+    pub fn new(
+        document_sender: DocumentsSender<'a, 'b>,
+        embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+    ) -> Self {
+        Self { document_sender, embedder_actions }
+    }
+}
+
+impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
+    type Data = FullySend<()>;
+
+    fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(FullySend(()))
+    }
+
+    fn process<'doc>(
+        &self,
+        documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
+        context: &DocumentContext<Self::Data>,
+    ) -> Result<()> {
+        let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
+
+        for document in documents {
+            let document = document?;
+            // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
+            // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
+            let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
+
+            let external_docid = document.external_document_id().to_owned();
+            let content =
+                document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
+            let vector_content = document.current_vectors(
+                &context.rtxn,
+                context.index,
+                &context.db_fields_ids_map,
+                &context.doc_alloc,
+            )?;
+
+            // if the document doesn't need to be updated, we skip it
+            if !must_update_document(&vector_content, self.embedder_actions)? {
+                continue;
+            }
+
+            let content = write_to_obkv(
+                &content,
+                Some(&vector_content),
+                self.embedder_actions,
+                &mut new_fields_ids_map,
+                &mut document_buffer,
+            )?;
+
+            self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap();
+        }
+
+        Ok(())
+    }
+}
+
+/// Modify the database documents based on the settings changes.
+///
+/// This function extracts the documents from the database,
+/// modifies them by adding or removing vector fields based on embedder actions,
+/// and then updates the database.
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
+pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
+    documents: &'indexer DocumentsIndentifiers<'indexer>,
+    indexing_context: IndexingContext<MSP>,
+    extractor_sender: &ExtractorBbqueueSender,
+    settings_delta: &SD,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+    SD: SettingsDelta,
+{
+    if !must_update_database(settings_delta) {
+        return Ok(());
+    }
+
+    let document_sender = extractor_sender.documents();
+    let document_extractor =
+        SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
+    let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+
+    settings_change_extract(
+        documents,
+        &document_extractor,
+        indexing_context,
+        extractor_allocs,
+        &datastore,
+        crate::update::new::steps::IndexingStep::ExtractingDocuments,
+    )?;
+
+    Ok(())
+}
+
+fn must_update_database<SD: SettingsDelta>(settings_delta: &SD) -> bool {
+    settings_delta.embedder_actions().iter().any(|(name, action)| {
+        if action.reindex().is_some() {
+            // if action has a reindex, we need to update the documents database if the embedder is a new one
+            settings_delta.old_embedders().get(name).is_none()
+        } else {
+            // if action has a write_back, we need to update the documents database
+            action.write_back().is_some()
+        }
+    })
+}
+
+fn must_update_document<'s, 'a>(
+    vector_document: &'s impl VectorDocument<'s>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+) -> Result<bool>
+where
+    's: 'a,
+{
+    // Check if any vector needs to be written back for the document
+    for (name, action) in embedder_actions {
+        // if the vector entry is not found, we don't need to update the document
+        let Some(vector_entry) = vector_document.vectors_for_key(name)? else {
+            continue;
+        };
+
+        // if the vector entry is user provided, we need to update the document by writing back vectors.
+        let write_back = action.write_back().is_some() && !vector_entry.regenerate;
+        // if the vector entry is a new embedder, we need to update the document removing the vectors from the document.
+        let new_embedder = action.reindex().is_some() && !vector_entry.has_configured_embedder;
+
+        if write_back || new_embedder {
+            return Ok(true);
+        }
+    }
+
+    Ok(false)
+}
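A runnable miniature of the `must_update_database` rule just added, with stub types standing in for `EmbedderAction` and the settings delta (assumption: this mirrors the diff above; it is not the crate's API):

```rust
// Stub mirroring the two kinds of embedder actions.
#[derive(Clone, Copy)]
struct Action { reindex: bool, write_back: bool }

fn must_update(actions: &[(&str, Action)], old_embedders: &[&str]) -> bool {
    actions.iter().any(|(name, a)| {
        if a.reindex {
            // reindexing a *new* embedder: stale `_vectors.<name>` entries
            // must be stripped from the stored documents
            !old_embedders.contains(name)
        } else {
            // a write-back (embedder deletion) always rewrites documents
            a.write_back
        }
    })
}

fn main() {
    let old = ["manual"];
    // deleting "manual" writes its user-provided vectors back into documents
    assert!(must_update(&[("manual", Action { reindex: false, write_back: true })], &old));
    // re-embedding an existing embedder leaves the documents database untouched
    assert!(!must_update(&[("manual", Action { reindex: true, write_back: false })], &old));
}
```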
diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
index 517ef3f2d..6e9ae7ee4 100644
--- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -15,9 +15,10 @@ use crate::filterable_attributes_rules::match_faceted_field;
 use crate::heed_codec::facet::OrderedF64Codec;
 use crate::update::del_add::DelAdd;
 use crate::update::new::channel::FieldIdDocidFacetSender;
+use crate::update::new::document::DocumentContext;
 use crate::update::new::extract::perm_json_p;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -51,7 +52,7 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'_, '_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -75,7 +76,7 @@ pub struct FacetedDocidsExtractor;
 impl FacetedDocidsExtractor {
     #[allow(clippy::too_many_arguments)]
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
+        context: &DocumentContext<RefCell<BalancedCaches>>,
         filterable_attributes: &[FilterableAttributesRule],
         sortable_fields: &HashSet<String>,
         asc_desc_fields: &HashSet<String>,
diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs
index b2ccc1b2b..8e164b48f 100644
--- a/crates/milli/src/update/new/extract/geo/mod.rs
+++ b/crates/milli/src/update/new/extract/geo/mod.rs
@@ -10,8 +10,8 @@ use serde_json::value::RawValue;
 use serde_json::Value;
 
 use crate::error::GeoError;
-use crate::update::new::document::Document;
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::document::{Document, DocumentContext};
+use crate::update::new::indexer::document_changes::Extractor;
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::DocumentChange;
@@ -150,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> Result<()> {
         let rtxn = &context.rtxn;
         let index = context.index;
diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs
index 2abefb098..05c90d8f8 100644
--- a/crates/milli/src/update/new/extract/mod.rs
+++ b/crates/milli/src/update/new/extract/mod.rs
@@ -12,7 +12,7 @@
 pub use documents::*;
 pub use faceted::*;
 pub use geo::*;
 pub use searchable::*;
-pub use vectors::EmbeddingExtractor;
+pub use vectors::{EmbeddingExtractor, SettingsChangeEmbeddingExtractor};
 
 /// TODO move in permissive json pointer
 pub mod perm_json_p {
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 046116939..5daf34ca4 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -8,10 +8,11 @@ use bumpalo::Bump;
 
 use super::match_searchable_field;
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
+use crate::update::new::document::DocumentContext;
 use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -226,7 +227,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -305,7 +306,7 @@ impl WordDocidsExtractors {
     }
 
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<Option<WordDocidsBalancedCaches>>>,
+        context: &DocumentContext<RefCell<Option<WordDocidsBalancedCaches>>>,
         document_tokenizer: &DocumentTokenizer,
         searchable_attributes: Option<&[&str]>,
         document_change: DocumentChange,
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index 3b358800f..c9acb9734 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -7,10 +7,10 @@ use bumpalo::Bump;
 
 use super::match_searchable_field;
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::proximity::{index_proximity, MAX_DISTANCE};
-use crate::update::new::document::Document;
+use crate::update::new::document::{Document, DocumentContext};
 use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -39,7 +39,7 @@ impl<'extractor> Extractor<'extractor> for WordPairProximityDocidsExtractorData<
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -116,7 +116,7 @@ impl WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under than MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
+        context: &DocumentContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
         searchable_attributes: Option<&[&str]>,
         document_change: DocumentChange,
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 85398aa99..4d308018a 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -1,4 +1,5 @@
 use std::cell::RefCell;
+use std::collections::BTreeMap;
 
 use bumpalo::collections::Vec as BVec;
 use bumpalo::Bump;
@@ -9,13 +10,16 @@ use crate::error::FaultSource;
 use crate::progress::EmbedderStats;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
+use crate::update::new::indexer::document_changes::Extractor;
+use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
 use crate::vector::error::{
     EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump,
 };
+use crate::vector::settings::{EmbedderAction, ReindexAction};
 use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
 use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
 
@@ -56,7 +60,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> crate::Result<()> {
         let embedders = self.embedders.inner_as_ref();
         let mut unused_vectors_distribution =
@@ -294,6 +298,209 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
     }
 }
 
+pub struct SettingsChangeEmbeddingExtractor<'a, 'b> {
+    embedders: &'a EmbeddingConfigs,
+    old_embedders: &'a EmbeddingConfigs,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+    embedder_category_id: &'a std::collections::HashMap<String, u8>,
+    embedder_stats: &'a EmbedderStats,
+    sender: EmbeddingSender<'a, 'b>,
+    possible_embedding_mistakes: PossibleEmbeddingMistakes,
+    threads: &'a ThreadPoolNoAbort,
+}
+
+impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        embedders: &'a EmbeddingConfigs,
+        old_embedders: &'a EmbeddingConfigs,
+        embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+        embedder_category_id: &'a std::collections::HashMap<String, u8>,
+        embedder_stats: &'a EmbedderStats,
+        sender: EmbeddingSender<'a, 'b>,
+        field_distribution: &'a FieldDistribution,
+        threads: &'a ThreadPoolNoAbort,
+    ) -> Self {
+        let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
+        Self {
+            embedders,
+            old_embedders,
+            embedder_actions,
+            embedder_category_id,
+            embedder_stats,
+            sender,
+            threads,
+            possible_embedding_mistakes,
+        }
+    }
+}
+
+impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> {
+    type Data = RefCell<EmbeddingExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
+        Ok(RefCell::new(EmbeddingExtractorData(HashMap::new_in(extractor_alloc))))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        documents: impl Iterator<Item = crate::Result<DocumentIdentifiers<'doc>>>,
+        context: &'doc DocumentContext<Self::Data>,
+    ) -> crate::Result<()> {
+        let embedders = self.embedders.inner_as_ref();
+        let old_embedders = self.old_embedders.inner_as_ref();
+        let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc);
+
+        let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
+        for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
+            // if the embedder is not in the embedder_actions, we don't need to reindex.
+            if let Some((embedder_id, reindex_action)) =
+                self.embedder_actions
+                    .get(embedder_name)
+                    // keep only the reindex actions
+                    .and_then(EmbedderAction::reindex)
+                    // map the reindex action to the embedder_id
+                    .map(|reindex| {
+                        let embedder_id = self.embedder_category_id.get(embedder_name).expect(
+                            "An embedder_category_id must exist for all reindexed embedders",
+                        );
+                        (*embedder_id, reindex)
+                    })
+            {
+                all_chunks.push((
+                    Chunks::new(
+                        embedder,
+                        embedder_id,
+                        embedder_name,
+                        prompt,
+                        context.data,
+                        &self.possible_embedding_mistakes,
+                        self.embedder_stats,
+                        self.threads,
+                        self.sender,
+                        &context.doc_alloc,
+                    ),
+                    reindex_action,
+                ))
+            }
+        }
+
+        for document in documents {
+            let document = document?;
+
+            let current_vectors = document.current_vectors(
+                &context.rtxn,
+                context.index,
+                context.db_fields_ids_map,
+                &context.doc_alloc,
+            )?;
+
+            for (chunks, reindex_action) in &mut all_chunks {
+                let embedder_name = chunks.embedder_name();
+                let current_vectors = current_vectors.vectors_for_key(embedder_name)?;
+
+                // if the vectors for this document have been already provided, we don't need to reindex.
+                let (is_new_embedder, must_regenerate) =
+                    current_vectors.as_ref().map_or((true, true), |vectors| {
+                        (!vectors.has_configured_embedder, vectors.regenerate)
+                    });
+
+                match reindex_action {
+                    ReindexAction::RegeneratePrompts => {
+                        if !must_regenerate {
+                            continue;
+                        }
+                        // we need to regenerate the prompts for the document
+
+                        // Get the old prompt and render the document with it
+                        let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else {
+                            unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders")
+                        };
+                        let old_rendered = old_prompt.render_document(
+                            document.external_document_id(),
+                            document.current(
+                                &context.rtxn,
+                                context.index,
+                                context.db_fields_ids_map,
+                            )?,
+                            context.new_fields_ids_map,
+                            &context.doc_alloc,
+                        )?;
+
+                        // Get the new prompt and render the document with it
+                        let new_prompt = chunks.prompt();
+                        let new_rendered = new_prompt.render_document(
+                            document.external_document_id(),
+                            document.current(
+                                &context.rtxn,
+                                context.index,
+                                context.db_fields_ids_map,
+                            )?,
+                            context.new_fields_ids_map,
+                            &context.doc_alloc,
+                        )?;
+
+                        // Compare the rendered documents
+                        // if they are different, regenerate the vectors
+                        if new_rendered != old_rendered {
+                            chunks.set_autogenerated(
+                                document.docid(),
+                                document.external_document_id(),
+                                new_rendered,
+                                &unused_vectors_distribution,
+                            )?;
+                        }
+                    }
+                    ReindexAction::FullReindex => {
+                        let prompt = chunks.prompt();
+                        // if no inserted vectors, then regenerate: true + no embeddings => autogenerate
+                        if let Some(embeddings) = current_vectors
+                            .and_then(|vectors| vectors.embeddings)
+                            // insert the embeddings only for new embedders
+                            .filter(|_| is_new_embedder)
+                        {
+                            chunks.set_regenerate(document.docid(), must_regenerate);
+                            chunks.set_vectors(
+                                document.external_document_id(),
+                                document.docid(),
+                                embeddings.into_vec(&context.doc_alloc, embedder_name).map_err(
+                                    |error| UserError::InvalidVectorsEmbedderConf {
+                                        document_id: document.external_document_id().to_string(),
+                                        error: error.to_string(),
+                                    },
+                                )?,
+                            )?;
+                        } else if must_regenerate {
+                            let rendered = prompt.render_document(
+                                document.external_document_id(),
+                                document.current(
+                                    &context.rtxn,
+                                    context.index,
+                                    context.db_fields_ids_map,
+                                )?,
+                                context.new_fields_ids_map,
+                                &context.doc_alloc,
+                            )?;
+                            chunks.set_autogenerated(
+                                document.docid(),
+                                document.external_document_id(),
+                                rendered,
+                                &unused_vectors_distribution,
+                            )?;
+                        }
+                    }
+                }
+            }
+        }
+
+        for (chunk, _) in all_chunks {
+            chunk.drain(&unused_vectors_distribution)?;
+        }
+
+        Ok(())
+    }
+}
+
 // **Warning**: the destructor of this struct is not normally run, make sure that all its fields:
 // 1. don't have side effects tied to they destructors
 // 2. if allocated, are allocated inside of the bumpalo
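The `ReindexAction::RegeneratePrompts` branch above avoids calling the embedder when a prompt change does not actually change what gets embedded. A toy model of that decision, under the assumption that prompts are plain templates over document fields (the real `Prompt::render_document` is richer):

```rust
// Only a changed rendering should trigger a new embedding request.
fn needs_reembedding(old_template: &str, new_template: &str, doggo: &str) -> bool {
    let render = |t: &str| t.replace("{{doc.doggo}}", doggo);
    render(old_template) != render(new_template)
}

fn main() {
    // semantically identical templates: skip the (expensive) embedder call
    assert!(!needs_reembedding("{{doc.doggo}}", "{{doc.doggo}}", "kefir"));
    // the rendered text differs: the document must be re-embedded
    assert!(needs_reembedding("{{doc.doggo}}", "a doggo called {{doc.doggo}}", "kefir"));
}
```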
diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs
index 5302c9d05..c88751ee3 100644
--- a/crates/milli/src/update/new/indexer/document_changes.rs
+++ b/crates/milli/src/update/new/indexer/document_changes.rs
@@ -3,100 +3,18 @@ use std::sync::atomic::Ordering;
 use std::sync::{Arc, RwLock};
 
 use bumpalo::Bump;
-use heed::{RoTxn, WithoutTls};
 use rayon::iter::IndexedParallelIterator;
 
 use super::super::document_change::DocumentChange;
 use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::progress::{AtomicDocumentStep, Progress};
+use crate::update::new::document::DocumentContext;
 use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
 use crate::update::GrenadParameters;
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
 
-pub struct DocumentChangeContext<
-    'doc,             // covariant lifetime of a single `process` call
-    'extractor: 'doc, // invariant lifetime of the extractor_allocs
-    'fid: 'doc,       // invariant lifetime of the new_fields_ids_map
-    'indexer: 'doc,   // covariant lifetime of objects that outlive a single `process` call
-    T: MostlySend,
-> {
-    /// The index we're indexing in
-    pub index: &'indexer Index,
-    /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
-    /// inside of the DB.
-    pub db_fields_ids_map: &'indexer FieldsIdsMap,
-    /// A transaction providing data from the DB before all indexing operations
-    pub rtxn: RoTxn<'indexer, WithoutTls>,
-
-    /// Global field id map that is up to date with the current state of the indexing process.
-    ///
-    /// - Inserting a field will take a lock
-    /// - Retrieving a field may take a lock as well
-    pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
-
-    /// Data allocated in this allocator is cleared between each call to `process`.
-    pub doc_alloc: Bump,
-
-    /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
-    pub extractor_alloc: &'extractor Bump,
-
-    /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
-    doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
-
-    /// Extractor-specific data
-    pub data: &'doc T,
-}
-
-impl<
-        'doc,             // covariant lifetime of a single `process` call
-        'data: 'doc,      // invariant on T lifetime of the datastore
-        'extractor: 'doc, // invariant lifetime of extractor_allocs
-        'fid: 'doc,       // invariant lifetime of fields ids map
-        'indexer: 'doc,   // covariant lifetime of objects that survive a `process` call
-        T: MostlySend,
-    > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T>
-{
-    #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
-        index: &'indexer Index,
-        db_fields_ids_map: &'indexer FieldsIdsMap,
-        new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
-        extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
-        doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
-        datastore: &'data ThreadLocal<T>,
-        fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
-        init_data: F,
-    ) -> Result<Self>
-    where
-        F: FnOnce(&'extractor Bump) -> Result<T>,
-    {
-        let doc_alloc =
-            doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
-        let doc_alloc = doc_alloc.0.take();
-        let fields_ids_map = fields_ids_map_store
-            .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
-
-        let fields_ids_map = &fields_ids_map.0;
-        let extractor_alloc = extractor_allocs.get_or_default();
-
-        let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
-
-        let txn = index.read_txn()?;
-        Ok(DocumentChangeContext {
-            index,
-            rtxn: txn,
-            db_fields_ids_map,
-            new_fields_ids_map: fields_ids_map,
-            doc_alloc,
-            extractor_alloc: &extractor_alloc.0,
-            data,
-            doc_allocs,
-        })
-    }
-}
-
 /// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
 pub trait Extractor<'extractor>: Sync {
     type Data: MostlySend;
@@ -106,7 +24,7 @@ pub trait Extractor<'extractor>: Sync {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> Result<()>;
 }
 
@@ -125,7 +43,7 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
     fn item_to_document_change<'doc, // lifetime of a single `process` call
         T: MostlySend>(
         &'doc self,
-        context: &'doc DocumentChangeContext<T>,
+        context: &'doc DocumentContext<T>,
         item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where 'pl: 'doc // the payload must survive the process calls
     ;
@@ -224,7 +142,7 @@ where
     let pi = document_changes.iter(CHUNK_SIZE);
     pi.try_arc_for_each_try_init(
         || {
-            DocumentChangeContext::new(
+            DocumentContext::new(
                 index,
                 db_fields_ids_map,
                 new_fields_ids_map,
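The context moved above is built once per rayon worker, reusing a per-thread bump allocator between `process` calls. A minimal runnable model of that reuse, using `thread_local!` in place of the crate's `ThreadLocal`/`FullySend` wrappers (an assumption, not the crate's API):

```rust
use std::cell::Cell;

use bumpalo::Bump;

thread_local! {
    // one 1 MiB bump allocator per thread, as in `DocumentContext::new`
    static DOC_ALLOC: Cell<Bump> = Cell::new(Bump::with_capacity(1024 * 1024));
}

fn with_doc_alloc<R>(f: impl FnOnce(&Bump) -> R) -> R {
    DOC_ALLOC.with(|slot| {
        let alloc = slot.take(); // steal the bump for this `process` call
        let out = f(&alloc);
        slot.set(alloc); // hand it back for the next call on this thread
        out
    })
}

fn main() {
    let len = with_doc_alloc(|alloc| alloc.alloc_str("kefir").len());
    assert_eq!(len, 5);
}
```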
>( &'doc self, - context: &'doc DocumentChangeContext, + context: &'doc DocumentContext, docid: &'doc Self::Item, ) -> Result>> where @@ -74,7 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { let external_document_id = external_document_id.to_bump(&context.doc_alloc); - Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id)))) + Ok(Some(DocumentChange::Deletion(DocumentIdentifiers::create( + *docid, + external_document_id, + )))) } fn len(&self) -> usize { @@ -93,9 +97,8 @@ mod test { use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; use crate::progress::Progress; - use crate::update::new::indexer::document_changes::{ - extract, DocumentChangeContext, Extractor, IndexingContext, - }; + use crate::update::new::document::DocumentContext; + use crate::update::new::indexer::document_changes::{extract, Extractor, IndexingContext}; use crate::update::new::indexer::DocumentDeletion; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{MostlySend, ThreadLocal}; @@ -125,7 +128,7 @@ mod test { fn process<'doc>( &self, changes: impl Iterator>>, - context: &DocumentChangeContext, + context: &DocumentContext, ) -> crate::Result<()> { for change in changes { let change = change?; diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index ca433c043..98faaf145 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -12,14 +12,14 @@ use serde_json::value::RawValue; use serde_json::Deserializer; use super::super::document_change::DocumentChange; -use super::document_changes::{DocumentChangeContext, DocumentChanges}; +use super::document_changes::DocumentChanges; use super::guess_primary_key::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::progress::{AtomicPayloadStep, Progress}; -use crate::update::new::document::Versions; +use crate::update::new::document::{DocumentContext, Versions}; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, Insertion, Update}; +use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; @@ -411,7 +411,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> { fn item_to_document_change<'doc, T: MostlySend + 'doc>( &'doc self, - context: &'doc DocumentChangeContext, + context: &'doc DocumentContext, item: &'doc Self::Item, ) -> Result>> where @@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> { if self.is_new { Ok(None) } else { - let deletion = Deletion::create(self.docid, external_doc); + let deletion = DocumentIdentifiers::create(self.docid, external_doc); Ok(Some(DocumentChange::Deletion(deletion))) } } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 97ffc8624..bb275d8aa 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -12,14 +12,21 @@ use super::super::steps::IndexingStep; use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; +use 
super::settings_changes::settings_change_extract; +use crate::documents::FieldIdMapper; +use crate::documents::PrimaryKey; use crate::index::IndexEmbeddingConfig; use crate::progress::EmbedderStats; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; +use crate::update::new::indexer::settings_changes::DocumentsIndentifiers; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; +use crate::update::settings::SettingsDelta; use crate::vector::EmbeddingConfigs; +use crate::Index; +use crate::InternalError; use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[allow(clippy::too_many_arguments)] @@ -315,6 +322,115 @@ where Result::Ok((facet_field_ids_delta, index_embeddings)) } +#[allow(clippy::too_many_arguments)] +pub(super) fn extract_all_settings_changes( + indexing_context: IndexingContext, + indexer_span: Span, + extractor_sender: ExtractorBbqueueSender, + settings_delta: &SD, + extractor_allocs: &mut ThreadLocal>, + finished_extraction: &AtomicBool, + field_distribution: &mut BTreeMap, + mut index_embeddings: Vec, + modified_docids: &mut RoaringBitmap, + embedder_stats: &EmbedderStats, +) -> Result> +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta, +{ + // Create the list of document ids to extract + let rtxn = indexing_context.index.read_txn()?; + let all_document_ids = + indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::>(); + let primary_key = + primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?; + let documents = DocumentsIndentifiers::new(&all_document_ids, primary_key); + + let span = + tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); + + update_database_documents( + &documents, + indexing_context, + &extractor_sender, + settings_delta, + extractor_allocs, + )?; + + 'vectors: { + if settings_delta.embedder_actions().is_empty() { + break 'vectors; + } + + let embedding_sender = extractor_sender.embeddings(); + + // extract the remaining embeddings + let extractor = SettingsChangeEmbeddingExtractor::new( + settings_delta.new_embedders(), + settings_delta.old_embedders(), + settings_delta.embedder_actions(), + settings_delta.new_embedder_category_id(), + embedder_stats, + embedding_sender, + field_distribution, + request_threads(), + ); + let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + { + let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors"); + let _entered = span.enter(); + + settings_change_extract( + &documents, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::ExtractingEmbeddings, + )?; + } + { + let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors"); + let _entered = span.enter(); + + for config in &mut index_embeddings { + 'data: for data in datastore.iter_mut() { + let data = &mut data.get_mut().0; + let Some(deladd) = data.remove(&config.name) else { + continue 'data; + }; + deladd.apply_to(&mut config.user_provided, modified_docids); + } + } + } + } + + indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); + finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); + + Result::Ok(index_embeddings) +} + +fn primary_key_from_db<'indexer>( + index: &'indexer Index, + rtxn: &'indexer heed::RoTxn<'_>, + 
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index bb6ba0102..0efef48fd 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -1,5 +1,6 @@
+use std::collections::BTreeMap;
 use std::sync::atomic::AtomicBool;
-use std::sync::{Once, RwLock};
+use std::sync::{Arc, Once, RwLock};
 use std::thread::{self, Builder};
 
 use big_s::S;
@@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal;
 use crate::documents::PrimaryKey;
 use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
 use crate::progress::{EmbedderStats, Progress};
+use crate::update::settings::SettingsDelta;
 use crate::update::GrenadParameters;
-use crate::vector::{ArroyWrapper, EmbeddingConfigs};
+use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
+use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
 
 pub(crate) mod de;
@@ -32,6 +35,7 @@ mod extract;
 mod guess_primary_key;
 mod partial_dump;
 mod post_processing;
+pub mod settings_changes;
 mod update_by_function;
 mod write;
 
@@ -40,8 +44,6 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
 /// This is the main function of this crate.
 ///
 /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
-///
-/// TODO return stats
 #[allow(clippy::too_many_arguments)] // clippy: 😝
 pub fn index<'pl, 'indexer, 'index, DC, MSP>(
     wtxn: &mut RwTxn,
@@ -66,48 +68,8 @@ where
 
     let arroy_memory = grenad_parameters.max_memory;
 
-    // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
-    // is because we still use the old indexer for the settings and it is highly impacted by the
-    // max memory. So we keep the changes here and will remove these changes once we use the new
-    // indexer to also index settings. Related to #5125 and #5141.
-    let grenad_parameters = GrenadParameters {
-        max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
-        ..grenad_parameters
-    };
-
-    // 5% percent of the allocated memory for the extractors, or min 100MiB
-    // 5% percent of the allocated memory for the bbqueues, or min 50MiB
-    //
-    // Minimum capacity for bbqueues
-    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
-    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
-
-    let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
-        (
-            GrenadParameters {
-                max_memory: Some(minimum_total_extractors_capacity),
-                ..grenad_parameters
-            },
-            minimum_total_bbbuffer_capacity,
-        ), // 100 MiB by thread by default
-        |max_memory| {
-            let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
-            let new_grenad_parameters = GrenadParameters {
-                max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
-                ..grenad_parameters
-            };
-            (new_grenad_parameters, total_bbbuffer_capacity)
-        },
-    );
-
-    LOG_MEMORY_METRICS_ONCE.call_once(|| {
-        tracing::debug!(
-            "Indexation allocated memory metrics - \
-            Total BBQueue size: {total_bbbuffer_capacity}, \
-            Total extractor memory: {:?}",
-            grenad_parameters.max_memory,
-        );
-    });
+    let (grenad_parameters, total_bbbuffer_capacity) =
+        indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
 
     let (extractor_sender, writer_receiver) = pool
         .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
         .unwrap();
@@ -208,6 +170,7 @@ where
                     index_embeddings,
                     arroy_memory,
                     &mut arroy_writers,
+                    None,
                     &indexing_context.must_stop_processing,
                 )
             })
@@ -241,3 +204,238 @@ where
 
     Ok(congestion)
 }
+
+#[allow(clippy::too_many_arguments)]
+pub fn reindex<'indexer, 'index, MSP, SD>(
+    wtxn: &mut RwTxn<'index>,
+    index: &'index Index,
+    pool: &ThreadPoolNoAbort,
+    grenad_parameters: GrenadParameters,
+    settings_delta: &'indexer SD,
+    must_stop_processing: &'indexer MSP,
+    progress: &'indexer Progress,
+    embedder_stats: Arc<EmbedderStats>,
+) -> Result<ChannelCongestion>
+where
+    MSP: Fn() -> bool + Sync,
+    SD: SettingsDelta + Sync,
+{
+    delete_old_embedders(wtxn, index, settings_delta)?;
+
+    let mut bbbuffers = Vec::new();
+    let finished_extraction = AtomicBool::new(false);
+
+    let arroy_memory = grenad_parameters.max_memory;
+
+    let (grenad_parameters, total_bbbuffer_capacity) =
+        indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
+
+    let (extractor_sender, writer_receiver) = pool
+        .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
+        .unwrap();
+
+    let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
+
+    let db_fields_ids_map = index.fields_ids_map(wtxn)?;
+    let new_fields_ids_map = settings_delta.new_fields_ids_map().clone();
+    let new_fields_ids_map = RwLock::new(new_fields_ids_map);
+    let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
+    let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
+
+    let indexing_context = IndexingContext {
+        index,
+        db_fields_ids_map: &db_fields_ids_map,
+        new_fields_ids_map: &new_fields_ids_map,
+        doc_allocs: &doc_allocs,
+        fields_ids_map_store: &fields_ids_map_store,
+        must_stop_processing,
+        progress,
+        grenad_parameters: &grenad_parameters,
+    };
+
+    let index_embeddings = index.embedding_configs(wtxn)?;
+    let mut field_distribution = index.field_distribution(wtxn)?;
+    let mut modified_docids = roaring::RoaringBitmap::new();
+
+    let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
+        let indexer_span = tracing::Span::current();
+        let finished_extraction = &finished_extraction;
+        // prevent moving the field_distribution and document_ids in the inner closure...
+        let field_distribution = &mut field_distribution;
+        let modified_docids = &mut modified_docids;
+        let extractor_handle =
+            Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
+                pool.install(move || {
+                    extract::extract_all_settings_changes(
+                        indexing_context,
+                        indexer_span,
+                        extractor_sender,
+                        settings_delta,
+                        &mut extractor_allocs,
+                        finished_extraction,
+                        field_distribution,
+                        index_embeddings,
+                        modified_docids,
+                        &embedder_stats,
+                    )
+                })
+                .unwrap()
+            })?;
+
+        let new_embedders = settings_delta.new_embedders();
+        let embedder_actions = settings_delta.embedder_actions();
+        let index_embedder_category_ids = settings_delta.new_embedder_category_id();
+        let mut arroy_writers = arroy_writers_from_embedder_actions(
+            index,
+            embedder_actions,
+            new_embedders,
+            index_embedder_category_ids,
+        )?;
+
+        let congestion =
+            write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
+
+        indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
+
+        let index_embeddings = extractor_handle.join().unwrap()?;
+
+        indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
+
+        pool.install(|| {
+            build_vectors(
+                index,
+                wtxn,
+                indexing_context.progress,
+                index_embeddings,
+                arroy_memory,
+                &mut arroy_writers,
+                Some(embedder_actions),
+                &indexing_context.must_stop_processing,
+            )
+        })
+        .unwrap()?;
+
+        indexing_context.progress.update_progress(IndexingStep::Finalizing);
+
+        Ok(congestion) as Result<_>
+    })?;
+
+    // required to into_inner the new_fields_ids_map
+    drop(fields_ids_map_store);
+
+    let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
+    let document_ids = index.documents_ids(wtxn)?;
+    update_index(
+        index,
+        wtxn,
+        new_fields_ids_map,
+        None,
+        settings_delta.new_embedders().clone(),
+        field_distribution,
+        document_ids,
+    )?;
+
+    Ok(congestion)
+}
+
+fn arroy_writers_from_embedder_actions<'indexer>(
+    index: &Index,
+    embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
+    embedders: &'indexer EmbeddingConfigs,
+    index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
+) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
+    let vector_arroy = index.vector_arroy;
+
+    embedders
+        .inner_as_ref()
+        .iter()
+        .filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) {
+            None => None,
+            Some(action) if action.write_back().is_some() => None,
+            Some(action) => {
+                let Some(&embedder_category_id) = index_embedder_category_ids.get(embedder_name)
+                else {
+                    return Some(Err(crate::error::Error::InternalError(
+                        crate::InternalError::DatabaseMissingEntry {
+                            db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID,
+                            key: None,
+                        },
+                    )));
+                };
+                let writer =
+                    ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
+                let dimensions = embedder.dimensions();
+                Some(Ok((
+                    embedder_category_id,
+                    (embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
+                )))
+            }
+        })
+        .collect()
+}
+
+fn delete_old_embedders<SD>(wtxn: &mut RwTxn<'_>, index: &Index, settings_delta: &SD) -> Result<()>
+where
+    SD: SettingsDelta,
+{
+    for action in settings_delta.embedder_actions().values() {
+        if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() {
+            let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
+            let dimensions = reader.dimensions(wtxn)?;
+            reader.clear(wtxn, dimensions)?;
+        }
+    }
+
+    Ok(())
+}
+
+fn indexer_memory_settings(
+    current_num_threads: usize,
+    grenad_parameters: GrenadParameters,
+) -> (GrenadParameters, usize) {
+    // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
+    // is because we still use the old indexer for the settings and it is highly impacted by the
+    // max memory. So we keep the changes here and will remove these changes once we use the new
+    // indexer to also index settings. Related to #5125 and #5141.
+    let grenad_parameters = GrenadParameters {
+        max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
+        ..grenad_parameters
+    };
+
+    // 5% of the allocated memory for the extractors, or min 100MiB
+    // 5% of the allocated memory for the bbqueues, or min 50MiB
+    //
+    // Minimum capacity for bbqueues
+    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads; // 50 MiB
+    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
+
+    let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
+        (
+            GrenadParameters {
+                max_memory: Some(minimum_total_extractors_capacity),
+                ..grenad_parameters
+            },
+            minimum_total_bbbuffer_capacity,
+        ), // 100 MiB by thread by default
+        |max_memory| {
+            let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
+            let new_grenad_parameters = GrenadParameters {
+                max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
+                ..grenad_parameters
+            };
+            (new_grenad_parameters, total_bbbuffer_capacity)
+        },
+    );
+
+    LOG_MEMORY_METRICS_ONCE.call_once(|| {
+        tracing::debug!(
+            "Indexation allocated memory metrics - \
+            Total BBQueue size: {total_bbbuffer_capacity}, \
+            Total extractor memory: {:?}",
+            grenad_parameters.max_memory,
+        );
+    });
+
+    (grenad_parameters, total_bbbuffer_capacity)
+}
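The budget arithmetic in `indexer_memory_settings` is easier to follow with concrete numbers. A worked example assuming 16 rayon threads and 2 GiB of `max_memory` (both values invented for illustration):

```rust
fn main() {
    let current_num_threads = 16;
    let max_memory = 2usize * 1024 * 1024 * 1024; // 2 GiB granted to the indexer

    // Same arithmetic as indexer_memory_settings above.
    let five_percent = max_memory * 5 / 100; // ~102 MiB kept for grenad
    let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads; // 800 MiB
    let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; // 1600 MiB

    // The floors dominate here: the 5% cut is raised back up to the minimums.
    let total_bbbuffer_capacity = five_percent.max(minimum_total_bbbuffer_capacity);
    let extractor_max_memory = five_percent.max(minimum_total_extractors_capacity);

    assert_eq!(total_bbbuffer_capacity, 800 * 1024 * 1024);
    assert_eq!(extractor_max_memory, 1600 * 1024 * 1024);
}
```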
diff --git a/crates/milli/src/update/new/indexer/partial_dump.rs b/crates/milli/src/update/new/indexer/partial_dump.rs
index 6e4abd898..33e72f532 100644
--- a/crates/milli/src/update/new/indexer/partial_dump.rs
+++ b/crates/milli/src/update/new/indexer/partial_dump.rs
@@ -5,10 +5,10 @@ use rayon::iter::IndexedParallelIterator;
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
 
-use super::document_changes::{DocumentChangeContext, DocumentChanges};
+use super::document_changes::DocumentChanges;
 use crate::documents::PrimaryKey;
 use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
-use crate::update::new::document::Versions;
+use crate::update::new::document::{DocumentContext, Versions};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::{DocumentChange, Insertion};
@@ -55,7 +55,7 @@ where
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &'doc self,
-        context: &'doc DocumentChangeContext<T>,
+        context: &'doc DocumentContext<T>,
         document: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
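The new `settings_changes` module below hands rayon workers fixed-size slices of document ids. The slicing primitive it builds on, shown in isolation with toy data and the module's `CHUNK_SIZE` of 100:

```rust
use rayon::iter::ParallelIterator;
use rayon::slice::ParallelSlice;

fn main() {
    let document_ids: Vec<u32> = (0..250).collect();

    // Same shape as DocumentsIdentifiers::iter below: each worker receives a
    // contiguous &[DocumentId] chunk and builds one DocumentContext for it.
    document_ids.par_chunks(100).for_each(|chunk| {
        assert!(chunk.len() <= 100);
    });
}
```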
diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs
new file mode 100644
index 000000000..984ab3a0b
--- /dev/null
+++ b/crates/milli/src/update/new/indexer/settings_changes.rs
@@ -0,0 +1,146 @@
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+
+use bumpalo::Bump;
+use rayon::iter::IndexedParallelIterator;
+use rayon::slice::ParallelSlice;
+
+use super::document_changes::IndexingContext;
+use crate::documents::PrimaryKey;
+use crate::progress::AtomicDocumentStep;
+use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
+use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
+use crate::update::new::steps::IndexingStep;
+use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
+use crate::{DocumentId, InternalError, Result};
+
+/// An internal iterator (i.e. using `foreach`) over the documents affected by a settings change.
+pub trait SettingsChangeExtractor<'extractor>: Sync {
+    type Data: MostlySend;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
+
+    fn process<'doc>(
+        &'doc self,
+        documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
+        context: &'doc DocumentContext<Self::Data>,
+    ) -> Result<()>;
+}
+
+pub struct DocumentsIdentifiers<'indexer> {
+    documents: &'indexer [DocumentId],
+    primary_key: PrimaryKey<'indexer>,
+}
+
+impl<'indexer> DocumentsIdentifiers<'indexer> {
+    pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self {
+        Self { documents, primary_key }
+    }
+
+    fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = &[DocumentId]> {
+        self.documents.par_chunks(chunk_size)
+    }
+
+    fn item_to_database_document<
+        'doc, // lifetime of a single `process` call
+        T: MostlySend,
+    >(
+        &'doc self,
+        context: &'doc DocumentContext<T>,
+        docid: &'doc DocumentId,
+    ) -> Result<Option<DocumentIdentifiers<'doc>>> {
+        let current = context.index.document(&context.rtxn, *docid)?;
+
+        let external_document_id = self.primary_key.extract_docid_from_db(
+            current,
+            &context.db_fields_ids_map,
+            &context.doc_alloc,
+        )?;
+
+        let external_document_id = external_document_id.to_bump(&context.doc_alloc);
+
+        Ok(Some(DocumentIdentifiers::create(*docid, external_document_id)))
+    }
+
+    fn len(&self) -> usize {
+        self.documents.len()
+    }
+}
+
+const CHUNK_SIZE: usize = 100;
+
+pub fn settings_change_extract<
+    'extractor, // invariant lifetime of extractor_alloc
+    'fid,       // invariant lifetime of fields ids map
+    'indexer,   // covariant lifetime of objects that are borrowed during the entire indexing
+    'data,      // invariant on EX::Data lifetime of datastore
+    'index,     // covariant lifetime of the index
+    EX: SettingsChangeExtractor<'extractor>,
+    MSP: Fn() -> bool + Sync,
+>(
+    documents: &'indexer DocumentsIdentifiers<'indexer>,
+    extractor: &EX,
+    IndexingContext {
+        index,
+        db_fields_ids_map,
+        new_fields_ids_map,
+        doc_allocs,
+        fields_ids_map_store,
+        must_stop_processing,
+        progress,
+        grenad_parameters: _,
+    }: IndexingContext<'fid, 'indexer, 'index, MSP>,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+    datastore: &'data ThreadLocal<EX::Data>,
+    step: IndexingStep,
+) -> Result<()> {
+    tracing::trace!("We are resetting the extractor allocators");
+    progress.update_progress(step);
+    // Clean up and reuse the extractor allocs
+    for extractor_alloc in extractor_allocs.iter_mut() {
+        tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
+        extractor_alloc.0.reset();
+    }
+
+    let total_documents = documents.len() as u32;
+    let (step, progress_step) = AtomicDocumentStep::new(total_documents);
+    progress.update_progress(progress_step);
+
+    let pi = documents.iter(CHUNK_SIZE);
+    pi.try_arc_for_each_try_init(
+        || {
+            DocumentContext::new(
+                index,
+                db_fields_ids_map,
+                new_fields_ids_map,
+                extractor_allocs,
+                doc_allocs,
+                datastore,
+                fields_ids_map_store,
+                move |index_alloc| extractor.init_data(index_alloc),
+            )
+        },
+        |context, items| {
+            if (must_stop_processing)() {
+                return Err(Arc::new(InternalError::AbortedIndexation.into()));
+            }
+
+            // Clean up and reuse the document-specific allocator
+            context.doc_alloc.reset();
+
+            let documents = items
+                .iter()
+                .filter_map(|item| documents.item_to_database_document(context, item).transpose());
+
+            let res = extractor.process(documents, context).map_err(Arc::new);
+            step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
+
+            // send back the doc_alloc in the pool
+            context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
+
+            res
+        },
+    )?;
+    step.store(total_documents, Ordering::Relaxed);
+
+    Ok(())
+}
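For orientation, a minimal `SettingsChangeExtractor` implementation against the trait above might look as follows. This is a sketch only: it merely tallies the documents it is handed into per-thread `Data`, and it assumes `DocumentContext` exposes that data as `context.data`, as the existing extractors do; the real extractors (such as the embedding one wired up in extract.rs) do substantially more.

```rust
use std::cell::RefCell;

use bumpalo::Bump;

use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
use crate::Result;

// Hypothetical extractor: counts how many documents a settings change touches.
struct CountingExtractor;

impl<'extractor> SettingsChangeExtractor<'extractor> for CountingExtractor {
    type Data = RefCell<usize>;

    fn init_data<'doc>(&'doc self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
        Ok(RefCell::new(0))
    }

    fn process<'doc>(
        &'doc self,
        documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
        context: &'doc DocumentContext<Self::Data>,
    ) -> Result<()> {
        for document in documents {
            let _document = document?;
            // One tally per existing document seen by this worker thread.
            *context.data.borrow_mut() += 1;
        }
        Ok(())
    }
}
```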
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs
index 3001648e6..daffe42ed 100644
--- a/crates/milli/src/update/new/indexer/update_by_function.rs
+++ b/crates/milli/src/update/new/indexer/update_by_function.rs
@@ -5,15 +5,14 @@ use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
 use roaring::RoaringBitmap;
 use rustc_hash::FxBuildHasher;
 
-use super::document_changes::DocumentChangeContext;
 use super::DocumentChanges;
 use crate::documents::Error::InvalidDocumentFormat;
 use crate::documents::PrimaryKey;
 use crate::error::{FieldIdMapMissingEntry, InternalError};
-use crate::update::new::document::Versions;
+use crate::update::new::document::{DocumentContext, Versions};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::thread_local::MostlySend;
-use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
+use crate::update::new::{DocumentChange, DocumentIdentifiers, KvReaderFieldId, Update};
 use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
 
 pub struct UpdateByFunction {
@@ -86,13 +85,13 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
 
     fn item_to_document_change<'doc, T: MostlySend + 'doc>(
         &self,
-        context: &'doc DocumentChangeContext<T>,
+        context: &'doc DocumentContext<T>,
         docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
        'index: 'doc,
     {
-        let DocumentChangeContext {
+        let DocumentContext {
             index,
             db_fields_ids_map,
             rtxn: txn,
@@ -128,10 +127,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
 
         match scope.remove::<Dynamic>("doc") {
             // If the "doc" variable has been set to (), we effectively delete the document.
-            Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
-                docid,
-                doc_alloc.alloc_str(&document_id),
-            )))),
+            Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(
+                DocumentIdentifiers::create(docid, doc_alloc.alloc_str(&document_id)),
+            ))),
             None => unreachable!("missing doc variable from the Rhai scope"),
             Some(new_document) => match new_document.try_cast() {
                 Some(new_rhai_document) => {
diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs
index 5a600eeb3..fa48ff589 100644
--- a/crates/milli/src/update/new/indexer/write.rs
+++ b/crates/milli/src/update/new/indexer/write.rs
@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
 use std::sync::atomic::AtomicBool;
 
 use bstr::ByteSlice as _;
@@ -13,6 +14,7 @@ use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::index::IndexEmbeddingConfig;
 use crate::progress::Progress;
 use crate::update::settings::InnerIndexSettings;
+use crate::vector::settings::EmbedderAction;
 use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
 use crate::{Error, Index, InternalError, Result, UserError};
 
@@ -99,6 +101,7 @@ impl ChannelCongestion {
 }
 
 #[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")]
+#[allow(clippy::too_many_arguments)]
 pub fn build_vectors<MSP>(
     index: &Index,
     wtxn: &mut RwTxn<'_>,
@@ -106,6 +109,7 @@ pub fn build_vectors<MSP>(
     index_embeddings: Vec<IndexEmbeddingConfig>,
     arroy_memory: Option<usize>,
     arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
+    embedder_actions: Option<&BTreeMap<String, EmbedderAction>>,
     must_stop_processing: &MSP,
 ) -> Result<()>
 where
@@ -117,14 +121,17 @@ where
     let seed = rand::random();
     let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
 
-    for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers {
+    for (_index, (embedder_name, _embedder, writer, dimensions)) in arroy_writers {
         let dimensions = *dimensions;
+        let is_being_quantized = embedder_actions
+            .and_then(|actions| actions.get(*embedder_name).map(|action| action.is_being_quantized))
+            .unwrap_or(false);
         writer.build_and_quantize(
             wtxn,
             progress,
             &mut rng,
             dimensions,
-            false,
+            is_being_quantized,
             arroy_memory,
             must_stop_processing,
         )?;
diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs
index 81ff93e54..ffe27ffda 100644
--- a/crates/milli/src/update/new/mod.rs
+++ b/crates/milli/src/update/new/mod.rs
@@ -1,4 +1,5 @@
-pub use document_change::{Deletion, DocumentChange, Insertion, Update};
+pub use document::DocumentIdentifiers;
+pub use document_change::{DocumentChange, Insertion, Update};
 pub use indexer::ChannelCongestion;
 pub use merger::{
     merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 05dbb4784..c6ede7a1d 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -28,16 +28,20 @@ use crate::index::{
 };
 use crate::order_by_map::OrderByMap;
 use crate::progress::EmbedderStats;
+use crate::progress::Progress;
 use crate::prompt::{default_max_bytes, default_template_text, PromptData};
 use crate::proximity::ProximityPrecision;
 use crate::update::index_documents::IndexDocumentsMethod;
+use crate::update::new::indexer::reindex;
 use crate::update::{IndexDocuments, UpdateIndexingStep};
 use crate::vector::settings::{
     EmbedderAction, EmbedderSource, EmbeddingSettings, NestingContext, ReindexAction,
     SubEmbeddingSettings, WriteBackToDocuments,
 };
 use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs};
-use crate::{FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result};
+use crate::{
+    ChannelCongestion, FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result,
+};
 
 #[derive(Debug, Clone, PartialEq, Eq, Copy)]
 pub enum Setting<T> {
@@ -1358,7 +1362,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         }
     }
 
-    pub fn execute<FP, FA>(
+    pub fn legacy_execute<FP, FA>(
         mut self,
         progress_callback: FP,
         should_abort: FA,
@@ -1426,6 +1430,108 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
 
         Ok(())
     }
+
+    pub fn execute<'indexer, MSP>(
+        mut self,
+        must_stop_processing: &'indexer MSP,
+        progress: &'indexer Progress,
+        embedder_stats: Arc<EmbedderStats>,
+    ) -> Result<Option<ChannelCongestion>>
+    where
+        MSP: Fn() -> bool + Sync,
+    {
+        // force the old indexer if the environment says so
+        if self.indexer_config.experimental_no_edition_2024_for_settings {
+            return self
+                .legacy_execute(
+                    |indexing_step| tracing::debug!(update = ?indexing_step),
+                    must_stop_processing,
+                    embedder_stats,
+                )
+                .map(|_| None);
+        }
+
+        // only use the new indexer when only the embedder possibly changed
+        if let Self {
+            searchable_fields: Setting::NotSet,
+            displayed_fields: Setting::NotSet,
+            filterable_fields: Setting::NotSet,
+            sortable_fields: Setting::NotSet,
+            criteria: Setting::NotSet,
+            stop_words: Setting::NotSet,
+            non_separator_tokens: Setting::NotSet,
+            separator_tokens: Setting::NotSet,
+            dictionary: Setting::NotSet,
+            distinct_field: Setting::NotSet,
+            synonyms: Setting::NotSet,
+            primary_key: Setting::NotSet,
+            authorize_typos: Setting::NotSet,
+            min_word_len_two_typos: Setting::NotSet,
+            min_word_len_one_typo: Setting::NotSet,
+            exact_words: Setting::NotSet,
+            exact_attributes: Setting::NotSet,
+            max_values_per_facet: Setting::NotSet,
+            sort_facet_values_by: Setting::NotSet,
+            pagination_max_total_hits: Setting::NotSet,
+            proximity_precision: Setting::NotSet,
+            embedder_settings: _,
+            search_cutoff: Setting::NotSet,
+            localized_attributes_rules: Setting::NotSet,
+            prefix_search: Setting::NotSet,
+            facet_search: Setting::NotSet,
+            disable_on_numbers: Setting::NotSet,
+            chat: Setting::NotSet,
+            wtxn: _,
+            index: _,
+            indexer_config: _,
+        } = &self
+        {
+            self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
+
+            let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
+
+            // Update index settings
+            let embedding_config_updates = self.update_embedding_configs()?;
+
+            let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
+
+            let primary_key_id = self
+                .index
+                .primary_key(self.wtxn)?
+                .and_then(|name| new_inner_settings.fields_ids_map.id(name));
+            let settings_update_only = true;
+            let inner_settings_diff = InnerIndexSettingsDiff::new(
+                old_inner_settings,
+                new_inner_settings,
+                primary_key_id,
+                embedding_config_updates,
+                settings_update_only,
+            );
+
+            if self.index.number_of_documents(self.wtxn)? > 0 {
+                reindex(
+                    self.wtxn,
+                    self.index,
+                    &self.indexer_config.thread_pool,
+                    self.indexer_config.grenad_parameters(),
+                    &inner_settings_diff,
+                    must_stop_processing,
+                    progress,
+                    embedder_stats,
+                )
+                .map(Some)
+            } else {
+                Ok(None)
+            }
+        } else {
+            self.legacy_execute(
+                |indexing_step| tracing::debug!(update = ?indexing_step),
+                must_stop_processing,
+                embedder_stats,
+            )
+            .map(|_| None)
+        }
+    }
 }
 
 pub struct InnerIndexSettingsDiff {
@@ -1685,6 +1791,7 @@ pub(crate) struct InnerIndexSettings {
     pub disabled_typos_terms: DisabledTyposTerms,
     pub proximity_precision: ProximityPrecision,
     pub embedding_configs: EmbeddingConfigs,
+    pub embedder_category_id: HashMap<String, u8>,
     pub geo_fields_ids: Option<(FieldId, FieldId)>,
     pub prefix_search: PrefixSearch,
     pub facet_search: bool,
@@ -1707,6 +1814,11 @@ impl InnerIndexSettings {
             Some(embedding_configs) => embedding_configs,
             None => embedders(index.embedding_configs(rtxn)?)?,
         };
+        let embedder_category_id = index
+            .embedder_category_id
+            .iter(rtxn)?
+            .map(|r| r.map(|(k, v)| (k.to_string(), v)))
+            .collect::<heed::Result<_>>()?;
         let prefix_search = index.prefix_search(rtxn)?.unwrap_or_default();
         let facet_search = index.facet_search(rtxn)?;
         let geo_fields_ids = match fields_ids_map.id(RESERVED_GEO_FIELD_NAME) {
@@ -1746,6 +1858,7 @@ impl InnerIndexSettings {
             exact_attributes,
             proximity_precision,
             embedding_configs,
+            embedder_category_id,
             geo_fields_ids,
             prefix_search,
             facet_search,
@@ -2115,6 +2228,38 @@ fn deserialize_sub_embedder(
     }
 }
 
+/// Implement this trait for the settings delta type.
+/// It is used in the new settings update flow and makes it easy to replace the
+/// old settings delta type, `InnerIndexSettingsDiff`.
+pub trait SettingsDelta {
+    fn new_embedders(&self) -> &EmbeddingConfigs;
+    fn old_embedders(&self) -> &EmbeddingConfigs;
+    fn new_embedder_category_id(&self) -> &HashMap<String, u8>;
+    fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction>;
+    fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata;
+}
+
+impl SettingsDelta for InnerIndexSettingsDiff {
+    fn new_embedders(&self) -> &EmbeddingConfigs {
+        &self.new.embedding_configs
+    }
+
+    fn old_embedders(&self) -> &EmbeddingConfigs {
+        &self.old.embedding_configs
+    }
+
+    fn new_embedder_category_id(&self) -> &HashMap<String, u8> {
+        &self.new.embedder_category_id
+    }
+
+    fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction> {
+        &self.embedding_config_updates
+    }
+
+    fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata {
+        &self.new.fields_ids_map
+    }
+}
+
 #[cfg(test)]
 #[path = "test_settings.rs"]
 mod tests;
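The `if let Self { .. } = &self` guard in the new `execute` names every field on purpose: adding a field to `Settings` without deciding whether it may take the fast path becomes a compile error instead of a silently broadened branch. The same trick in miniature (types invented for illustration):

```rust
struct Settings {
    stop_words: Option<Vec<String>>,
    embedder_settings: Option<String>,
}

// Exhaustive destructuring: this stops compiling the moment Settings gains a
// field, forcing the author of that field to revisit the fast path.
fn only_embedders_changed(settings: &Settings) -> bool {
    let Settings { stop_words: None, embedder_settings: _ } = settings else {
        return false;
    };
    true
}

fn main() {
    let s = Settings { stop_words: None, embedder_settings: Some("ollama".into()) };
    assert!(only_embedders_changed(&s));
}
```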
diff --git a/crates/milli/tests/search/distinct.rs b/crates/milli/tests/search/distinct.rs
index 15fcf70a2..c7fa9befa 100644
--- a/crates/milli/tests/search/distinct.rs
+++ b/crates/milli/tests/search/distinct.rs
@@ -1,6 +1,7 @@
 use std::collections::HashSet;
 
 use big_s::S;
+use milli::progress::Progress;
 use milli::update::Settings;
 use milli::{Criterion, Search, SearchResult, TermsMatchingStrategy};
 use Criterion::*;
@@ -19,7 +20,7 @@ macro_rules! test_distinct {
            let config = milli::update::IndexerConfig::default();
            let mut builder = Settings::new(&mut wtxn, &index, &config);
            builder.set_distinct_field(S(stringify!($distinct)));
-            builder.execute(|_| (), || false, Default::default()).unwrap();
+            builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
            wtxn.commit().unwrap();
 
            let rtxn = index.read_txn().unwrap();
diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs
index 8548f0d01..d04db425e 100644
--- a/crates/milli/tests/search/facet_distribution.rs
+++ b/crates/milli/tests/search/facet_distribution.rs
@@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() {
         FilterableAttributesRule::Field(S("genres")),
         FilterableAttributesRule::Field(S("tags")),
     ]);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     // index documents
diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs
index 4098af736..3ee78561d 100644
--- a/crates/milli/tests/search/mod.rs
+++ b/crates/milli/tests/search/mod.rs
@@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     // index documents
diff --git a/crates/milli/tests/search/phrase_search.rs b/crates/milli/tests/search/phrase_search.rs
index 180fcd176..397729c20 100644
--- a/crates/milli/tests/search/phrase_search.rs
+++ b/crates/milli/tests/search/phrase_search.rs
@@ -1,3 +1,4 @@
+use milli::progress::Progress;
 use milli::update::{IndexerConfig, Settings};
 use milli::{Criterion, Index, Search, TermsMatchingStrategy};
 
@@ -10,7 +11,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) {
     let mut builder = Settings::new(&mut wtxn, index, &config);
     let stop_words = stop_words.iter().map(|s| s.to_string()).collect();
     builder.set_stop_words(stop_words);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 }
 
diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs
index b72978330..cb0c23e42 100644
--- a/crates/milli/tests/search/query_criteria.rs
+++ b/crates/milli/tests/search/query_criteria.rs
@@ -236,7 +236,7 @@ fn criteria_mixup() {
         let mut wtxn = index.write_txn().unwrap();
         let mut builder = Settings::new(&mut wtxn, &index, &config);
         builder.set_criteria(criteria.clone());
-        builder.execute(|_| (), || false, Default::default()).unwrap();
+        builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
         wtxn.commit().unwrap();
 
         let rtxn = index.read_txn().unwrap();
@@ -276,7 +276,7 @@ fn criteria_ascdesc() {
         S("name"),
         S("age"),
     });
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     let mut wtxn = index.write_txn().unwrap();
@@ -359,7 +359,7 @@ fn criteria_ascdesc() {
         let mut wtxn = index.write_txn().unwrap();
         let mut builder = Settings::new(&mut wtxn, &index, &config);
         builder.set_criteria(vec![criterion.clone()]);
-        builder.execute(|_| (), || false, Default::default()).unwrap();
+        builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
         wtxn.commit().unwrap();
 
         let rtxn = index.read_txn().unwrap();
diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs
index 9aacbf82a..49c9c7b5d 100644
--- a/crates/milli/tests/search/typo_tolerance.rs
+++ b/crates/milli/tests/search/typo_tolerance.rs
@@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() {
     let config = IndexerConfig::default();
     let mut builder = Settings::new(&mut txn, &index, &config);
     builder.set_min_word_len_one_typo(4);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 
     // typo is now supported for 4 letters words
     let mut search = Search::new(&txn, &index);
@@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() {
     let config = IndexerConfig::default();
     let mut builder = Settings::new(&mut txn, &index, &config);
     builder.set_min_word_len_two_typos(7);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 
     // typo is now supported for 4 letters words
     let mut search = Search::new(&txn, &index);
@@ -181,7 +181,7 @@ fn test_typo_disabled_on_word() {
     // `zealand` doesn't allow typos anymore
     exact_words.insert("zealand".to_string());
     builder.set_exact_words(exact_words);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 
     let mut search = Search::new(&txn, &index);
     search.query("zealand");
@@ -219,7 +219,7 @@ fn test_disable_typo_on_attribute() {
     let mut builder = Settings::new(&mut txn, &index, &config);
     // disable typos on `description`
     builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect());
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 
     let mut search = Search::new(&txn, &index);
     search.query("antebelum");