Merge pull request #5687 from meilisearch/settings-indexer-edition-2024

Settings indexer edition 2024
Commit 0ef52941c7 by Louis Dureuil, 2025-07-01 07:35:21 +00:00 (committed by GitHub)
38 changed files with 1314 additions and 266 deletions

@@ -65,7 +65,7 @@ fn setup_settings<'t>(
     let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect();
     builder.set_sortable_fields(sortable_fields);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
 }

 fn setup_index_with_settings(
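
The settings builder's `execute` changed shape at every call site in this PR: the per-step tracing callback is gone, and callers now pass a borrowed cancellation closure plus a `Progress` handle. A minimal, self-contained sketch of the new call convention, using hypothetical stand-in types rather than milli's real `Progress` and `EmbedderStats`:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Default)]
struct Progress; // stand-in for meilisearch_types::milli::progress::Progress

#[derive(Default)]
struct EmbedderStats; // stand-in for milli::progress::EmbedderStats

struct SettingsBuilder;

impl SettingsBuilder {
    // Sketch of the new shape: cancellation check first, then progress, then stats.
    fn execute(
        &self,
        must_stop_processing: &dyn Fn() -> bool,
        _progress: &Progress,
        _embedder_stats: Arc<EmbedderStats>,
    ) -> Result<(), &'static str> {
        // A real implementation polls `must_stop_processing` between steps.
        if must_stop_processing() {
            return Err("task aborted");
        }
        Ok(())
    }
}

fn main() {
    let stop = Arc::new(AtomicBool::new(false));
    let stop_check = Arc::clone(&stop);
    let builder = SettingsBuilder;
    builder
        .execute(&move || stop_check.load(Ordering::Relaxed), &Progress::default(), Default::default())
        .unwrap();
}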

@@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index {
     (conf.configure)(&mut builder);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();

     let config = IndexerConfig::default();

@@ -246,8 +246,8 @@ impl IndexScheduler {
             builder
                 .execute(
-                    |indexing_step| tracing::debug!(update = ?indexing_step),
-                    || must_stop_processing.get(),
+                    &|| must_stop_processing.get(),
+                    &progress,
                     current_batch.embedder_stats.clone(),
                 )
                 .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;

@@ -474,15 +474,11 @@ impl IndexScheduler {
                 }
                 progress.update_progress(SettingsProgress::ApplyTheSettings);
-                builder
-                    .execute(
-                        |indexing_step| tracing::debug!(update = ?indexing_step),
-                        || must_stop_processing.get(),
-                        embedder_stats,
-                    )
+                let congestion = builder
+                    .execute(&|| must_stop_processing.get(), progress, embedder_stats)
                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;

-                Ok((tasks, None))
+                Ok((tasks, congestion))
             }
             IndexOperation::DocumentClearAndSetting {
                 index_uid,

@@ -399,7 +399,7 @@ fn import_vectors_first_and_embedder_later() {
         .collect::<Vec<_>>();
     // the all the vectors linked to the new specified embedder have been removed
     // Only the unknown embedders stays in the document DB
-    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel","_vectors":{"unknown embedder":[1.0,2.0,3.0]}},{"id":2,"doggo":"max","_vectors":{"unknown embedder":[4.0,5.0]}},{"id":3,"doggo":"marcel"},{"id":4,"doggo":"sora"}]"###);
+    snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel","_vectors":{"unknown embedder":[1,2,3]}},{"id":2,"doggo":"max","_vectors":{"unknown embedder":[4,5]}},{"id":3,"doggo":"marcel"},{"id":4,"doggo":"sora"}]"###);
     let conf = index.embedding_configs(&rtxn).unwrap();
     // even though we specified the vector for the ID 3, it shouldn't be marked
     // as user provided since we explicitely marked it as NOT user provided.
@@ -800,7 +800,7 @@ fn delete_embedder_with_user_provided_vectors() {
             .unwrap()
             .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
             .collect::<Vec<_>>();
-        snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###);
+        snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"regenerate":false,"embeddings":[[0.0,0.0,0.0]]}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"regenerate":false,"embeddings":[[1.0,1.0,1.0]]}}}]"###);
     }
     {
@@ -835,6 +835,6 @@ fn delete_embedder_with_user_provided_vectors() {
             .collect::<Vec<_>>();
         // FIXME: redaction
-        snapshot!(json_string!(serde_json::to_string(&documents).unwrap(), { "[]._vectors.doggo_embedder.embeddings" => "[vector]" }), @r###""[{\"id\":0,\"doggo\":\"kefir\",\"_vectors\":{\"manual\":{\"embeddings\":[[0.0,0.0,0.0]],\"regenerate\":false},\"my_doggo_embedder\":{\"embeddings\":[[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]],\"regenerate\":false}}},{\"id\":1,\"doggo\":\"intel\",\"_vectors\":{\"manual\":{\"embeddings\":[[1.0,1.0,1.0]],\"regenerate\":false}}}]""###);
+        snapshot!(json_string!(serde_json::to_string(&documents).unwrap(), { "[]._vectors.doggo_embedder.embeddings" => "[vector]" }), @r###""[{\"id\":0,\"doggo\":\"kefir\",\"_vectors\":{\"manual\":{\"regenerate\":false,\"embeddings\":[[0.0,0.0,0.0]]},\"my_doggo_embedder\":{\"regenerate\":false,\"embeddings\":[[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]]}}},{\"id\":1,\"doggo\":\"intel\",\"_vectors\":{\"manual\":{\"regenerate\":false,\"embeddings\":[[1.0,1.0,1.0]]}}}]""###);
     }
 }
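
The snapshot churn above (the key order flipping to "regenerate" before "embeddings") is what serde's declaration-order serialization produces once the struct backing the `_vectors` entry lists `regenerate` first. A hedged sketch with a simplified, hypothetical struct (assuming `serde` and `serde_json` as dependencies):

use serde::Serialize;

// Hypothetical stand-in for the serialized vector state; field order here
// dictates key order in the emitted JSON.
#[derive(Serialize)]
struct ExplicitVectors {
    regenerate: bool,
    embeddings: Vec<Vec<f32>>,
}

fn main() {
    let v = ExplicitVectors { regenerate: false, embeddings: vec![vec![0.0, 0.0, 0.0]] };
    // Prints: {"regenerate":false,"embeddings":[[0.0,0.0,0.0]]}
    println!("{}", serde_json::to_string(&v).unwrap());
}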

@@ -202,6 +202,7 @@ struct Infos {
     experimental_composite_embedders: bool,
     experimental_embedding_cache_entries: usize,
     experimental_no_snapshot_compaction: bool,
+    experimental_no_edition_2024_for_settings: bool,
     gpu_enabled: bool,
     db_path: bool,
     import_dump: bool,
@@ -286,8 +287,12 @@ impl Infos {
             ScheduleSnapshot::Enabled(interval) => Some(interval),
         };

-        let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } =
-            indexer_options;
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget: _,
+            experimental_no_edition_2024_for_settings,
+        } = indexer_options;

         let RuntimeTogglableFeatures {
             metrics,
@@ -350,6 +355,7 @@ impl Infos {
             ssl_require_auth,
             ssl_resumption,
             ssl_tickets,
+            experimental_no_edition_2024_for_settings,
         }
     }
 }
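
The widened destructuring in `Infos::new` is the exhaustiveness idiom at work: binding every field by name with no `..` rest pattern means the compiler flags this spot whenever `IndexerOpts` grows a field, so analytics cannot silently miss a new option. A sketch with a simplified, hypothetical struct:

// Hypothetical trimmed-down options struct.
struct IndexerOpts {
    max_indexing_memory: Option<usize>,
    max_indexing_threads: usize,
    skip_index_budget: bool,
    experimental_no_edition_2024_for_settings: bool,
}

fn report(indexer_options: IndexerOpts) {
    // No `..` here: adding a field to `IndexerOpts` breaks this pattern
    // until the new field is either reported or explicitly ignored.
    let IndexerOpts {
        max_indexing_memory,
        max_indexing_threads,
        skip_index_budget: _, // deliberately not reported
        experimental_no_edition_2024_for_settings,
    } = indexer_options;
    println!(
        "memory={max_indexing_memory:?} threads={max_indexing_threads} no_edition_2024_for_settings={experimental_no_edition_2024_for_settings}"
    );
}

fn main() {
    report(IndexerOpts {
        max_indexing_memory: None,
        max_indexing_threads: 2,
        skip_index_budget: true,
        experimental_no_edition_2024_for_settings: false,
    });
}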

@@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::{open_auth_store_env, AuthController};
 use meilisearch_types::milli::constants::VERSION_MAJOR;
 use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
-use meilisearch_types::milli::progress::EmbedderStats;
+use meilisearch_types::milli::progress::{EmbedderStats, Progress};
 use meilisearch_types::milli::update::{
     default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
 };
@@ -464,6 +464,7 @@ fn import_dump(
     index_scheduler: &mut IndexScheduler,
     auth: &mut AuthController,
 ) -> Result<(), anyhow::Error> {
+    let progress = Progress::default();
     let reader = File::open(dump_path)?;
     let mut dump_reader = dump::DumpReader::open(reader)?;
@@ -544,11 +545,7 @@ fn import_dump(
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);
         let embedder_stats: Arc<EmbedderStats> = Default::default();
-        builder.execute(
-            |indexing_step| tracing::debug!("update: {:?}", indexing_step),
-            || false,
-            embedder_stats.clone(),
-        )?;
+        builder.execute(&|| false, &progress, embedder_stats.clone())?;

         // 4.3 Import the documents.
         // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.

@@ -53,6 +53,8 @@ const MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE: &str = "MEILI_EXPERIMENTAL_DUMPLESS_UPGRADE";
 const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
 const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
 const MEILI_EXPERIMENTAL_CONTAINS_FILTER: &str = "MEILI_EXPERIMENTAL_CONTAINS_FILTER";
+const MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS: &str =
+    "MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS";
 const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
 const MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE: &str = "MEILI_EXPERIMENTAL_SEARCH_QUEUE_SIZE";
 const MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER: &str = "MEILI_EXPERIMENTAL_DROP_SEARCH_AFTER";
@@ -749,12 +751,25 @@ pub struct IndexerOpts {
     #[clap(skip)]
     #[serde(skip)]
     pub skip_index_budget: bool,
+
+    /// Experimental no edition 2024 for settings feature. For more information,
+    /// see: <https://github.com/orgs/meilisearch/discussions/847>
+    ///
+    /// Enables the experimental no edition 2024 for settings feature.
+    #[clap(long, env = MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS)]
+    #[serde(default)]
+    pub experimental_no_edition_2024_for_settings: bool,
 }

 impl IndexerOpts {
     /// Exports the values to their corresponding env vars if they are not set.
     pub fn export_to_env(self) {
-        let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } = self;
+        let IndexerOpts {
+            max_indexing_memory,
+            max_indexing_threads,
+            skip_index_budget: _,
+            experimental_no_edition_2024_for_settings,
+        } = self;
         if let Some(max_indexing_memory) = max_indexing_memory.0 {
             export_to_env_if_not_present(
                 MEILI_MAX_INDEXING_MEMORY,
@@ -767,6 +782,12 @@ impl IndexerOpts {
                 max_indexing_threads.to_string(),
             );
         }
+        if experimental_no_edition_2024_for_settings {
+            export_to_env_if_not_present(
+                MEILI_EXPERIMENTAL_NO_EDITION_2024_FOR_SETTINGS,
+                experimental_no_edition_2024_for_settings.to_string(),
+            );
+        }
     }
 }

@@ -785,7 +806,12 @@ impl TryFrom<&IndexerOpts> for IndexerConfig {
             max_threads: *other.max_indexing_threads,
             max_positions_per_attributes: None,
             skip_index_budget: other.skip_index_budget,
-            ..Default::default()
+            experimental_no_edition_2024_for_settings: other
+                .experimental_no_edition_2024_for_settings,
+            chunk_compression_type: Default::default(),
+            chunk_compression_level: Default::default(),
+            documents_chunk_size: Default::default(),
+            max_nb_chunks: Default::default(),
         })
     }
 }
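
The `TryFrom<&IndexerOpts>` change swaps `..Default::default()` for an exhaustive field list, the constructor-side twin of the destructuring idiom: with the rest pattern, a newly added `IndexerConfig` field would quietly take its default; without it, the build fails until the field is wired through. A sketch under simplified, hypothetical types:

// Hypothetical trimmed-down config and options.
struct Config {
    skip_index_budget: bool,
    experimental_no_edition_2024_for_settings: bool,
}

struct Opts {
    skip_index_budget: bool,
    experimental_no_edition_2024_for_settings: bool,
}

impl From<&Opts> for Config {
    fn from(other: &Opts) -> Self {
        // Every field is spelled out; adding one to `Config` stops this
        // compiling until it is explicitly mapped or defaulted here.
        Config {
            skip_index_budget: other.skip_index_budget,
            experimental_no_edition_2024_for_settings: other
                .experimental_no_edition_2024_for_settings,
        }
    }
}

fn main() {
    let opts = Opts { skip_index_budget: true, experimental_no_edition_2024_for_settings: false };
    let config = Config::from(&opts);
    assert!(config.skip_index_budget);
}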

@@ -464,6 +464,7 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
             skip_index_budget: true,
             // Having 2 threads makes the tests way faster
             max_indexing_threads: MaxThreads::from_str("2").unwrap(),
+            experimental_no_edition_2024_for_settings: false,
         },
         experimental_enable_metrics: false,
         ..Parser::parse_from(None as Option<&str>)

@@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false, Default::default()).unwrap();
+    builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
     wtxn.commit().unwrap();

     // index documents

@@ -135,7 +135,7 @@ impl TempIndex {
     ) -> Result<(), crate::error::Error> {
         let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
         update(&mut builder);
-        builder.execute(drop, || false, Default::default())?;
+        builder.execute(&|| false, &Progress::default(), Default::default())?;
         Ok(())
     }

@@ -15,6 +15,7 @@ pub struct IndexerConfig {
     pub thread_pool: ThreadPoolNoAbort,
     pub max_positions_per_attributes: Option<u32>,
     pub skip_index_budget: bool,
+    pub experimental_no_edition_2024_for_settings: bool,
 }

 impl IndexerConfig {
@@ -63,6 +64,7 @@ impl Default for IndexerConfig {
             chunk_compression_level: None,
             max_positions_per_attributes: None,
             skip_index_budget: false,
+            experimental_no_edition_2024_for_settings: false,
         }
     }
 }

@@ -1,7 +1,10 @@
+use std::cell::{Cell, RefCell};
 use std::collections::{BTreeMap, BTreeSet};
+use std::sync::RwLock;

+use bumpalo::Bump;
 use bumparaw_collections::RawMap;
-use heed::RoTxn;
+use heed::{RoTxn, WithoutTls};
 use rustc_hash::FxBuildHasher;
 use serde_json::value::RawValue;
@@ -9,7 +12,13 @@ use super::vector_document::VectorDocument;
 use super::{KvReaderFieldId, KvWriterFieldId};
 use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
 use crate::documents::FieldIdMapper;
-use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
+use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
+use crate::update::new::vector_document::VectorDocumentFromDb;
+use crate::vector::settings::EmbedderAction;
+use crate::{
+    DocumentId, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError,
+    Result, UserError,
+};

 /// A view into a document that can represent either the current version from the DB,
 /// the update data from payload or other means, or the merged updated version.
@@ -309,6 +318,7 @@ where
 pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
     document: &'s impl Document<'s>,
     vector_document: Option<&'s impl VectorDocument<'s>>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
     fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
     mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
 ) -> Result<&'a KvReaderFieldId>
@@ -338,20 +348,39 @@ where
         for res in vector_document.iter_vectors() {
             let (name, entry) = res?;
             if entry.has_configured_embedder {
-                continue; // we don't write vectors with configured embedder in documents
+                if let Some(action) = embedder_actions.get(name) {
+                    if action.write_back().is_some() && !entry.regenerate {
+                        vectors.insert(
+                            name,
+                            serde_json::json!({
+                                "regenerate": entry.regenerate,
+                                // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
+                                "embeddings": entry.embeddings,
+                            }),
+                        );
+                    }
+                }
+            } else {
+                match embedder_actions.get(name) {
+                    Some(action) if action.write_back().is_none() => {
+                        continue;
+                    }
+                    _ => {
+                        vectors.insert(
+                            name,
+                            if entry.implicit {
+                                serde_json::json!(entry.embeddings)
+                            } else {
+                                serde_json::json!({
+                                    "regenerate": entry.regenerate,
+                                    // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
+                                    "embeddings": entry.embeddings,
+                                })
+                            },
+                        );
+                    }
+                }
             }
-            vectors.insert(
-                name,
-                if entry.implicit {
-                    serde_json::json!(entry.embeddings)
-                } else {
-                    serde_json::json!({
-                        "regenerate": entry.regenerate,
-                        // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
-                        "embeddings": entry.embeddings,
-                    })
-                },
-            );
         }

         if vectors.is_empty() {
@@ -439,3 +468,127 @@ impl<'doc> Versions<'doc> {
         self.data.get(k)
     }
 }
+
+pub struct DocumentIdentifiers<'doc> {
+    docid: DocumentId,
+    external_document_id: &'doc str,
+}
+
+impl<'doc> DocumentIdentifiers<'doc> {
+    pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
+        Self { docid, external_document_id }
+    }
+
+    pub fn docid(&self) -> DocumentId {
+        self.docid
+    }
+
+    pub fn external_document_id(&self) -> &'doc str {
+        self.external_document_id
+    }
+
+    pub fn current<'a, Mapper: FieldIdMapper>(
+        &self,
+        rtxn: &'a RoTxn,
+        index: &'a Index,
+        mapper: &'a Mapper,
+    ) -> Result<DocumentFromDb<'a, Mapper>> {
+        Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
+            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
+        )?)
+    }
+
+    pub fn current_vectors<'a, Mapper: FieldIdMapper>(
+        &self,
+        rtxn: &'a RoTxn,
+        index: &'a Index,
+        mapper: &'a Mapper,
+        doc_alloc: &'a Bump,
+    ) -> Result<VectorDocumentFromDb<'a>> {
+        Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
+            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
+        )?)
+    }
+}
+
+pub struct DocumentContext<
+    'doc,             // covariant lifetime of a single `process` call
+    'extractor: 'doc, // invariant lifetime of the extractor_allocs
+    'fid: 'doc,       // invariant lifetime of the new_fields_ids_map
+    'indexer: 'doc,   // covariant lifetime of objects that outlive a single `process` call
+    T: MostlySend,
+> {
+    /// The index we're indexing in
+    pub index: &'indexer Index,
+    /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
+    /// inside of the DB.
+    pub db_fields_ids_map: &'indexer FieldsIdsMap,
+    /// A transaction providing data from the DB before all indexing operations
+    pub rtxn: RoTxn<'indexer, WithoutTls>,

+    /// Global field id map that is up to date with the current state of the indexing process.
+    ///
+    /// - Inserting a field will take a lock
+    /// - Retrieving a field may take a lock as well
+    pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
+
+    /// Data allocated in this allocator is cleared between each call to `process`.
+    pub doc_alloc: Bump,
+
+    /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
+    pub extractor_alloc: &'extractor Bump,
+
+    /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
+    pub doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
+
+    /// Extractor-specific data
+    pub data: &'doc T,
+}
+
+impl<
+        'doc,             // covariant lifetime of a single `process` call
+        'data: 'doc,      // invariant on T lifetime of the datastore
+        'extractor: 'doc, // invariant lifetime of extractor_allocs
+        'fid: 'doc,       // invariant lifetime of fields ids map
+        'indexer: 'doc,   // covariant lifetime of objects that survive a `process` call
+        T: MostlySend,
+    > DocumentContext<'doc, 'extractor, 'fid, 'indexer, T>
+{
+    #[allow(clippy::too_many_arguments)]
+    pub fn new<F>(
+        index: &'indexer Index,
+        db_fields_ids_map: &'indexer FieldsIdsMap,
+        new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
+        extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
+        doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
+        datastore: &'data ThreadLocal<T>,
+        fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
+        init_data: F,
+    ) -> Result<Self>
+    where
+        F: FnOnce(&'extractor Bump) -> Result<T>,
+    {
+        let doc_alloc =
+            doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
+        let doc_alloc = doc_alloc.0.take();
+        let fields_ids_map = fields_ids_map_store
+            .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
+        let fields_ids_map = &fields_ids_map.0;
+        let extractor_alloc = extractor_allocs.get_or_default();
+        let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
+        let txn = index.read_txn()?;
+        Ok(DocumentContext {
+            index,
+            rtxn: txn,
+            db_fields_ids_map,
+            new_fields_ids_map: fields_ids_map,
+            doc_alloc,
+            extractor_alloc: &extractor_alloc.0,
+            data,
+            doc_allocs,
+        })
+    }
+}
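
The branch added to `write_to_obkv` encodes a small decision table: vectors belonging to a configured embedder are persisted back into the document only on an explicit write-back of user-provided (non-regenerated) entries, while vectors of unknown embedders stay in the document unless a write-back action claims them. A hedged, self-contained sketch of that rule with stand-in types:

// Stand-ins for milli's `VectorEntry` and `EmbedderAction`.
struct VectorEntry {
    has_configured_embedder: bool,
    regenerate: bool,
}

struct EmbedderAction {
    write_back: bool, // stand-in for `write_back().is_some()`
}

fn should_write_entry(entry: &VectorEntry, action: Option<&EmbedderAction>) -> bool {
    if entry.has_configured_embedder {
        // Configured embedder: only written back when requested and user-provided.
        matches!(action, Some(a) if a.write_back && !entry.regenerate)
    } else {
        // Unknown embedder: kept unless an action without write-back skips it.
        !matches!(action, Some(a) if !a.write_back)
    }
}

fn main() {
    let user_provided = VectorEntry { has_configured_embedder: true, regenerate: false };
    assert!(should_write_entry(&user_provided, Some(&EmbedderAction { write_back: true })));

    let regenerated = VectorEntry { has_configured_embedder: true, regenerate: true };
    assert!(!should_write_entry(&regenerated, Some(&EmbedderAction { write_back: true })));

    let unknown = VectorEntry { has_configured_embedder: false, regenerate: true };
    assert!(should_write_entry(&unknown, None));
}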

@@ -10,20 +10,16 @@ use super::vector_document::{
 };
 use crate::attribute_patterns::PatternMatch;
 use crate::documents::FieldIdMapper;
+use crate::update::new::document::DocumentIdentifiers;
 use crate::vector::EmbeddingConfigs;
 use crate::{DocumentId, Index, InternalError, Result};

 pub enum DocumentChange<'doc> {
-    Deletion(Deletion<'doc>),
+    Deletion(DocumentIdentifiers<'doc>),
     Update(Update<'doc>),
     Insertion(Insertion<'doc>),
 }

-pub struct Deletion<'doc> {
-    docid: DocumentId,
-    external_document_id: &'doc str,
-}
-
 pub struct Update<'doc> {
     docid: DocumentId,
     external_document_id: &'doc str,
@@ -55,31 +51,6 @@ impl<'doc> DocumentChange<'doc> {
     }
 }

-impl<'doc> Deletion<'doc> {
-    pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
-        Self { docid, external_document_id }
-    }
-
-    pub fn docid(&self) -> DocumentId {
-        self.docid
-    }
-
-    pub fn external_document_id(&self) -> &'doc str {
-        self.external_document_id
-    }
-
-    pub fn current<'a, Mapper: FieldIdMapper>(
-        &self,
-        rtxn: &'a RoTxn,
-        index: &'a Index,
-        mapper: &'a Mapper,
-    ) -> Result<DocumentFromDb<'a, Mapper>> {
-        Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
-            crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
-        )?)
-    }
-}
-
 impl<'doc> Insertion<'doc> {
     pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self {
         Insertion { docid, external_document_id, new }

@@ -1,16 +1,24 @@
 use std::cell::RefCell;
+use std::collections::BTreeMap;

 use bumpalo::Bump;
 use hashbrown::HashMap;

 use super::DelAddRoaringBitmap;
 use crate::constants::RESERVED_GEO_FIELD_NAME;
-use crate::update::new::channel::DocumentsSender;
-use crate::update::new::document::{write_to_obkv, Document as _};
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
+use crate::update::new::document::{write_to_obkv, Document};
+use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
+use crate::update::new::indexer::document_changes::{Extractor, IndexingContext};
+use crate::update::new::indexer::settings_changes::{
+    settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor,
+};
 use crate::update::new::ref_cell_ext::RefCellExt as _;
-use crate::update::new::thread_local::FullySend;
+use crate::update::new::thread_local::{FullySend, ThreadLocal};
+use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
+use crate::update::settings::SettingsDelta;
+use crate::vector::settings::EmbedderAction;
 use crate::vector::EmbeddingConfigs;
 use crate::Result;
@@ -41,10 +49,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
         let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
+        let embedder_actions = &Default::default();

         for change in changes {
             let change = change?;
@@ -121,9 +130,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
                     let content = write_to_obkv(
                         &content,
                         vector_content.as_ref(),
+                        embedder_actions,
                         &mut new_fields_ids_map,
                         &mut document_buffer,
                     )?;
+
                     self.document_sender.uncompressed(docid, external_docid, content).unwrap();
                 }
                 DocumentChange::Insertion(insertion) => {
@@ -146,6 +157,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
                     let content = write_to_obkv(
                         &content,
                         inserted_vectors.as_ref(),
+                        embedder_actions,
                         &mut new_fields_ids_map,
                         &mut document_buffer,
                     )?;
@@ -158,3 +170,144 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
         Ok(())
     }
 }
+
+pub struct SettingsChangeDocumentExtractor<'a, 'b> {
+    document_sender: DocumentsSender<'a, 'b>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+}
+
+impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
+    pub fn new(
+        document_sender: DocumentsSender<'a, 'b>,
+        embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+    ) -> Self {
+        Self { document_sender, embedder_actions }
+    }
+}
+
+impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
+    type Data = FullySend<()>;
+
+    fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(FullySend(()))
+    }
+
+    fn process<'doc>(
+        &self,
+        documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
+        context: &DocumentContext<Self::Data>,
+    ) -> Result<()> {
+        let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
+
+        for document in documents {
+            let document = document?;
+            // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
+            // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
+            let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
+
+            let external_docid = document.external_document_id().to_owned();
+            let content =
+                document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
+            let vector_content = document.current_vectors(
+                &context.rtxn,
+                context.index,
+                &context.db_fields_ids_map,
+                &context.doc_alloc,
+            )?;
+
+            // if the document doesn't need to be updated, we skip it
+            if !must_update_document(&vector_content, self.embedder_actions)? {
+                continue;
+            }
+
+            let content = write_to_obkv(
+                &content,
+                Some(&vector_content),
+                self.embedder_actions,
+                &mut new_fields_ids_map,
+                &mut document_buffer,
+            )?;
+
+            self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap();
+        }
+
+        Ok(())
+    }
+}
+
+/// Modify the database documents based on the settings changes.
+///
+/// This function extracts the documents from the database,
+/// modifies them by adding or removing vector fields based on embedder actions,
+/// and then updates the database.
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
+pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
+    documents: &'indexer DocumentsIndentifiers<'indexer>,
+    indexing_context: IndexingContext<MSP>,
+    extractor_sender: &ExtractorBbqueueSender,
+    settings_delta: &SD,
+    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+    SD: SettingsDelta,
+{
+    if !must_update_database(settings_delta) {
+        return Ok(());
+    }
+
+    let document_sender = extractor_sender.documents();
+    let document_extractor =
+        SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
+    let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
+
+    settings_change_extract(
+        documents,
+        &document_extractor,
+        indexing_context,
+        extractor_allocs,
+        &datastore,
+        crate::update::new::steps::IndexingStep::ExtractingDocuments,
+    )?;
+
+    Ok(())
+}
+
+fn must_update_database<SD: SettingsDelta>(settings_delta: &SD) -> bool {
+    settings_delta.embedder_actions().iter().any(|(name, action)| {
+        if action.reindex().is_some() {
+            // if action has a reindex, we need to update the documents database if the embedder is a new one
+            settings_delta.old_embedders().get(name).is_none()
+        } else {
+            // if action has a write_back, we need to update the documents database
+            action.write_back().is_some()
+        }
+    })
+}
+
+fn must_update_document<'s, 'a>(
+    vector_document: &'s impl VectorDocument<'s>,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+) -> Result<bool>
+where
+    's: 'a,
+{
+    // Check if any vector needs to be written back for the document
+    for (name, action) in embedder_actions {
+        // if the vector entry is not found, we don't need to update the document
+        let Some(vector_entry) = vector_document.vectors_for_key(name)? else {
+            continue;
+        };
+
+        // if the vector entry is user provided, we need to update the document by writing back vectors.
+        let write_back = action.write_back().is_some() && !vector_entry.regenerate;
+        // if the vector entry is a new embedder, we need to update the document removing the vectors from the document.
+        let new_embedder = action.reindex().is_some() && !vector_entry.has_configured_embedder;
+
+        if write_back || new_embedder {
+            return Ok(true);
+        }
+    }
+
+    Ok(false)
+}
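
`must_update_database` gates the whole document rewrite: it returns true only when some embedder action either reindexes a name absent from the old embedders (a brand-new embedder) or carries a write-back. A self-contained restatement with simplified stand-in types:

use std::collections::{BTreeMap, BTreeSet};

// Stand-in for `EmbedderAction`: booleans in place of
// `reindex().is_some()` / `write_back().is_some()`.
struct Action {
    reindex: bool,
    write_back: bool,
}

fn must_update_database(
    actions: &BTreeMap<String, Action>,
    old_embedders: &BTreeSet<String>,
) -> bool {
    actions.iter().any(|(name, action)| {
        if action.reindex {
            // A reindex touches the documents DB only for a new embedder.
            !old_embedders.contains(name)
        } else {
            // Otherwise only a write-back forces a rewrite.
            action.write_back
        }
    })
}

fn main() {
    let mut actions = BTreeMap::new();
    actions.insert("doggo_embedder".to_string(), Action { reindex: true, write_back: false });

    // The embedder is new: the documents database must be updated.
    assert!(must_update_database(&actions, &BTreeSet::new()));

    // The same embedder already existed: nothing to rewrite.
    let old: BTreeSet<String> = ["doggo_embedder".to_string()].into();
    assert!(!must_update_database(&actions, &old));
}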

@@ -15,9 +15,10 @@ use crate::filterable_attributes_rules::match_faceted_field;
 use crate::heed_codec::facet::OrderedF64Codec;
 use crate::update::del_add::DelAdd;
 use crate::update::new::channel::FieldIdDocidFacetSender;
+use crate::update::new::document::DocumentContext;
 use crate::update::new::extract::perm_json_p;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -51,7 +52,7 @@ impl<'extractor> Extractor<'extractor> for FacetedExtractorData<'_, '_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -75,7 +76,7 @@ pub struct FacetedDocidsExtractor;
 impl FacetedDocidsExtractor {
     #[allow(clippy::too_many_arguments)]
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
+        context: &DocumentContext<RefCell<BalancedCaches>>,
         filterable_attributes: &[FilterableAttributesRule],
         sortable_fields: &HashSet<String>,
         asc_desc_fields: &HashSet<String>,

@@ -10,8 +10,8 @@ use serde_json::value::RawValue;
 use serde_json::Value;

 use crate::error::GeoError;
-use crate::update::new::document::Document;
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::document::{Document, DocumentContext};
+use crate::update::new::indexer::document_changes::Extractor;
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::DocumentChange;
@@ -150,7 +150,7 @@ impl<'extractor> Extractor<'extractor> for GeoExtractor {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> Result<()> {
         let rtxn = &context.rtxn;
         let index = context.index;

@@ -12,7 +12,7 @@ pub use documents::*;
 pub use faceted::*;
 pub use geo::*;
 pub use searchable::*;
-pub use vectors::EmbeddingExtractor;
+pub use vectors::{EmbeddingExtractor, SettingsChangeEmbeddingExtractor};

 /// TODO move in permissive json pointer
 pub mod perm_json_p {

@@ -8,10 +8,11 @@ use bumpalo::Bump;

 use super::match_searchable_field;
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
+use crate::update::new::document::DocumentContext;
 use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::extract::perm_json_p::contained_in;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -226,7 +227,7 @@ impl<'extractor> Extractor<'extractor> for WordDocidsExtractorData<'_> {
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -305,7 +306,7 @@ impl WordDocidsExtractors {
     }

     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<Option<WordDocidsBalancedCaches>>>,
+        context: &DocumentContext<RefCell<Option<WordDocidsBalancedCaches>>>,
         document_tokenizer: &DocumentTokenizer,
         searchable_attributes: Option<&[&str]>,
         document_change: DocumentChange,

@@ -7,10 +7,10 @@ use bumpalo::Bump;

 use super::match_searchable_field;
 use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
 use crate::proximity::{index_proximity, MAX_DISTANCE};
-use crate::update::new::document::Document;
+use crate::update::new::document::{Document, DocumentContext};
 use crate::update::new::extract::cache::BalancedCaches;
 use crate::update::new::indexer::document_changes::{
-    extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext,
+    extract, DocumentChanges, Extractor, IndexingContext,
 };
 use crate::update::new::ref_cell_ext::RefCellExt as _;
 use crate::update::new::steps::IndexingStep;
@@ -39,7 +39,7 @@ impl<'extractor> Extractor<'extractor> for WordPairProximityDocidsExtractorData<
     fn process<'doc>(
         &self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &DocumentChangeContext<Self::Data>,
+        context: &DocumentContext<Self::Data>,
     ) -> Result<()> {
         for change in changes {
             let change = change?;
@@ -116,7 +116,7 @@ impl WordPairProximityDocidsExtractor {
     // and to store the docids of the documents that have a number of words in a given field
     // equal to or under than MAX_COUNTED_WORDS.
     fn extract_document_change(
-        context: &DocumentChangeContext<RefCell<BalancedCaches>>,
+        context: &DocumentContext<RefCell<BalancedCaches>>,
         document_tokenizer: &DocumentTokenizer,
        searchable_attributes: Option<&[&str]>,
        document_change: DocumentChange,

@@ -1,4 +1,5 @@
 use std::cell::RefCell;
+use std::collections::BTreeMap;

 use bumpalo::collections::Vec as BVec;
 use bumpalo::Bump;
@@ -9,13 +10,16 @@ use crate::error::FaultSource;
 use crate::progress::EmbedderStats;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
-use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
+use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
+use crate::update::new::indexer::document_changes::Extractor;
+use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
 use crate::update::new::thread_local::MostlySend;
 use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
 use crate::vector::error::{
     EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump,
 };
+use crate::vector::settings::{EmbedderAction, ReindexAction};
 use crate::vector::{Embedder, Embedding, EmbeddingConfigs};
 use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
@@ -56,7 +60,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> crate::Result<()> {
         let embedders = self.embedders.inner_as_ref();
         let mut unused_vectors_distribution =
@@ -294,6 +298,209 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
     }
 }

+pub struct SettingsChangeEmbeddingExtractor<'a, 'b> {
+    embedders: &'a EmbeddingConfigs,
+    old_embedders: &'a EmbeddingConfigs,
+    embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+    embedder_category_id: &'a std::collections::HashMap<String, u8>,
+    embedder_stats: &'a EmbedderStats,
+    sender: EmbeddingSender<'a, 'b>,
+    possible_embedding_mistakes: PossibleEmbeddingMistakes,
+    threads: &'a ThreadPoolNoAbort,
+}
+
+impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
+    #[allow(clippy::too_many_arguments)]
+    pub fn new(
+        embedders: &'a EmbeddingConfigs,
+        old_embedders: &'a EmbeddingConfigs,
+        embedder_actions: &'a BTreeMap<String, EmbedderAction>,
+        embedder_category_id: &'a std::collections::HashMap<String, u8>,
+        embedder_stats: &'a EmbedderStats,
+        sender: EmbeddingSender<'a, 'b>,
+        field_distribution: &'a FieldDistribution,
+        threads: &'a ThreadPoolNoAbort,
+    ) -> Self {
+        let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
+        Self {
+            embedders,
+            old_embedders,
+            embedder_actions,
+            embedder_category_id,
+            embedder_stats,
+            sender,
+            threads,
+            possible_embedding_mistakes,
+        }
+    }
+}
+
+impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> {
+    type Data = RefCell<EmbeddingExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
+        Ok(RefCell::new(EmbeddingExtractorData(HashMap::new_in(extractor_alloc))))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        documents: impl Iterator<Item = crate::Result<DocumentIdentifiers<'doc>>>,
+        context: &'doc DocumentContext<Self::Data>,
+    ) -> crate::Result<()> {
+        let embedders = self.embedders.inner_as_ref();
+        let old_embedders = self.old_embedders.inner_as_ref();
+        let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc);
+
+        let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
+        for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
+            // if the embedder is not in the embedder_actions, we don't need to reindex.
+            if let Some((embedder_id, reindex_action)) =
+                self.embedder_actions
+                    .get(embedder_name)
+                    // keep only the reindex actions
+                    .and_then(EmbedderAction::reindex)
+                    // map the reindex action to the embedder_id
+                    .map(|reindex| {
+                        let embedder_id = self.embedder_category_id.get(embedder_name).expect(
+                            "An embedder_category_id must exist for all reindexed embedders",
+                        );
+                        (*embedder_id, reindex)
+                    })
+            {
+                all_chunks.push((
+                    Chunks::new(
+                        embedder,
+                        embedder_id,
+                        embedder_name,
+                        prompt,
+                        context.data,
+                        &self.possible_embedding_mistakes,
+                        self.embedder_stats,
+                        self.threads,
+                        self.sender,
+                        &context.doc_alloc,
+                    ),
+                    reindex_action,
+                ))
+            }
+        }
+        for document in documents {
+            let document = document?;
+
+            let current_vectors = document.current_vectors(
+                &context.rtxn,
+                context.index,
+                context.db_fields_ids_map,
+                &context.doc_alloc,
+            )?;
+
+            for (chunks, reindex_action) in &mut all_chunks {
+                let embedder_name = chunks.embedder_name();
+                let current_vectors = current_vectors.vectors_for_key(embedder_name)?;
+
+                // if the vectors for this document have been already provided, we don't need to reindex.
+                let (is_new_embedder, must_regenerate) =
+                    current_vectors.as_ref().map_or((true, true), |vectors| {
+                        (!vectors.has_configured_embedder, vectors.regenerate)
+                    });
+
+                match reindex_action {
+                    ReindexAction::RegeneratePrompts => {
+                        if !must_regenerate {
+                            continue;
+                        }
+                        // we need to regenerate the prompts for the document
+
+                        // Get the old prompt and render the document with it
+                        let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else {
+                            unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders")
+                        };
+                        let old_rendered = old_prompt.render_document(
+                            document.external_document_id(),
+                            document.current(
+                                &context.rtxn,
+                                context.index,
+                                context.db_fields_ids_map,
+                            )?,
+                            context.new_fields_ids_map,
+                            &context.doc_alloc,
+                        )?;
+
+                        // Get the new prompt and render the document with it
+                        let new_prompt = chunks.prompt();
+                        let new_rendered = new_prompt.render_document(
+                            document.external_document_id(),
+                            document.current(
+                                &context.rtxn,
+                                context.index,
+                                context.db_fields_ids_map,
+                            )?,
+                            context.new_fields_ids_map,
+                            &context.doc_alloc,
+                        )?;
+
+                        // Compare the rendered documents
+                        // if they are different, regenerate the vectors
+                        if new_rendered != old_rendered {
+                            chunks.set_autogenerated(
+                                document.docid(),
+                                document.external_document_id(),
+                                new_rendered,
+                                &unused_vectors_distribution,
+                            )?;
+                        }
+                    }
+                    ReindexAction::FullReindex => {
+                        let prompt = chunks.prompt();
+                        // if no inserted vectors, then regenerate: true + no embeddings => autogenerate
+                        if let Some(embeddings) = current_vectors
+                            .and_then(|vectors| vectors.embeddings)
+                            // insert the embeddings only for new embedders
+                            .filter(|_| is_new_embedder)
+                        {
+                            chunks.set_regenerate(document.docid(), must_regenerate);
+                            chunks.set_vectors(
+                                document.external_document_id(),
+                                document.docid(),
+                                embeddings.into_vec(&context.doc_alloc, embedder_name).map_err(
+                                    |error| UserError::InvalidVectorsEmbedderConf {
+                                        document_id: document.external_document_id().to_string(),
+                                        error: error.to_string(),
+                                    },
+                                )?,
+                            )?;
+                        } else if must_regenerate {
+                            let rendered = prompt.render_document(
+                                document.external_document_id(),
+                                document.current(
+                                    &context.rtxn,
+                                    context.index,
+                                    context.db_fields_ids_map,
+                                )?,
+                                context.new_fields_ids_map,
+                                &context.doc_alloc,
+                            )?;
+                            chunks.set_autogenerated(
+                                document.docid(),
+                                document.external_document_id(),
+                                rendered,
+                                &unused_vectors_distribution,
+                            )?;
+                        }
+                    }
+                }
+            }
+        }
+
+        for (chunk, _) in all_chunks {
+            chunk.drain(&unused_vectors_distribution)?;
+        }
+
+        Ok(())
+    }
+}
+
 // **Warning**: the destructor of this struct is not normally run, make sure that all its fields:
 // 1. don't have side effects tied to they destructors
 // 2. if allocated, are allocated inside of the bumpalo
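
The `RegeneratePrompts` arm above avoids needless embedder traffic: a document is re-embedded only when the prompt rendered through the new template differs from the one rendered through the old template, and never when its vectors are user-provided. A hedged sketch of that rule with a toy prompt type:

// Toy stand-in for milli's `Prompt`; the real one renders document fields
// through a template engine.
struct Prompt(String);

impl Prompt {
    fn render_document(&self, doc: &str) -> String {
        format!("{}: {}", self.0, doc)
    }
}

fn needs_reembedding(old: &Prompt, new: &Prompt, doc: &str, must_regenerate: bool) -> bool {
    if !must_regenerate {
        // User-provided vectors are never regenerated.
        return false;
    }
    // Only pay for the embedder when the rendered prompt actually changed.
    old.render_document(doc) != new.render_document(doc)
}

fn main() {
    let old = Prompt("title".into());
    let unchanged = Prompt("title".into());
    let changed = Prompt("title and overview".into());

    assert!(!needs_reembedding(&old, &unchanged, "kefir", true));
    assert!(needs_reembedding(&old, &changed, "kefir", true));
    assert!(!needs_reembedding(&old, &changed, "kefir", false));
}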

@@ -3,100 +3,18 @@ use std::sync::atomic::Ordering;
 use std::sync::{Arc, RwLock};

 use bumpalo::Bump;
-use heed::{RoTxn, WithoutTls};
 use rayon::iter::IndexedParallelIterator;

 use super::super::document_change::DocumentChange;
 use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::progress::{AtomicDocumentStep, Progress};
+use crate::update::new::document::DocumentContext;
 use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
 use crate::update::new::steps::IndexingStep;
 use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
 use crate::update::GrenadParameters;
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};

-pub struct DocumentChangeContext<
-    'doc,             // covariant lifetime of a single `process` call
-    'extractor: 'doc, // invariant lifetime of the extractor_allocs
-    'fid: 'doc,       // invariant lifetime of the new_fields_ids_map
-    'indexer: 'doc,   // covariant lifetime of objects that outlive a single `process` call
-    T: MostlySend,
-> {
-    /// The index we're indexing in
-    pub index: &'indexer Index,
-    /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
-    /// inside of the DB.
-    pub db_fields_ids_map: &'indexer FieldsIdsMap,
-    /// A transaction providing data from the DB before all indexing operations
-    pub rtxn: RoTxn<'indexer, WithoutTls>,
-
-    /// Global field id map that is up to date with the current state of the indexing process.
-    ///
-    /// - Inserting a field will take a lock
-    /// - Retrieving a field may take a lock as well
-    pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
-
-    /// Data allocated in this allocator is cleared between each call to `process`.
-    pub doc_alloc: Bump,
-
-    /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
-    pub extractor_alloc: &'extractor Bump,
-
-    /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
-    doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
-
-    /// Extractor-specific data
-    pub data: &'doc T,
-}
-
-impl<
-        'doc,             // covariant lifetime of a single `process` call
-        'data: 'doc,      // invariant on T lifetime of the datastore
-        'extractor: 'doc, // invariant lifetime of extractor_allocs
-        'fid: 'doc,       // invariant lifetime of fields ids map
-        'indexer: 'doc,   // covariant lifetime of objects that survive a `process` call
-        T: MostlySend,
-    > DocumentChangeContext<'doc, 'extractor, 'fid, 'indexer, T>
-{
-    #[allow(clippy::too_many_arguments)]
-    pub fn new<F>(
-        index: &'indexer Index,
-        db_fields_ids_map: &'indexer FieldsIdsMap,
-        new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
-        extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
-        doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
-        datastore: &'data ThreadLocal<T>,
-        fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
-        init_data: F,
-    ) -> Result<Self>
-    where
-        F: FnOnce(&'extractor Bump) -> Result<T>,
-    {
-        let doc_alloc =
-            doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
-        let doc_alloc = doc_alloc.0.take();
-        let fields_ids_map = fields_ids_map_store
-            .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
-        let fields_ids_map = &fields_ids_map.0;
-        let extractor_alloc = extractor_allocs.get_or_default();
-        let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
-        let txn = index.read_txn()?;
-        Ok(DocumentChangeContext {
-            index,
-            rtxn: txn,
-            db_fields_ids_map,
-            new_fields_ids_map: fields_ids_map,
-            doc_alloc,
-            extractor_alloc: &extractor_alloc.0,
-            data,
-            doc_allocs,
-        })
-    }
-}
-
 /// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
 pub trait Extractor<'extractor>: Sync {
     type Data: MostlySend;
@@ -106,7 +24,7 @@ pub trait Extractor<'extractor>: Sync {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<Self::Data>,
+        context: &'doc DocumentContext<Self::Data>,
     ) -> Result<()>;
 }
@@ -125,7 +43,7 @@ pub trait DocumentChanges<'pl // lifetime of the underlying payload
     fn item_to_document_change<'doc, // lifetime of a single `process` call
         T: MostlySend>(
         &'doc self,
-        context: &'doc DocumentChangeContext<T>,
+        context: &'doc DocumentContext<T>,
         item: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>> where 'pl: 'doc // the payload must survive the process calls
     ;
@@ -224,7 +142,7 @@ where
     let pi = document_changes.iter(CHUNK_SIZE);
     pi.try_arc_for_each_try_init(
         || {
-            DocumentChangeContext::new(
+            DocumentContext::new(
                 index,
                 db_fields_ids_map,
                 new_fields_ids_map,

@@ -4,10 +4,11 @@ use rayon::iter::IndexedParallelIterator;
 use rayon::slice::ParallelSlice as _;
 use roaring::RoaringBitmap;

-use super::document_changes::{DocumentChangeContext, DocumentChanges};
+use super::document_changes::DocumentChanges;
 use crate::documents::PrimaryKey;
+use crate::update::new::document::DocumentContext;
 use crate::update::new::thread_local::MostlySend;
-use crate::update::new::{Deletion, DocumentChange};
+use crate::update::new::{DocumentChange, DocumentIdentifiers};
 use crate::{DocumentId, Result};

 #[derive(Default)]
@@ -58,7 +59,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
         T: MostlySend,
     >(
         &'doc self,
-        context: &'doc DocumentChangeContext<T>,
+        context: &'doc DocumentContext<T>,
         docid: &'doc Self::Item,
     ) -> Result<Option<DocumentChange<'doc>>>
     where
@@ -74,7 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
         let external_document_id = external_document_id.to_bump(&context.doc_alloc);

-        Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
+        Ok(Some(DocumentChange::Deletion(DocumentIdentifiers::create(
+            *docid,
+            external_document_id,
+        ))))
     }

     fn len(&self) -> usize {
@@ -93,9 +97,8 @@ mod test {
     use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
     use crate::index::tests::TempIndex;
     use crate::progress::Progress;
-    use crate::update::new::indexer::document_changes::{
-        extract, DocumentChangeContext, Extractor, IndexingContext,
-    };
+    use crate::update::new::document::DocumentContext;
+    use crate::update::new::indexer::document_changes::{extract, Extractor, IndexingContext};
     use crate::update::new::indexer::DocumentDeletion;
     use crate::update::new::steps::IndexingStep;
     use crate::update::new::thread_local::{MostlySend, ThreadLocal};
@@ -125,7 +128,7 @@ mod test {
         fn process<'doc>(
             &self,
             changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-            context: &DocumentChangeContext<Self::Data>,
+            context: &DocumentContext<Self::Data>,
         ) -> crate::Result<()> {
             for change in changes {
                 let change = change?;

View file

@@ -12,14 +12,14 @@ use serde_json::value::RawValue;
use serde_json::Deserializer;

use super::super::document_change::DocumentChange;
-use super::document_changes::{DocumentChangeContext, DocumentChanges};
+use super::document_changes::DocumentChanges;
use super::guess_primary_key::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey;
use crate::progress::{AtomicPayloadStep, Progress};
-use crate::update::new::document::Versions;
+use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
-use crate::update::new::{Deletion, Insertion, Update};
+use crate::update::new::{DocumentIdentifiers, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
@@ -411,7 +411,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentOperationChanges<'pl> {
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
-context: &'doc DocumentChangeContext<T>,
+context: &'doc DocumentContext<T>,
item: &'doc Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
@@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> {
if self.is_new {
Ok(None)
} else {
-let deletion = Deletion::create(self.docid, external_doc);
+let deletion = DocumentIdentifiers::create(self.docid, external_doc);
Ok(Some(DocumentChange::Deletion(deletion)))
}
}

@@ -12,14 +12,21 @@ use super::super::steps::IndexingStep;
use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
+use super::settings_changes::settings_change_extract;
+use crate::documents::FieldIdMapper;
+use crate::documents::PrimaryKey;
use crate::index::IndexEmbeddingConfig;
use crate::progress::EmbedderStats;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
+use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
+use crate::update::settings::SettingsDelta;
use crate::vector::EmbeddingConfigs;
+use crate::Index;
+use crate::InternalError;
use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};

#[allow(clippy::too_many_arguments)]
@@ -315,6 +322,115 @@ where
Result::Ok((facet_field_ids_delta, index_embeddings))
}

#[allow(clippy::too_many_arguments)]
pub(super) fn extract_all_settings_changes<MSP, SD>(
indexing_context: IndexingContext<MSP>,
indexer_span: Span,
extractor_sender: ExtractorBbqueueSender,
settings_delta: &SD,
extractor_allocs: &mut ThreadLocal<FullySend<Bump>>,
finished_extraction: &AtomicBool,
field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>,
modified_docids: &mut RoaringBitmap,
embedder_stats: &EmbedderStats,
) -> Result<Vec<IndexEmbeddingConfig>>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
{
// Create the list of document ids to extract
let rtxn = indexing_context.index.read_txn()?;
let all_document_ids =
indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
let primary_key =
primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
let documents = DocumentsIndentifiers::new(&all_document_ids, primary_key);
let span =
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
let _entered = span.enter();
update_database_documents(
&documents,
indexing_context,
&extractor_sender,
settings_delta,
extractor_allocs,
)?;
'vectors: {
if settings_delta.embedder_actions().is_empty() {
break 'vectors;
}
let embedding_sender = extractor_sender.embeddings();
// extract the remaining embeddings
let extractor = SettingsChangeEmbeddingExtractor::new(
settings_delta.new_embedders(),
settings_delta.old_embedders(),
settings_delta.embedder_actions(),
settings_delta.new_embedder_category_id(),
embedder_stats,
embedding_sender,
field_distribution,
request_threads(),
);
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::debug_span!(target: "indexing::documents::extract", "vectors");
let _entered = span.enter();
settings_change_extract(
&documents,
&extractor,
indexing_context,
extractor_allocs,
&datastore,
IndexingStep::ExtractingEmbeddings,
)?;
}
{
let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
for config in &mut index_embeddings {
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else {
continue 'data;
};
deladd.apply_to(&mut config.user_provided, modified_docids);
}
}
}
}
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
Result::Ok(index_embeddings)
}
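The merge loop above folds each extractor thread's per-embedder deletion/addition deltas into the embedder config's `user_provided` bitmap. A standalone sketch of that folding with roaring bitmaps; the `DelAdd` struct and `apply_to` below are illustrative stand-ins, not milli's actual `DelAddRoaringBitmap`:

use roaring::RoaringBitmap;

struct DelAdd {
    del: RoaringBitmap,
    add: RoaringBitmap,
}

impl DelAdd {
    fn apply_to(&self, user_provided: &mut RoaringBitmap, modified: &mut RoaringBitmap) {
        // deletions and additions both count as modified documents
        *modified |= &self.del;
        *modified |= &self.add;
        *user_provided -= &self.del;
        *user_provided |= &self.add;
    }
}

fn main() {
    let mut user_provided: RoaringBitmap = [1u32, 2, 3].into_iter().collect();
    let mut modified = RoaringBitmap::new();
    let delta = DelAdd {
        del: [2u32].into_iter().collect(),
        add: [7u32].into_iter().collect(),
    };
    delta.apply_to(&mut user_provided, &mut modified);
    assert!(user_provided.contains(7) && !user_provided.contains(2));
}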
fn primary_key_from_db<'indexer>(
index: &'indexer Index,
rtxn: &'indexer heed::RoTxn<'_>,
fields: &'indexer impl FieldIdMapper,
) -> Result<PrimaryKey<'indexer>> {
let Some(primary_key) = index.primary_key(rtxn)? else {
return Err(InternalError::DatabaseMissingEntry {
db_name: crate::index::db_name::MAIN,
key: Some(crate::index::main_key::PRIMARY_KEY_KEY),
}
.into());
};
let Some(primary_key) = PrimaryKey::new(primary_key, fields) else {
unreachable!("Primary key must exist at this point");
};
Ok(primary_key)
}
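The two guards above rely on let-else: a missing primary-key entry is a hard database error, while a failed `PrimaryKey::new` is treated as unreachable at this point. A compact illustration of the same control flow with toy types:

fn primary_key_from_db(stored: Option<&str>) -> Result<String, String> {
    // bail early when the entry is absent, mirroring DatabaseMissingEntry
    let Some(primary_key) = stored else {
        return Err("database missing the primary key entry".into());
    };
    Ok(primary_key.to_string())
}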
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();
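`request_threads` uses the lazy static-pool idiom: the pool is built on first use and shared afterwards. A minimal sketch of the same idiom with a plain `rayon::ThreadPool` standing in for milli's `ThreadPoolNoAbort`; the pool size and thread names are illustrative:

use std::sync::OnceLock;

fn request_threads() -> &'static rayon::ThreadPool {
    static POOL: OnceLock<rayon::ThreadPool> = OnceLock::new();
    POOL.get_or_init(|| {
        rayon::ThreadPoolBuilder::new()
            .num_threads(4) // illustrative size; the real pool derives its own
            .thread_name(|i| format!("embedding-request-{i}"))
            .build()
            .expect("failed to build the request thread pool")
    })
}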
@@ -1,5 +1,6 @@
+use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
-use std::sync::{Once, RwLock};
+use std::sync::{Arc, Once, RwLock};
use std::thread::{self, Builder};

use big_s::S;
@@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::progress::{EmbedderStats, Progress};
+use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
-use crate::vector::{ArroyWrapper, EmbeddingConfigs};
+use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
+use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};

pub(crate) mod de;
@@ -32,6 +35,7 @@ mod extract;
mod guess_primary_key;
mod partial_dump;
mod post_processing;
+pub mod settings_changes;
mod update_by_function;
mod write;
@@ -40,8 +44,6 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
-///
-/// TODO return stats
#[allow(clippy::too_many_arguments)] // clippy: 😝
pub fn index<'pl, 'indexer, 'index, DC, MSP>(
wtxn: &mut RwTxn,
@@ -66,48 +68,8 @@ where
let arroy_memory = grenad_parameters.max_memory;

-// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
-// is because we still use the old indexer for the settings and it is highly impacted by the
-// max memory. So we keep the changes here and will remove these changes once we use the new
-// indexer to also index settings. Related to #5125 and #5141.
-let grenad_parameters = GrenadParameters {
-    max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
-    ..grenad_parameters
-};
-
-// 5% percent of the allocated memory for the extractors, or min 100MiB
-// 5% percent of the allocated memory for the bbqueues, or min 50MiB
-//
-// Minimum capacity for bbqueues
-let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB
-let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
-
-let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
-    (
-        GrenadParameters {
-            max_memory: Some(minimum_total_extractors_capacity),
-            ..grenad_parameters
-        },
-        minimum_total_bbbuffer_capacity,
-    ), // 100 MiB by thread by default
-    |max_memory| {
-        let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
-        let new_grenad_parameters = GrenadParameters {
-            max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
-            ..grenad_parameters
-        };
-        (new_grenad_parameters, total_bbbuffer_capacity)
-    },
-);
-
-LOG_MEMORY_METRICS_ONCE.call_once(|| {
-    tracing::debug!(
-        "Indexation allocated memory metrics - \
-        Total BBQueue size: {total_bbbuffer_capacity}, \
-        Total extractor memory: {:?}",
-        grenad_parameters.max_memory,
-    );
-});
+let (grenad_parameters, total_bbbuffer_capacity) =
+    indexer_memory_settings(pool.current_num_threads(), grenad_parameters);

let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
@@ -208,6 +170,7 @@ where
index_embeddings,
arroy_memory,
&mut arroy_writers,
+None,
&indexing_context.must_stop_processing,
)
})
@@ -241,3 +204,238 @@ where

Ok(congestion)
}

#[allow(clippy::too_many_arguments)]
pub fn reindex<'indexer, 'index, MSP, SD>(
wtxn: &mut RwTxn<'index>,
index: &'index Index,
pool: &ThreadPoolNoAbort,
grenad_parameters: GrenadParameters,
settings_delta: &'indexer SD,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<ChannelCongestion>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta + Sync,
{
delete_old_embedders(wtxn, index, settings_delta)?;
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
let arroy_memory = grenad_parameters.max_memory;
let (grenad_parameters, total_bbbuffer_capacity) =
indexer_memory_settings(pool.current_num_threads(), grenad_parameters);
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();
let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let db_fields_ids_map = index.fields_ids_map(wtxn)?;
let new_fields_ids_map = settings_delta.new_fields_ids_map().clone();
let new_fields_ids_map = RwLock::new(new_fields_ids_map);
let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads());
let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads());
let indexing_context = IndexingContext {
index,
db_fields_ids_map: &db_fields_ids_map,
new_fields_ids_map: &new_fields_ids_map,
doc_allocs: &doc_allocs,
fields_ids_map_store: &fields_ids_map_store,
must_stop_processing,
progress,
grenad_parameters: &grenad_parameters,
};
let index_embeddings = index.embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
let indexer_span = tracing::Span::current();
let finished_extraction = &finished_extraction;
// prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution;
let modified_docids = &mut modified_docids;
let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || {
extract::extract_all_settings_changes(
indexing_context,
indexer_span,
extractor_sender,
settings_delta,
&mut extractor_allocs,
finished_extraction,
field_distribution,
index_embeddings,
modified_docids,
&embedder_stats,
)
})
.unwrap()
})?;
let new_embedders = settings_delta.new_embedders();
let embedder_actions = settings_delta.embedder_actions();
let index_embedder_category_ids = settings_delta.new_embedder_category_id();
let mut arroy_writers = arroy_writers_from_embedder_actions(
index,
embedder_actions,
new_embedders,
index_embedder_category_ids,
)?;
let congestion =
write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?;
indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors);
let index_embeddings = extractor_handle.join().unwrap()?;
indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase);
pool.install(|| {
build_vectors(
index,
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,
Some(embedder_actions),
&indexing_context.must_stop_processing,
)
})
.unwrap()?;
indexing_context.progress.update_progress(IndexingStep::Finalizing);
Ok(congestion) as Result<_>
})?;
// the thread-local store must be dropped before we can into_inner the new_fields_ids_map
drop(fields_ids_map_store);
let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap();
let document_ids = index.documents_ids(wtxn)?;
update_index(
index,
wtxn,
new_fields_ids_map,
None,
settings_delta.new_embedders().clone(),
field_distribution,
document_ids,
)?;
Ok(congestion)
}
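`reindex` splits the work the same way `index` does: a scoped extractor thread feeds a channel while the calling thread drains it into the database. A simplified sketch of that shape using std scoped threads and mpsc; the names are illustrative, and milli actually uses a bbqueue-backed channel plus a rayon pool rather than mpsc:

use std::sync::mpsc;
use std::thread;

fn extract_and_write(items: Vec<u32>) -> Vec<u32> {
    let (sender, receiver) = mpsc::channel();
    let mut written = Vec::new();
    thread::scope(|s| {
        let handle = thread::Builder::new()
            .name("indexer-extractors".into())
            .spawn_scoped(s, move || {
                for item in items {
                    sender.send(item * 2).unwrap(); // stands in for extraction
                }
                // dropping the sender closes the channel
            })
            .unwrap();
        for value in receiver {
            written.push(value); // stands in for the database write loop
        }
        handle.join().unwrap();
    });
    written
}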
fn arroy_writers_from_embedder_actions<'indexer>(
index: &Index,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer EmbeddingConfigs,
index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
embedders
.inner_as_ref()
.iter()
.filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) {
None => None,
Some(action) if action.write_back().is_some() => None,
Some(action) => {
let Some(&embedder_category_id) = index_embedder_category_ids.get(embedder_name)
else {
return Some(Err(crate::error::Error::InternalError(
crate::InternalError::DatabaseMissingEntry {
db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID,
key: None,
},
)));
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
let dimensions = embedder.dimensions();
Some(Ok((
embedder_category_id,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
)))
}
})
.collect()
}
fn delete_old_embedders<SD>(wtxn: &mut RwTxn<'_>, index: &Index, settings_delta: &SD) -> Result<()>
where
SD: SettingsDelta,
{
for action in settings_delta.embedder_actions().values() {
if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() {
let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
let dimensions = reader.dimensions(wtxn)?;
reader.clear(wtxn, dimensions)?;
}
}
Ok(())
}
fn indexer_memory_settings(
current_num_threads: usize,
grenad_parameters: GrenadParameters,
) -> (GrenadParameters, usize) {
// We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch
// is because we still use the old indexer for the settings and it is highly impacted by the
// max memory. So we keep the changes here and will remove these changes once we use the new
// indexer to also index settings. Related to #5125 and #5141.
let grenad_parameters = GrenadParameters {
max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100),
..grenad_parameters
};
// 5 percent of the allocated memory for the extractors, or a minimum of 100 MiB
// 5 percent of the allocated memory for the bbqueues, or a minimum of 50 MiB
//
// Minimum capacity for bbqueues
let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads; // 50 MiB
let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2;
let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or(
(
GrenadParameters {
max_memory: Some(minimum_total_extractors_capacity),
..grenad_parameters
},
minimum_total_bbbuffer_capacity,
), // 100 MiB by thread by default
|max_memory| {
let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity);
let new_grenad_parameters = GrenadParameters {
max_memory: Some(max_memory.max(minimum_total_extractors_capacity)),
..grenad_parameters
};
(new_grenad_parameters, total_bbbuffer_capacity)
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
(grenad_parameters, total_bbbuffer_capacity)
}
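A standalone sketch of the sizing rule above: both budgets start from 5 percent of `max_memory`, the bbqueues are floored at 50 MiB per thread and the extractors at twice that, and when no limit is configured both fall back to the minimums. Function and variable names here are illustrative:

fn memory_budget(threads: usize, max_memory: Option<usize>) -> (usize, usize) {
    let min_bbqueue = 50 * 1024 * 1024 * threads; // 50 MiB per thread
    let min_extractors = min_bbqueue * 2; // i.e. 100 MiB per thread
    match max_memory.map(|mm| mm * 5 / 100) {
        None => (min_extractors, min_bbqueue),
        Some(five_percent) => (five_percent.max(min_extractors), five_percent.max(min_bbqueue)),
    }
}

fn main() {
    // With a 32 GiB budget and 8 threads, 5% (~1.6 GiB) exceeds both minimums.
    let (extractors, bbqueues) = memory_budget(8, Some(32 * 1024 * 1024 * 1024));
    assert!(extractors > 8 * 100 * 1024 * 1024 && bbqueues > 8 * 50 * 1024 * 1024);
}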
@@ -5,10 +5,10 @@ use rayon::iter::IndexedParallelIterator;
use rustc_hash::FxBuildHasher;
use serde_json::value::RawValue;

-use super::document_changes::{DocumentChangeContext, DocumentChanges};
+use super::document_changes::DocumentChanges;
use crate::documents::PrimaryKey;
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
-use crate::update::new::document::Versions;
+use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{DocumentChange, Insertion};
@@ -55,7 +55,7 @@ where
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&'doc self,
-context: &'doc DocumentChangeContext<T>,
+context: &'doc DocumentContext<T>,
document: &'doc Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where

@@ -0,0 +1,146 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use bumpalo::Bump;
use rayon::iter::IndexedParallelIterator;
use rayon::slice::ParallelSlice;
use super::document_changes::IndexingContext;
use crate::documents::PrimaryKey;
use crate::progress::AtomicDocumentStep;
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::{DocumentId, InternalError, Result};
/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
pub trait SettingsChangeExtractor<'extractor>: Sync {
type Data: MostlySend;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data>;
fn process<'doc>(
&'doc self,
documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
context: &'doc DocumentContext<Self::Data>,
) -> Result<()>;
}
pub struct DocumentsIndentifiers<'indexer> {
documents: &'indexer [DocumentId],
primary_key: PrimaryKey<'indexer>,
}
impl<'indexer> DocumentsIndentifiers<'indexer> {
pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self {
Self { documents, primary_key }
}
fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = &[DocumentId]> {
self.documents.par_chunks(chunk_size)
}
fn item_to_database_document<
'doc, // lifetime of a single `process` call
T: MostlySend,
>(
&'doc self,
context: &'doc DocumentContext<T>,
docid: &'doc DocumentId,
) -> Result<Option<DocumentIdentifiers<'doc>>> {
let current = context.index.document(&context.rtxn, *docid)?;
let external_document_id = self.primary_key.extract_docid_from_db(
current,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DocumentIdentifiers::create(*docid, external_document_id)))
}
fn len(&self) -> usize {
self.documents.len()
}
}
const CHUNK_SIZE: usize = 100;
pub fn settings_change_extract<
'extractor, // invariant lifetime of extractor_alloc
'fid, // invariant lifetime of fields ids map
'indexer, // covariant lifetime of objects that are borrowed during the entire indexing
'data, // invariant on EX::Data lifetime of datastore
'index, // covariant lifetime of the index
EX: SettingsChangeExtractor<'extractor>,
MSP: Fn() -> bool + Sync,
>(
documents: &'indexer DocumentsIndentifiers<'indexer>,
extractor: &EX,
IndexingContext {
index,
db_fields_ids_map,
new_fields_ids_map,
doc_allocs,
fields_ids_map_store,
must_stop_processing,
progress,
grenad_parameters: _,
}: IndexingContext<'fid, 'indexer, 'index, MSP>,
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
datastore: &'data ThreadLocal<EX::Data>,
step: IndexingStep,
) -> Result<()> {
tracing::trace!("We are resetting the extractor allocators");
progress.update_progress(step);
// Clean up and reuse the extractor allocs
for extractor_alloc in extractor_allocs.iter_mut() {
tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes());
extractor_alloc.0.reset();
}
let total_documents = documents.len() as u32;
let (step, progress_step) = AtomicDocumentStep::new(total_documents);
progress.update_progress(progress_step);
let pi = documents.iter(CHUNK_SIZE);
pi.try_arc_for_each_try_init(
|| {
DocumentContext::new(
index,
db_fields_ids_map,
new_fields_ids_map,
extractor_allocs,
doc_allocs,
datastore,
fields_ids_map_store,
move |index_alloc| extractor.init_data(index_alloc),
)
},
|context, items| {
if (must_stop_processing)() {
return Err(Arc::new(InternalError::AbortedIndexation.into()));
}
// Clean up and reuse the document-specific allocator
context.doc_alloc.reset();
let documents = items
.iter()
.filter_map(|item| documents.item_to_database_document(context, item).transpose());
let res = extractor.process(documents, context).map_err(Arc::new);
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
// send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));
res
},
)?;
step.store(total_documents, Ordering::Relaxed);
Ok(())
}
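The loop above follows the "initialize a context once per worker, then process fixed-size chunks" shape. A minimal stand-in using rayon's `par_chunks` plus `for_each_init`, without the bump allocators, cancellation check, or progress counter; `scratch` is an illustrative per-worker buffer:

use rayon::prelude::*;

fn process_all(docids: &[u32]) {
    const CHUNK_SIZE: usize = 100;
    docids.par_chunks(CHUNK_SIZE).for_each_init(
        || Vec::<u32>::new(), // per-worker "context", built roughly once per worker
        |scratch, chunk| {
            scratch.clear(); // reuse the allocation, like doc_alloc.reset()
            scratch.extend_from_slice(chunk);
            // ... extract from `scratch` ...
        },
    );
}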
@@ -5,15 +5,14 @@ use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
use roaring::RoaringBitmap;
use rustc_hash::FxBuildHasher;

-use super::document_changes::DocumentChangeContext;
use super::DocumentChanges;
use crate::documents::Error::InvalidDocumentFormat;
use crate::documents::PrimaryKey;
use crate::error::{FieldIdMapMissingEntry, InternalError};
-use crate::update::new::document::Versions;
+use crate::update::new::document::{DocumentContext, Versions};
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
-use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
+use crate::update::new::{DocumentChange, DocumentIdentifiers, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};

pub struct UpdateByFunction {
@@ -86,13 +85,13 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
fn item_to_document_change<'doc, T: MostlySend + 'doc>(
&self,
-context: &'doc DocumentChangeContext<T>,
+context: &'doc DocumentContext<T>,
docid: &'doc Self::Item,
) -> Result<Option<DocumentChange<'doc>>>
where
'index: 'doc,
{
-let DocumentChangeContext {
+let DocumentContext {
index,
db_fields_ids_map,
rtxn: txn,
@@ -128,10 +127,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
match scope.remove::<Dynamic>("doc") {
// If the "doc" variable has been set to (), we effectively delete the document.
-Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
-    docid,
-    doc_alloc.alloc_str(&document_id),
-)))),
+Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(
+    DocumentIdentifiers::create(docid, doc_alloc.alloc_str(&document_id)),
+))),
None => unreachable!("missing doc variable from the Rhai scope"),
Some(new_document) => match new_document.try_cast() {
Some(new_rhai_document) => {

@@ -1,3 +1,4 @@
+use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;

use bstr::ByteSlice as _;
@@ -13,6 +14,7 @@ use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
+use crate::vector::settings::EmbedderAction;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::{Error, Index, InternalError, Result, UserError};
@@ -99,6 +101,7 @@ impl ChannelCongestion {
}

#[tracing::instrument(level = "debug", skip_all, target = "indexing::vectors")]
+#[allow(clippy::too_many_arguments)]
pub fn build_vectors<MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,
@@ -106,6 +109,7 @@ pub fn build_vectors<MSP>(
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
+embedder_actions: Option<&BTreeMap<String, EmbedderAction>>,
must_stop_processing: &MSP,
) -> Result<()>
where
@@ -117,14 +121,17 @@ where
let seed = rand::random();
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
-for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers {
+for (_index, (embedder_name, _embedder, writer, dimensions)) in arroy_writers {
let dimensions = *dimensions;
+let is_being_quantized = embedder_actions
+    .and_then(|actions| actions.get(*embedder_name).map(|action| action.is_being_quantized))
+    .unwrap_or(false);
writer.build_and_quantize(
wtxn,
progress,
&mut rng,
dimensions,
-false,
+is_being_quantized,
arroy_memory,
must_stop_processing,
)?;

@@ -1,4 +1,5 @@
-pub use document_change::{Deletion, DocumentChange, Insertion, Update};
+pub use document::DocumentIdentifiers;
+pub use document_change::{DocumentChange, Insertion, Update};
pub use indexer::ChannelCongestion;
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,

@@ -28,16 +28,20 @@ use crate::index::{
};
use crate::order_by_map::OrderByMap;
use crate::progress::EmbedderStats;
+use crate::progress::Progress;
use crate::prompt::{default_max_bytes, default_template_text, PromptData};
use crate::proximity::ProximityPrecision;
use crate::update::index_documents::IndexDocumentsMethod;
+use crate::update::new::indexer::reindex;
use crate::update::{IndexDocuments, UpdateIndexingStep};
use crate::vector::settings::{
EmbedderAction, EmbedderSource, EmbeddingSettings, NestingContext, ReindexAction,
SubEmbeddingSettings, WriteBackToDocuments,
};
use crate::vector::{Embedder, EmbeddingConfig, EmbeddingConfigs};
-use crate::{FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result};
+use crate::{
+    ChannelCongestion, FieldId, FilterableAttributesRule, Index, LocalizedAttributesRule, Result,
+};

#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub enum Setting<T> {
@@ -1358,7 +1362,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
}
}

-pub fn execute<FP, FA>(
+pub fn legacy_execute<FP, FA>(
mut self,
progress_callback: FP,
should_abort: FA,
@@ -1426,6 +1430,108 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
Ok(())
}
pub fn execute<'indexer, MSP>(
mut self,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<Option<ChannelCongestion>>
where
MSP: Fn() -> bool + Sync,
{
// force the old indexer if the environment says so
if self.indexer_config.experimental_no_edition_2024_for_settings {
return self
.legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
must_stop_processing,
embedder_stats,
)
.map(|_| None);
}
// only use the new indexer when the embedders are the only settings that could have changed
if let Self {
searchable_fields: Setting::NotSet,
displayed_fields: Setting::NotSet,
filterable_fields: Setting::NotSet,
sortable_fields: Setting::NotSet,
criteria: Setting::NotSet,
stop_words: Setting::NotSet,
non_separator_tokens: Setting::NotSet,
separator_tokens: Setting::NotSet,
dictionary: Setting::NotSet,
distinct_field: Setting::NotSet,
synonyms: Setting::NotSet,
primary_key: Setting::NotSet,
authorize_typos: Setting::NotSet,
min_word_len_two_typos: Setting::NotSet,
min_word_len_one_typo: Setting::NotSet,
exact_words: Setting::NotSet,
exact_attributes: Setting::NotSet,
max_values_per_facet: Setting::NotSet,
sort_facet_values_by: Setting::NotSet,
pagination_max_total_hits: Setting::NotSet,
proximity_precision: Setting::NotSet,
embedder_settings: _,
search_cutoff: Setting::NotSet,
localized_attributes_rules: Setting::NotSet,
prefix_search: Setting::NotSet,
facet_search: Setting::NotSet,
disable_on_numbers: Setting::NotSet,
chat: Setting::NotSet,
wtxn: _,
index: _,
indexer_config: _,
} = &self
{
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
let old_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
// Update index settings
let embedding_config_updates = self.update_embedding_configs()?;
let new_inner_settings = InnerIndexSettings::from_index(self.index, self.wtxn, None)?;
let primary_key_id = self
.index
.primary_key(self.wtxn)?
.and_then(|name| new_inner_settings.fields_ids_map.id(name));
let settings_update_only = true;
let inner_settings_diff = InnerIndexSettingsDiff::new(
old_inner_settings,
new_inner_settings,
primary_key_id,
embedding_config_updates,
settings_update_only,
);
if self.index.number_of_documents(self.wtxn)? > 0 {
reindex(
self.wtxn,
self.index,
&self.indexer_config.thread_pool,
self.indexer_config.grenad_parameters(),
&inner_settings_diff,
must_stop_processing,
progress,
embedder_stats,
)
.map(Some)
} else {
Ok(None)
}
} else {
self.legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
must_stop_processing,
embedder_stats,
)
.map(|_| None)
}
}
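The exhaustive destructuring above is the whole gate: every field except `embedder_settings` is pinned to `Setting::NotSet`, so adding a new setting later forces this match to be revisited. A reduced model of the same routing with a two-field struct; the types here are illustrative, not milli's (whose `Setting` also has a `Reset` variant):

enum Setting<T> {
    Set(T),
    NotSet,
}

struct SettingsUpdate {
    searchable_fields: Setting<Vec<String>>,
    embedder_settings: Setting<String>,
}

fn use_new_settings_indexer(update: &SettingsUpdate) -> bool {
    // everything except the embedders must be untouched
    matches!(
        update,
        SettingsUpdate { searchable_fields: Setting::NotSet, embedder_settings: _ }
    )
}

fn main() {
    let update = SettingsUpdate {
        searchable_fields: Setting::NotSet,
        embedder_settings: Setting::Set("new-embedder".into()),
    };
    assert!(use_new_settings_indexer(&update));
}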
}

pub struct InnerIndexSettingsDiff {
@@ -1685,6 +1791,7 @@ pub(crate) struct InnerIndexSettings {
pub disabled_typos_terms: DisabledTyposTerms,
pub proximity_precision: ProximityPrecision,
pub embedding_configs: EmbeddingConfigs,
+pub embedder_category_id: HashMap<String, u8>,
pub geo_fields_ids: Option<(FieldId, FieldId)>,
pub prefix_search: PrefixSearch,
pub facet_search: bool,
@@ -1707,6 +1814,11 @@ impl InnerIndexSettings {
Some(embedding_configs) => embedding_configs,
None => embedders(index.embedding_configs(rtxn)?)?,
};
+let embedder_category_id = index
+    .embedder_category_id
+    .iter(rtxn)?
+    .map(|r| r.map(|(k, v)| (k.to_string(), v)))
+    .collect::<heed::Result<_>>()?;
let prefix_search = index.prefix_search(rtxn)?.unwrap_or_default();
let facet_search = index.facet_search(rtxn)?;
let geo_fields_ids = match fields_ids_map.id(RESERVED_GEO_FIELD_NAME) {
@@ -1746,6 +1858,7 @@ impl InnerIndexSettings {
exact_attributes,
proximity_precision,
embedding_configs,
+embedder_category_id,
geo_fields_ids,
prefix_search,
facet_search,
@@ -2115,6 +2228,38 @@ fn deserialize_sub_embedder(
}
}
/// Implement this trait for the settings delta type.
/// This is used in the new settings update flow and will make it easy to replace the old settings delta type: `InnerIndexSettingsDiff`.
pub trait SettingsDelta {
fn new_embedders(&self) -> &EmbeddingConfigs;
fn old_embedders(&self) -> &EmbeddingConfigs;
fn new_embedder_category_id(&self) -> &HashMap<String, u8>;
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction>;
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata;
}
impl SettingsDelta for InnerIndexSettingsDiff {
fn new_embedders(&self) -> &EmbeddingConfigs {
&self.new.embedding_configs
}
fn old_embedders(&self) -> &EmbeddingConfigs {
&self.old.embedding_configs
}
fn new_embedder_category_id(&self) -> &HashMap<String, u8> {
&self.new.embedder_category_id
}
fn embedder_actions(&self) -> &BTreeMap<String, EmbedderAction> {
&self.embedding_config_updates
}
fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata {
&self.new.fields_ids_map
}
}
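The point of the trait is that `reindex` depends only on these accessors, not on `InnerIndexSettingsDiff` itself, so the delta type can be swapped later. A sketch of that decoupling with deliberately simplified signatures (a single accessor, toy types):

use std::collections::BTreeMap;

trait SettingsDelta {
    fn embedder_actions(&self) -> &BTreeMap<String, String>;
}

struct MinimalDelta {
    actions: BTreeMap<String, String>,
}

impl SettingsDelta for MinimalDelta {
    fn embedder_actions(&self) -> &BTreeMap<String, String> {
        &self.actions
    }
}

fn reindex<SD: SettingsDelta>(delta: &SD) {
    if delta.embedder_actions().is_empty() {
        return; // nothing embedder-related changed; skip vector extraction
    }
    // ... re-extract embeddings according to the actions ...
}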
#[cfg(test)]
#[path = "test_settings.rs"]
mod tests;

@@ -1,6 +1,7 @@
use std::collections::HashSet;

use big_s::S;
+use milli::progress::Progress;
use milli::update::Settings;
use milli::{Criterion, Search, SearchResult, TermsMatchingStrategy};
use Criterion::*;
@@ -19,7 +20,7 @@ macro_rules! test_distinct {
let config = milli::update::IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_distinct_field(S(stringify!($distinct)));
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

let rtxn = index.read_txn().unwrap();
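The new `execute` signature replaces the progress callback with a `&impl Fn() -> bool` cancellation check plus a `&Progress`. A typical caller can wire the check to an `AtomicBool`, as in this sketch (the commented `builder.execute` line mirrors the call sites in the tests above):

use std::sync::atomic::{AtomicBool, Ordering};

fn main() {
    let stop_flag = AtomicBool::new(false);
    let must_stop_processing = || stop_flag.load(Ordering::Relaxed);
    // builder.execute(&must_stop_processing, &Progress::default(), Default::default())
    // would abort as soon as another thread stores `true` in `stop_flag`.
    assert!(!must_stop_processing());
}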
@@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() {
FilterableAttributesRule::Field(S("genres")),
FilterableAttributesRule::Field(S("tags")),
]);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

// index documents

@@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
S("america") => vec![S("the united states")],
});
builder.set_searchable_fields(vec![S("title"), S("description")]);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

// index documents

@@ -1,3 +1,4 @@
+use milli::progress::Progress;
use milli::update::{IndexerConfig, Settings};
use milli::{Criterion, Index, Search, TermsMatchingStrategy};
@@ -10,7 +11,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) {
let mut builder = Settings::new(&mut wtxn, index, &config);
let stop_words = stop_words.iter().map(|s| s.to_string()).collect();
builder.set_stop_words(stop_words);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();
}

@@ -236,7 +236,7 @@ fn criteria_mixup() {
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(criteria.clone());
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

let rtxn = index.read_txn().unwrap();
@@ -276,7 +276,7 @@ fn criteria_ascdesc() {
S("name"),
S("age"),
});
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

let mut wtxn = index.write_txn().unwrap();
@@ -359,7 +359,7 @@ fn criteria_ascdesc() {
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(vec![criterion.clone()]);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap();

let rtxn = index.read_txn().unwrap();

@@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() {
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_one_typo(4);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();

// one typo is now supported for 4-letter words
let mut search = Search::new(&txn, &index);
@@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() {
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_two_typos(7);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();

// two typos are now supported for 7-letter words
let mut search = Search::new(&txn, &index);
@@ -181,7 +181,7 @@ fn test_typo_disabled_on_word() {
// `zealand` doesn't allow typos anymore
exact_words.insert("zealand".to_string());
builder.set_exact_words(exact_words);
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();

let mut search = Search::new(&txn, &index);
search.query("zealand");
@@ -219,7 +219,7 @@ fn test_disable_typo_on_attribute() {
let mut builder = Settings::new(&mut txn, &index, &config);
// disable typos on `description`
builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect());
-builder.execute(|_| (), || false, Default::default()).unwrap();
+builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();

let mut search = Search::new(&txn, &index);
search.query("antebelum");