diff --git a/benchmarks/benches/indexing.rs b/benchmarks/benches/indexing.rs index 93a57091a..a84998b12 100644 --- a/benchmarks/benches/indexing.rs +++ b/benchmarks/benches/indexing.rs @@ -6,7 +6,7 @@ use std::path::Path; use criterion::{criterion_group, criterion_main, Criterion}; use heed::EnvOpenOptions; -use milli::update::UpdateBuilder; +use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; use milli::Index; #[cfg(target_os = "linux")] @@ -39,9 +39,9 @@ fn indexing_songs_default(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("id".to_owned()); let displayed_fields = @@ -66,12 +66,15 @@ fn indexing_songs_default(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = update_builder.index_documents(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); index.prepare_for_closing().wait(); @@ -88,9 +91,9 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("id".to_owned()); let displayed_fields = @@ -112,12 +115,16 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = update_builder.index_documents(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - builder.execute(documents, |_| ()).unwrap(); + + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); index.prepare_for_closing().wait(); @@ -134,9 +141,9 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("id".to_owned()); let displayed_fields = @@ -154,12 +161,15 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = update_builder.index_documents(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); index.prepare_for_closing().wait(); @@ -176,9 +186,9 @@ fn indexing_wiki(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("id".to_owned()); let displayed_fields = @@ -195,13 +205,16 @@ fn indexing_wiki(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.index_documents(&mut wtxn, &index); - builder.enable_autogenerate_docids(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); index.prepare_for_closing().wait(); @@ -218,9 +231,9 @@ fn indexing_movies_default(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("id".to_owned()); let displayed_fields = ["title", "poster", "overview", "release_date", "genres"] @@ -242,12 +255,15 @@ fn indexing_movies_default(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = update_builder.index_documents(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); index.prepare_for_closing().wait(); @@ -264,9 +280,9 @@ fn indexing_geo(c: &mut Criterion) { move || { let index = setup_index(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key("geonameid".to_owned()); let displayed_fields = @@ -293,12 +309,15 @@ fn indexing_geo(c: &mut Criterion) { index }, move |index| { - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = update_builder.index_documents(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); diff --git a/benchmarks/benches/utils.rs b/benchmarks/benches/utils.rs index df5a7b828..383587ef8 100644 --- a/benchmarks/benches/utils.rs +++ b/benchmarks/benches/utils.rs @@ -8,7 +8,9 @@ use std::path::Path; use criterion::BenchmarkId; use heed::EnvOpenOptions; use milli::documents::DocumentBatchReader; -use milli::update::{IndexDocumentsMethod, Settings, UpdateBuilder}; +use milli::update::{ + IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, +}; use milli::{Filter, Index}; use serde_json::{Map, Value}; @@ -65,9 +67,9 @@ pub fn base_setup(conf: &Conf) -> Index { options.max_readers(10); let index = Index::new(options, conf.database_name).unwrap(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.settings(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); if let Some(primary_key) = conf.primary_key { builder.set_primary_key(primary_key.to_string()); @@ -87,16 +89,19 @@ pub fn base_setup(conf: &Conf) -> Index { builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); - let update_builder = UpdateBuilder::new(); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = update_builder.index_documents(&mut wtxn, &index); - if let None = conf.primary_key { - builder.enable_autogenerate_docids(); - } + let indexing_config = IndexDocumentsConfig { + autogenerate_docids: conf.primary_key.is_none(), + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = documents_from(conf.dataset, conf.dataset_format); - builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + + builder.execute().unwrap(); wtxn.commit().unwrap(); index diff --git a/cli/src/main.rs b/cli/src/main.rs index b3c18244d..1edc171b0 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -9,6 +9,7 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use milli::update::UpdateIndexingStep::{ ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition, }; +use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig}; use structopt::StructOpt; #[cfg(target_os = "linux")] @@ -122,18 +123,18 @@ impl DocumentAddition { println!("Adding {} documents to the index.", reader.len()); let mut txn = index.env.write_txn()?; - let mut addition = milli::update::IndexDocuments::new(&mut txn, &index); - - if self.update_documents { - addition.index_documents_method(milli::update::IndexDocumentsMethod::UpdateDocuments); - } - - addition.log_every_n(100); - - if self.autogen_docids { - addition.enable_autogenerate_docids() - } + let config = milli::update::IndexerConfig { log_every_n: Some(100), ..Default::default() }; + let update_method = if self.update_documents { + IndexDocumentsMethod::UpdateDocuments + } else { + IndexDocumentsMethod::ReplaceDocuments + }; + let indexing_config = IndexDocumentsConfig { + update_method, + autogenerate_docids: self.autogen_docids, + ..Default::default() + }; let mut bars = Vec::new(); let progesses = MultiProgress::new(); for _ in 0..4 { @@ -141,12 +142,20 @@ impl DocumentAddition { let bar = progesses.add(bar); bars.push(bar); } + let mut addition = milli::update::IndexDocuments::new( + &mut txn, + &index, + &config, + indexing_config, + |step| indexing_callback(step, &bars), + ); + addition.add_documents(reader)?; std::thread::spawn(move || { progesses.join().unwrap(); }); - let result = addition.execute(reader, |step| indexing_callback(step, &bars))?; + let result = addition.execute()?; txn.commit()?; @@ -293,8 +302,9 @@ impl SettingsUpdate { fn perform(&self, index: milli::Index) -> Result<()> { let mut txn = index.env.write_txn()?; - let mut update = milli::update::Settings::new(&mut txn, &index); - update.log_every_n(100); + let config = IndexerConfig { log_every_n: Some(100), ..Default::default() }; + + let mut update = milli::update::Settings::new(&mut txn, &index, &config); if let Some(ref filterable_attributes) = self.filterable_attributes { if !filterable_attributes.is_empty() { diff --git a/http-ui/src/main.rs b/http-ui/src/main.rs index 6502bf83a..039a6c2ae 100644 --- a/http-ui/src/main.rs +++ b/http-ui/src/main.rs @@ -21,13 +21,14 @@ use heed::EnvOpenOptions; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; use milli::documents::DocumentBatchReader; use milli::update::UpdateIndexingStep::*; -use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder}; +use milli::update::{ + ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting, +}; use milli::{ obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, Index, MatchingWords, SearchResult, SortError, }; use once_cell::sync::OnceCell; -use rayon::ThreadPool; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use structopt::StructOpt; @@ -44,7 +45,7 @@ use self::update_store::UpdateStore; #[global_allocator] static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; -static GLOBAL_THREAD_POOL: OnceCell = OnceCell::new(); +static GLOBAL_CONFIG: OnceCell = OnceCell::new(); #[derive(Debug, StructOpt)] /// The HTTP main server of the milli project. @@ -327,7 +328,19 @@ async fn main() -> anyhow::Result<()> { // Setup the global thread pool let jobs = opt.indexer.indexing_jobs.unwrap_or(0); let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; - GLOBAL_THREAD_POOL.set(pool).unwrap(); + + let config = IndexerConfig { + max_nb_chunks: opt.indexer.max_nb_chunks, + chunk_compression_level: opt.indexer.chunk_compression_level, + max_positions_per_attributes: opt.indexer.max_positions_per_attributes, + thread_pool: Some(pool), + log_every_n: Some(opt.indexer.log_every_n), + max_memory: Some(opt.indexer.max_memory.get_bytes() as usize), + chunk_compression_type: opt.indexer.chunk_compression_type.unwrap_or(CompressionType::None), + ..Default::default() + }; + + GLOBAL_CONFIG.set(config).unwrap(); // Open the LMDB database. let index = Index::new(options, &opt.database)?; @@ -342,209 +355,207 @@ async fn main() -> anyhow::Result<()> { let (update_status_sender, _) = broadcast::channel(100); let update_status_sender_cloned = update_status_sender.clone(); let index_cloned = index.clone(); - let indexer_opt_cloned = opt.indexer.clone(); let update_store = UpdateStore::open( update_store_options, update_store_path, // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600 move |update_id, meta, content: &_| { // We prepare the update by using the update builder. - let mut update_builder = UpdateBuilder::new(); - if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks { - update_builder.max_nb_chunks(max_nb_chunks); - } - if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level { - update_builder.chunk_compression_level(chunk_compression_level); - } - if let Some(max_pos_per_attributes) = indexer_opt_cloned.max_positions_per_attributes { - update_builder.max_positions_per_attributes(max_pos_per_attributes); - } - update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap()); - update_builder.log_every_n(indexer_opt_cloned.log_every_n); - update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize); - update_builder.chunk_compression_type( - indexer_opt_cloned.chunk_compression_type.unwrap_or(CompressionType::None), - ); let before_update = Instant::now(); // we extract the update type and execute the update itself. - let result: anyhow::Result<()> = - (|| match meta { - UpdateMeta::DocumentsAddition { method, format, encoding } => { - // We must use the write transaction of the update here. - let mut wtxn = index_cloned.write_txn()?; - let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); - builder.enable_autogenerate_docids(); + let result: anyhow::Result<()> = (|| match meta { + UpdateMeta::DocumentsAddition { method, format, encoding } => { + // We must use the write transaction of the update here. + let mut wtxn = index_cloned.write_txn()?; + let update_method = match method.as_str() { + "replace" => IndexDocumentsMethod::ReplaceDocuments, + "update" => IndexDocumentsMethod::UpdateDocuments, + otherwise => panic!("invalid indexing method {:?}", otherwise), + }; + let indexing_config = IndexDocumentsConfig { + update_method, + autogenerate_docids: true, + ..Default::default() + }; - match method.as_str() { - "replace" => builder - .index_documents_method(IndexDocumentsMethod::ReplaceDocuments), - "update" => builder - .index_documents_method(IndexDocumentsMethod::UpdateDocuments), - otherwise => panic!("invalid indexing method {:?}", otherwise), + let indexing_callback = |indexing_step| { + let (current, total) = match indexing_step { + RemapDocumentAddition { documents_seen } => (documents_seen, None), + ComputeIdsAndMergeDocuments { documents_seen, total_documents } => { + (documents_seen, Some(total_documents)) + } + IndexDocuments { documents_seen, total_documents } => { + (documents_seen, Some(total_documents)) + } + MergeDataIntoFinalDatabase { databases_seen, total_databases } => { + (databases_seen, Some(total_databases)) + } }; - - let reader = match encoding.as_deref() { - Some("gzip") => Box::new(GzDecoder::new(content)), - None => Box::new(content) as Box, - otherwise => panic!("invalid encoding format {:?}", otherwise), - }; - - let documents = match format.as_str() { - "csv" => documents_from_csv(reader)?, - "json" => documents_from_json(reader)?, - "jsonl" => documents_from_jsonl(reader)?, - otherwise => panic!("invalid update format {:?}", otherwise), - }; - - let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?; - - let result = builder.execute(documents, |indexing_step| { - let (current, total) = match indexing_step { - RemapDocumentAddition { documents_seen } => (documents_seen, None), - ComputeIdsAndMergeDocuments { documents_seen, total_documents } => { - (documents_seen, Some(total_documents)) - } - IndexDocuments { documents_seen, total_documents } => { - (documents_seen, Some(total_documents)) - } - MergeDataIntoFinalDatabase { databases_seen, total_databases } => { - (databases_seen, Some(total_databases)) - } - }; - let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { - update_id, - meta: UpdateMetaProgress::DocumentsAddition { - step: indexing_step.step(), - total_steps: indexing_step.number_of_steps(), - current, - total, - }, - }); + let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { + update_id, + meta: UpdateMetaProgress::DocumentsAddition { + step: indexing_step.step(), + total_steps: indexing_step.number_of_steps(), + current, + total, + }, }); + }; - match result { - Ok(_) => wtxn.commit().map_err(Into::into), - Err(e) => Err(e.into()), - } + let mut builder = milli::update::IndexDocuments::new( + &mut wtxn, + &index_cloned, + GLOBAL_CONFIG.get().unwrap(), + indexing_config, + indexing_callback, + ); + + let reader = match encoding.as_deref() { + Some("gzip") => Box::new(GzDecoder::new(content)), + None => Box::new(content) as Box, + otherwise => panic!("invalid encoding format {:?}", otherwise), + }; + + let documents = match format.as_str() { + "csv" => documents_from_csv(reader)?, + "json" => documents_from_json(reader)?, + "jsonl" => documents_from_jsonl(reader)?, + otherwise => panic!("invalid update format {:?}", otherwise), + }; + + let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?; + + builder.add_documents(documents)?; + + let result = builder.execute(); + + match result { + Ok(_) => wtxn.commit().map_err(Into::into), + Err(e) => Err(e.into()), } - UpdateMeta::ClearDocuments => { - // We must use the write transaction of the update here. - let mut wtxn = index_cloned.write_txn()?; - let builder = update_builder.clear_documents(&mut wtxn, &index_cloned); + } + UpdateMeta::ClearDocuments => { + // We must use the write transaction of the update here. + let mut wtxn = index_cloned.write_txn()?; + let builder = ClearDocuments::new(&mut wtxn, &index_cloned); - match builder.execute() { - Ok(_count) => wtxn.commit().map_err(Into::into), - Err(e) => Err(e.into()), - } + match builder.execute() { + Ok(_count) => wtxn.commit().map_err(Into::into), + Err(e) => Err(e.into()), } - UpdateMeta::Settings(settings) => { - // We must use the write transaction of the update here. - let mut wtxn = index_cloned.write_txn()?; - let mut builder = update_builder.settings(&mut wtxn, &index_cloned); + } + UpdateMeta::Settings(settings) => { + // We must use the write transaction of the update here. + let mut wtxn = index_cloned.write_txn()?; + let mut builder = milli::update::Settings::new( + &mut wtxn, + &index_cloned, + GLOBAL_CONFIG.get().unwrap(), + ); - // We transpose the settings JSON struct into a real setting update. - match settings.searchable_attributes { - Setting::Set(searchable_attributes) => { - builder.set_searchable_fields(searchable_attributes) + // We transpose the settings JSON struct into a real setting update. + match settings.searchable_attributes { + Setting::Set(searchable_attributes) => { + builder.set_searchable_fields(searchable_attributes) + } + Setting::Reset => builder.reset_searchable_fields(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.displayed_attributes { + Setting::Set(displayed_attributes) => { + builder.set_displayed_fields(displayed_attributes) + } + Setting::Reset => builder.reset_displayed_fields(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.filterable_attributes { + Setting::Set(filterable_attributes) => { + builder.set_filterable_fields(filterable_attributes) + } + Setting::Reset => builder.reset_filterable_fields(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.sortable_attributes { + Setting::Set(sortable_attributes) => { + builder.set_sortable_fields(sortable_attributes) + } + Setting::Reset => builder.reset_sortable_fields(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.criteria { + Setting::Set(criteria) => builder.set_criteria(criteria), + Setting::Reset => builder.reset_criteria(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.stop_words { + Setting::Set(stop_words) => builder.set_stop_words(stop_words), + Setting::Reset => builder.reset_stop_words(), + Setting::NotSet => (), + } + + // We transpose the settings JSON struct into a real setting update. + match settings.synonyms { + Setting::Set(synonyms) => builder.set_synonyms(synonyms), + Setting::Reset => builder.reset_synonyms(), + Setting::NotSet => (), + } + + let result = builder.execute(|indexing_step| { + let (current, total) = match indexing_step { + RemapDocumentAddition { documents_seen } => (documents_seen, None), + ComputeIdsAndMergeDocuments { documents_seen, total_documents } => { + (documents_seen, Some(total_documents)) } - Setting::Reset => builder.reset_searchable_fields(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.displayed_attributes { - Setting::Set(displayed_attributes) => { - builder.set_displayed_fields(displayed_attributes) + IndexDocuments { documents_seen, total_documents } => { + (documents_seen, Some(total_documents)) } - Setting::Reset => builder.reset_displayed_fields(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.filterable_attributes { - Setting::Set(filterable_attributes) => { - builder.set_filterable_fields(filterable_attributes) + MergeDataIntoFinalDatabase { databases_seen, total_databases } => { + (databases_seen, Some(total_databases)) } - Setting::Reset => builder.reset_filterable_fields(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.sortable_attributes { - Setting::Set(sortable_attributes) => { - builder.set_sortable_fields(sortable_attributes) - } - Setting::Reset => builder.reset_sortable_fields(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.criteria { - Setting::Set(criteria) => builder.set_criteria(criteria), - Setting::Reset => builder.reset_criteria(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.stop_words { - Setting::Set(stop_words) => builder.set_stop_words(stop_words), - Setting::Reset => builder.reset_stop_words(), - Setting::NotSet => (), - } - - // We transpose the settings JSON struct into a real setting update. - match settings.synonyms { - Setting::Set(synonyms) => builder.set_synonyms(synonyms), - Setting::Reset => builder.reset_synonyms(), - Setting::NotSet => (), - } - - let result = builder.execute(|indexing_step| { - let (current, total) = match indexing_step { - RemapDocumentAddition { documents_seen } => (documents_seen, None), - ComputeIdsAndMergeDocuments { documents_seen, total_documents } => { - (documents_seen, Some(total_documents)) - } - IndexDocuments { documents_seen, total_documents } => { - (documents_seen, Some(total_documents)) - } - MergeDataIntoFinalDatabase { databases_seen, total_databases } => { - (databases_seen, Some(total_databases)) - } - }; - let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { - update_id, - meta: UpdateMetaProgress::DocumentsAddition { - step: indexing_step.step(), - total_steps: indexing_step.number_of_steps(), - current, - total, - }, - }); + }; + let _ = update_status_sender_cloned.send(UpdateStatus::Progressing { + update_id, + meta: UpdateMetaProgress::DocumentsAddition { + step: indexing_step.step(), + total_steps: indexing_step.number_of_steps(), + current, + total, + }, }); + }); - match result { - Ok(_count) => wtxn.commit().map_err(Into::into), - Err(e) => Err(e.into()), - } + match result { + Ok(_count) => wtxn.commit().map_err(Into::into), + Err(e) => Err(e.into()), } - UpdateMeta::Facets(levels) => { - // We must use the write transaction of the update here. - let mut wtxn = index_cloned.write_txn()?; - let mut builder = update_builder.facets(&mut wtxn, &index_cloned); - if let Some(value) = levels.level_group_size { - builder.level_group_size(value); - } - if let Some(value) = levels.min_level_size { - builder.min_level_size(value); - } - match builder.execute() { - Ok(()) => wtxn.commit().map_err(Into::into), - Err(e) => Err(e.into()), - } + } + UpdateMeta::Facets(levels) => { + // We must use the write transaction of the update here. + let mut wtxn = index_cloned.write_txn()?; + let mut builder = milli::update::Facets::new(&mut wtxn, &index_cloned); + if let Some(value) = levels.level_group_size { + builder.level_group_size(value); } - })(); + if let Some(value) = levels.min_level_size { + builder.min_level_size(value); + } + match builder.execute() { + Ok(()) => wtxn.commit().map_err(Into::into), + Err(e) => Err(e.into()), + } + } + })(); let meta = match result { Ok(()) => { diff --git a/milli/src/index.rs b/milli/src/index.rs index 2f51b8c6b..70081dfb0 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -867,7 +867,7 @@ pub(crate) mod tests { use maplit::btreemap; use tempfile::TempDir; - use crate::update::IndexDocuments; + use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig}; use crate::Index; pub(crate) struct TempIndex { @@ -908,8 +908,13 @@ pub(crate) mod tests { { "id": 2, "name": "bob", "age": 20 }, { "id": 2, "name": "bob", "age": 20 } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -927,13 +932,15 @@ pub(crate) mod tests { // we add all the documents a second time. we are supposed to get the same // field_distribution in the end let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); let content = documents!([ { "id": 1, "name": "kevin" }, { "id": 2, "name": "bob", "age": 20 }, { "id": 2, "name": "bob", "age": 20 } ]); - builder.execute(content, |_| ()).unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -955,8 +962,10 @@ pub(crate) mod tests { ]); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/milli/src/search/distinct/mod.rs b/milli/src/search/distinct/mod.rs index 3d36ed2a3..965423886 100644 --- a/milli/src/search/distinct/mod.rs +++ b/milli/src/search/distinct/mod.rs @@ -38,7 +38,9 @@ mod test { use crate::documents::{DocumentBatchBuilder, DocumentBatchReader}; use crate::index::tests::TempIndex; use crate::index::Index; - use crate::update::{IndexDocumentsMethod, UpdateBuilder}; + use crate::update::{ + IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings, + }; use crate::{DocumentId, FieldId, BEU32}; static JSON: Lazy> = Lazy::new(generate_documents); @@ -84,19 +86,24 @@ mod test { let mut txn = index.write_txn().unwrap(); // set distinct and faceted attributes for the index. - let builder = UpdateBuilder::new(); - let mut update = builder.settings(&mut txn, &index); + let config = IndexerConfig::default(); + let mut update = Settings::new(&mut txn, &index, &config); update.set_distinct_field(distinct.to_string()); update.execute(|_| ()).unwrap(); // add documents to the index - let builder = UpdateBuilder::new(); - let mut addition = builder.index_documents(&mut txn, &index); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + let mut addition = IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()); - addition.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); let reader = crate::documents::DocumentBatchReader::from_reader(Cursor::new(&*JSON)).unwrap(); - addition.execute(reader, |_| ()).unwrap(); + + addition.add_documents(reader).unwrap(); + addition.execute().unwrap(); let fields_map = index.fields_ids_map(&txn).unwrap(); let fid = fields_map.id(&distinct).unwrap(); diff --git a/milli/src/search/facet/filter.rs b/milli/src/search/facet/filter.rs index b4b4b80b7..edc86d0ca 100644 --- a/milli/src/search/facet/filter.rs +++ b/milli/src/search/facet/filter.rs @@ -450,7 +450,7 @@ mod tests { use maplit::hashset; use super::*; - use crate::update::Settings; + use crate::update::{IndexerConfig, Settings}; use crate::Index; #[test] @@ -461,8 +461,9 @@ mod tests { let index = Index::new(options, &path).unwrap(); // Set the filterable fields to be the channel. + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_searchable_fields(vec![S("PrIcE")]); // to keep the fields order builder.set_filterable_fields(hashset! { S("PrIcE") }); builder.execute(|_| ()).unwrap(); @@ -563,9 +564,10 @@ mod tests { )); drop(rtxn); + let config = IndexerConfig::default(); // Set the filterable fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_searchable_fields(vec![S("title")]); builder.set_filterable_fields(hashset! { S("title") }); builder.execute(|_| ()).unwrap(); @@ -593,9 +595,10 @@ mod tests { options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set the filterable fields to be the channel. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_searchable_fields(vec![S("_geo"), S("price")]); // to keep the fields order builder.set_filterable_fields(hashset! { S("_geo"), S("price") }); builder.execute(|_| ()).unwrap(); diff --git a/milli/src/update/clear_documents.rs b/milli/src/update/clear_documents.rs index 5be3bc23d..8c9178d4e 100644 --- a/milli/src/update/clear_documents.rs +++ b/milli/src/update/clear_documents.rs @@ -77,7 +77,7 @@ mod tests { use heed::EnvOpenOptions; use super::*; - use crate::update::IndexDocuments; + use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig}; #[test] fn clear_documents() { @@ -92,7 +92,11 @@ mod tests { { "id": 1, "name": "kevina" }, { "id": 2, "name": "benoit", "country": "France", "_geo": { "lng": 42, "lat": 35 } } ]); - IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap(); + let indexing_config = IndexDocumentsConfig::default(); + let config = IndexerConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // Clear all documents from the database. let builder = ClearDocuments::new(&mut wtxn, &index); diff --git a/milli/src/update/delete_documents.rs b/milli/src/update/delete_documents.rs index 4c41cbd53..19f1d9f42 100644 --- a/milli/src/update/delete_documents.rs +++ b/milli/src/update/delete_documents.rs @@ -580,7 +580,7 @@ mod tests { use maplit::hashset; use super::*; - use crate::update::{IndexDocuments, Settings}; + use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; use crate::Filter; #[test] @@ -596,8 +596,11 @@ mod tests { { "id": 1, "name": "kevina", "array": ["I", "am", "fine"] }, { "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // delete those documents, ids are synchronous therefore 0, 1, and 2. let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap(); @@ -626,8 +629,12 @@ mod tests { { "mysuperid": 1, "name": "kevina" }, { "mysuperid": 2, "name": "benoit" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // Delete not all of the documents but some of them. let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap(); @@ -646,7 +653,8 @@ mod tests { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let config = IndexerConfig::default(); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key(S("docid")); builder.set_filterable_fields(hashset! { S("label") }); builder.execute(|_| ()).unwrap(); @@ -673,8 +681,12 @@ mod tests { {"docid":"1_68","label":"design"}, {"docid":"1_69","label":"geometry"} ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // Delete not all of the documents but some of them. let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap(); @@ -696,7 +708,8 @@ mod tests { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let config = IndexerConfig::default(); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key(S("id")); builder.set_filterable_fields(hashset!(S("_geo"))); builder.set_sortable_fields(hashset!(S("_geo"))); @@ -726,7 +739,11 @@ mod tests { ]); let external_ids_to_delete = ["5", "6", "7", "12", "17", "19"]; - IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap(); + let indexing_config = IndexDocumentsConfig::default(); + + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); let external_document_ids = index.external_documents_ids(&wtxn).unwrap(); let ids_to_delete: Vec = external_ids_to_delete diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index b0c0a5362..4fbb75d5f 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -7,13 +7,9 @@ use std::collections::HashSet; use std::io::{Read, Seek}; use std::iter::FromIterator; use std::num::{NonZeroU32, NonZeroUsize}; -use std::time::Instant; -use chrono::Utc; use crossbeam_channel::{Receiver, Sender}; -use grenad::{self, CompressionType}; -use log::{debug, info}; -use rayon::ThreadPool; +use log::debug; use roaring::RoaringBitmap; use serde::{Deserialize, Serialize}; use typed_chunk::{write_typed_chunk_into_index, TypedChunk}; @@ -26,8 +22,8 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::DocumentBatchReader; use crate::update::{ - Facets, UpdateBuilder, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids, - WordPrefixPositionDocids, WordsPrefixesFst, + self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids, + WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst, }; use crate::{Index, Result}; @@ -55,120 +51,116 @@ pub enum IndexDocumentsMethod { UpdateDocuments, } +impl Default for IndexDocumentsMethod { + fn default() -> Self { + Self::ReplaceDocuments + } +} + #[derive(Debug, Copy, Clone)] pub enum WriteMethod { Append, GetMergePut, } -pub struct IndexDocuments<'t, 'u, 'i, 'a> { +pub struct IndexDocuments<'t, 'u, 'i, 'a, F> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, - pub(crate) log_every_n: Option, - pub(crate) documents_chunk_size: Option, - pub(crate) max_nb_chunks: Option, - pub(crate) max_memory: Option, - pub(crate) chunk_compression_type: CompressionType, - pub(crate) chunk_compression_level: Option, - pub(crate) thread_pool: Option<&'a ThreadPool>, - pub(crate) max_positions_per_attributes: Option, - facet_level_group_size: Option, - facet_min_level_size: Option, - words_prefix_threshold: Option, - max_prefix_length: Option, - words_positions_level_group_size: Option, - words_positions_min_level_size: Option, - update_method: IndexDocumentsMethod, - autogenerate_docids: bool, + config: IndexDocumentsConfig, + indexer_config: &'a IndexerConfig, + transform: Option>, + progress: F, + added_documents: u64, } -impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { +#[derive(Default, Debug, Clone)] +pub struct IndexDocumentsConfig { + pub facet_level_group_size: Option, + pub facet_min_level_size: Option, + pub words_prefix_threshold: Option, + pub max_prefix_length: Option, + pub words_positions_level_group_size: Option, + pub words_positions_min_level_size: Option, + pub update_method: IndexDocumentsMethod, + pub autogenerate_docids: bool, +} + +impl<'t, 'u, 'i, 'a, F> IndexDocuments<'t, 'u, 'i, 'a, F> +where + F: Fn(UpdateIndexingStep) + Sync, +{ pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, - ) -> IndexDocuments<'t, 'u, 'i, 'a> { + indexer_config: &'a IndexerConfig, + config: IndexDocumentsConfig, + progress: F, + ) -> IndexDocuments<'t, 'u, 'i, 'a, F> { + let transform = Some(Transform::new( + &index, + indexer_config, + config.update_method, + config.autogenerate_docids, + )); + IndexDocuments { + transform, + config, + indexer_config, + progress, wtxn, index, - log_every_n: None, - documents_chunk_size: None, - max_nb_chunks: None, - max_memory: None, - chunk_compression_type: CompressionType::None, - chunk_compression_level: None, - thread_pool: None, - facet_level_group_size: None, - facet_min_level_size: None, - words_prefix_threshold: None, - max_prefix_length: None, - words_positions_level_group_size: None, - words_positions_min_level_size: None, - update_method: IndexDocumentsMethod::ReplaceDocuments, - autogenerate_docids: false, - max_positions_per_attributes: None, + added_documents: 0, } } - pub fn log_every_n(&mut self, n: usize) { - self.log_every_n = Some(n); - } - - pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) { - self.update_method = method; - } - - pub fn enable_autogenerate_docids(&mut self) { - self.autogenerate_docids = true; - } - - pub fn disable_autogenerate_docids(&mut self) { - self.autogenerate_docids = false; - } - - #[logging_timer::time("IndexDocuments::{}")] - pub fn execute( - self, - reader: DocumentBatchReader, - progress_callback: F, - ) -> Result + /// Adds a batch of documents to the current builder. + /// + /// Since the documents are progressively added to the writer, a failure will cause a stale + /// builder, and the builder must be discarded. + /// + /// Returns the number of documents added to the builder. + pub fn add_documents(&mut self, reader: DocumentBatchReader) -> Result where R: Read + Seek, - F: Fn(UpdateIndexingStep) + Sync, { // Early return when there is no document to add if reader.is_empty() { - return Ok(DocumentAdditionResult { - indexed_documents: 0, - number_of_documents: self.index.number_of_documents(self.wtxn)?, - }); + return Ok(0); } - self.index.set_updated_at(self.wtxn, &Utc::now())?; - let before_transform = Instant::now(); - let transform = Transform { - rtxn: &self.wtxn, - index: self.index, - log_every_n: self.log_every_n, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - max_nb_chunks: self.max_nb_chunks, - max_memory: self.max_memory, - index_documents_method: self.update_method, - autogenerate_docids: self.autogenerate_docids, - }; + let indexed_documents = self + .transform + .as_mut() + .expect("Invalid document addition state") + .read_documents(reader, self.wtxn, &self.progress)? + as u64; - let output = transform.read_documents(reader, &progress_callback)?; + self.added_documents += indexed_documents; + + Ok(indexed_documents) + } + + #[logging_timer::time("IndexDocuments::{}")] + pub fn execute(mut self) -> Result { + if self.added_documents == 0 { + let number_of_documents = self.index.number_of_documents(self.wtxn)?; + return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents }); + } + let output = self + .transform + .take() + .expect("Invalid document addition state") + .output_from_sorter(self.wtxn, &self.progress)?; let indexed_documents = output.documents_count as u64; - - info!("Update transformed in {:.02?}", before_transform.elapsed()); - - let number_of_documents = self.execute_raw(output, progress_callback)?; + let number_of_documents = self.execute_raw(output)?; Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) } + /// Returns the total number of documents in the index after the update. #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_raw(self, output: TransformOutput, progress_callback: F) -> Result + pub fn execute_raw(self, output: TransformOutput) -> Result where F: Fn(UpdateIndexingStep) + Sync, { @@ -188,8 +180,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?; let backup_pool; - let pool = match self.thread_pool { - Some(pool) => pool, + let pool = match self.indexer_config.thread_pool { + Some(ref pool) => pool, #[cfg(not(test))] None => { // We initialize a bakcup pool with the default @@ -237,22 +229,21 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { }; let stop_words = self.index.stop_words(self.wtxn)?; - // let stop_words = stop_words.as_ref(); // Run extraction pipeline in parallel. pool.install(|| { let params = GrenadParameters { - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - max_memory: self.max_memory, - max_nb_chunks: self.max_nb_chunks, // default value, may be chosen. + chunk_compression_type: self.indexer_config.chunk_compression_type, + chunk_compression_level: self.indexer_config.chunk_compression_level, + max_memory: self.indexer_config.max_memory, + max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen. }; // split obkv file into several chuncks let chunk_iter = grenad_obkv_into_chunks( documents_file, params.clone(), - self.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB + self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB ); let result = chunk_iter.map(|chunk_iter| { @@ -266,7 +257,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { primary_key_id, geo_field_id, stop_words, - self.max_positions_per_attributes, + self.indexer_config.max_positions_per_attributes, ) }); @@ -281,17 +272,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { // We delete the documents that this document addition replaces. This way we are // able to simply insert all the documents even if they already exist in the database. if !replaced_documents_ids.is_empty() { - let update_builder = UpdateBuilder { - log_every_n: self.log_every_n, - max_nb_chunks: self.max_nb_chunks, - max_memory: self.max_memory, - documents_chunk_size: self.documents_chunk_size, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - thread_pool: self.thread_pool, - max_positions_per_attributes: self.max_positions_per_attributes, - }; - let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?; + let mut deletion_builder = update::DeleteDocuments::new(self.wtxn, self.index)?; debug!("documents to delete {:?}", replaced_documents_ids); deletion_builder.delete_documents(&replaced_documents_ids); let deleted_documents_count = deletion_builder.execute()?; @@ -303,7 +284,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let mut final_documents_ids = RoaringBitmap::new(); let mut databases_seen = 0; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); @@ -314,7 +295,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { if !docids.is_empty() { final_documents_ids |= docids; let documents_seen_count = final_documents_ids.len(); - progress_callback(UpdateIndexingStep::IndexDocuments { + (self.progress)(UpdateIndexingStep::IndexDocuments { documents_seen: documents_seen_count as usize, total_documents: documents_count, }); @@ -325,7 +306,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { } if is_merged_database { databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); @@ -344,98 +325,95 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> { let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids; self.index.put_documents_ids(self.wtxn, &all_documents_ids)?; - self.execute_prefix_databases(progress_callback)?; + self.execute_prefix_databases()?; Ok(all_documents_ids.len()) } #[logging_timer::time("IndexDocuments::{}")] - pub fn execute_prefix_databases(self, progress_callback: F) -> Result<()> - where - F: Fn(UpdateIndexingStep) + Sync, - { + pub fn execute_prefix_databases(self) -> Result<()> { // Merged databases are already been indexed, we start from this count; let mut databases_seen = MERGED_DATABASE_COUNT; // Run the facets update operation. let mut builder = Facets::new(self.wtxn, self.index); - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - if let Some(value) = self.facet_level_group_size { + builder.chunk_compression_type = self.indexer_config.chunk_compression_type; + builder.chunk_compression_level = self.indexer_config.chunk_compression_level; + if let Some(value) = self.config.facet_level_group_size { builder.level_group_size(value); } - if let Some(value) = self.facet_min_level_size { + if let Some(value) = self.config.facet_min_level_size { builder.min_level_size(value); } builder.execute()?; databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); // Run the words prefixes update operation. let mut builder = WordsPrefixesFst::new(self.wtxn, self.index); - if let Some(value) = self.words_prefix_threshold { + if let Some(value) = self.config.words_prefix_threshold { builder.threshold(value); } - if let Some(value) = self.max_prefix_length { + if let Some(value) = self.config.max_prefix_length { builder.max_prefix_length(value); } builder.execute()?; databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); // Run the word prefix docids update operation. let mut builder = WordPrefixDocids::new(self.wtxn, self.index); - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - builder.max_nb_chunks = self.max_nb_chunks; - builder.max_memory = self.max_memory; + builder.chunk_compression_type = self.indexer_config.chunk_compression_type; + builder.chunk_compression_level = self.indexer_config.chunk_compression_level; + builder.max_nb_chunks = self.indexer_config.max_nb_chunks; + builder.max_memory = self.indexer_config.max_memory; builder.execute()?; databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); // Run the word prefix pair proximity docids update operation. let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index); - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - builder.max_nb_chunks = self.max_nb_chunks; - builder.max_memory = self.max_memory; + builder.chunk_compression_type = self.indexer_config.chunk_compression_type; + builder.chunk_compression_level = self.indexer_config.chunk_compression_level; + builder.max_nb_chunks = self.indexer_config.max_nb_chunks; + builder.max_memory = self.indexer_config.max_memory; builder.execute()?; databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); // Run the words prefix position docids update operation. let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index); - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - builder.max_nb_chunks = self.max_nb_chunks; - builder.max_memory = self.max_memory; - if let Some(value) = self.words_positions_level_group_size { + builder.chunk_compression_type = self.indexer_config.chunk_compression_type; + builder.chunk_compression_level = self.indexer_config.chunk_compression_level; + builder.max_nb_chunks = self.indexer_config.max_nb_chunks; + builder.max_memory = self.indexer_config.max_memory; + if let Some(value) = self.config.words_positions_level_group_size { builder.level_group_size(value); } - if let Some(value) = self.words_positions_min_level_size { + if let Some(value) = self.config.words_positions_min_level_size { builder.min_level_size(value); } builder.execute()?; databases_seen += 1; - progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase { - databases_seen: databases_seen, + (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { + databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); @@ -469,8 +447,13 @@ mod tests { { "id": 2, "name": "kevina" }, { "id": 3, "name": "benoit" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -482,8 +465,10 @@ mod tests { // Second we send 1 document with id 1, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "id": 1, "name": "updated kevin" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -499,8 +484,8 @@ mod tests { { "id": 2, "name": "updated kevina" }, { "id": 3, "name": "updated benoit" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -525,9 +510,15 @@ mod tests { { "id": 1, "name": "kevina" }, { "id": 1, "name": "benoit" } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::UpdateDocuments, + ..Default::default() + }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is only 1 document now. @@ -551,9 +542,9 @@ mod tests { // Second we send 1 document with id 1, to force it to be merged with the previous one. let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "id": 1, "age": 25 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - builder.execute(content, |_| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 1 document. @@ -590,8 +581,10 @@ mod tests { { "name": "kevina" }, { "name": "benoit" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - assert!(builder.execute(content, |_| ()).is_err()); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + assert!(builder.add_documents(content).is_err()); wtxn.commit().unwrap(); // Check that there is no document. @@ -615,9 +608,13 @@ mod tests { { "name": "kevina" }, { "name": "benoit" } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -633,8 +630,9 @@ mod tests { // Second we send 1 document with the generated uuid, to erase the previous ones. let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "name": "updated kevin", "id": kevin_uuid } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is **always** 3 documents. @@ -670,8 +668,11 @@ mod tests { { "id": 2, "name": "kevina" }, { "id": 3, "name": "benoit" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 3 documents now. @@ -683,9 +684,11 @@ mod tests { // Second we send 1 document without specifying the id. let mut wtxn = index.write_txn().unwrap(); let content = documents!([ { "name": "new kevin" } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 4 documents now. @@ -705,8 +708,11 @@ mod tests { // First we send 0 documents and only headers. let mut wtxn = index.write_txn().unwrap(); let content = documents!([]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is no documents. @@ -727,16 +733,20 @@ mod tests { let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = documents!([ { "id": "brume bleue", "name": "kevin" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - assert!(builder.execute(content, |_| ()).is_err()); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + assert!(builder.add_documents(content).is_err()); wtxn.commit().unwrap(); // First we send 1 document with a valid id. let mut wtxn = index.write_txn().unwrap(); // There is a space in the document id. let content = documents!([ { "id": 32, "name": "kevin" } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 1 document now. @@ -760,8 +770,11 @@ mod tests { { "id": 1, "name": "kevina", "array": ["I", "am", "fine"] }, { "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 1 documents now. @@ -799,14 +812,22 @@ mod tests { { "id": 4, "title": "Harry Potter and the Half-Blood Prince", "author": "J. K. Rowling", "genre": "fantasy" }, { "id": 42, "title": "The Hitchhiker's Guide to the Galaxy", "author": "Douglas Adams", "_geo": { "lat": 35, "lng": 23 } } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); - builder.execute(documents, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::ReplaceDocuments, + ..Default::default() + }; + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let mut wtxn = index.write_txn().unwrap(); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); + let indexing_config = IndexDocumentsConfig { + update_method: IndexDocumentsMethod::UpdateDocuments, + ..Default::default() + }; + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let documents = documents!([ { "id": 2, @@ -815,7 +836,8 @@ mod tests { } ]); - builder.execute(documents, |_| ()).unwrap(); + builder.add_documents(documents).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); } @@ -833,7 +855,12 @@ mod tests { { "objectId": 1, "title": "Alice In Wonderland", "comment": "A weird book" }, { "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } } ]); - IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); assert_eq!(index.primary_key(&wtxn).unwrap(), Some("objectId")); @@ -848,15 +875,22 @@ mod tests { let content = documents!([ { "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } } ]); - IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); let external_documents_ids = index.external_documents_ids(&wtxn).unwrap(); assert!(external_documents_ids.get("30").is_some()); let content = documents!([ { "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } } ]); - IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap(); + + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); } @@ -886,8 +920,12 @@ mod tests { cursor.set_position(0); let content = DocumentBatchReader::from_reader(cursor).unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); } @@ -916,8 +954,12 @@ mod tests { cursor.set_position(0); let content = DocumentBatchReader::from_reader(cursor).unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -969,8 +1011,12 @@ mod tests { }, ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); } @@ -990,8 +1036,12 @@ mod tests { ]); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 4 document now. @@ -1002,8 +1052,12 @@ mod tests { let content = documents!([]); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 4 document now. @@ -1019,8 +1073,12 @@ mod tests { ]); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that there is 4 document now. @@ -1042,8 +1100,12 @@ mod tests { ]); let mut wtxn = index.write_txn().unwrap(); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 855fb8db9..f5fb1ec01 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -5,7 +5,6 @@ use std::fs::File; use std::io::{Read, Seek, SeekFrom}; use std::time::Instant; -use grenad::CompressionType; use itertools::Itertools; use log::info; use roaring::RoaringBitmap; @@ -14,7 +13,7 @@ use serde_json::{Map, Value}; use super::helpers::{ create_sorter, create_writer, keep_latest_obkv, merge_obkvs, merge_two_obkvs, MergeFn, }; -use super::IndexDocumentsMethod; +use super::{IndexDocumentsMethod, IndexerConfig}; use crate::documents::{DocumentBatchReader, DocumentsBatchIndex}; use crate::error::{Error, InternalError, UserError}; use crate::index::db_name; @@ -40,16 +39,14 @@ pub struct TransformOutput { /// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids, /// the replaced documents ids, the number of documents in this update and the file /// containing all those documents. -pub struct Transform<'t, 'i> { - pub rtxn: &'t heed::RoTxn<'i>, +pub struct Transform<'a, 'i> { pub index: &'i Index, - pub log_every_n: Option, - pub chunk_compression_type: CompressionType, - pub chunk_compression_level: Option, - pub max_nb_chunks: Option, - pub max_memory: Option, - pub index_documents_method: IndexDocumentsMethod, + indexer_settings: &'a IndexerConfig, pub autogenerate_docids: bool, + pub index_documents_method: IndexDocumentsMethod, + + sorter: grenad::Sorter, + documents_count: usize, } /// Create a mapping between the field ids found in the document batch and the one that were @@ -84,56 +81,73 @@ fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> { .map(String::as_str) } -impl Transform<'_, '_> { - pub fn read_documents( - self, - mut reader: DocumentBatchReader, - progress_callback: F, - ) -> Result - where - R: Read + Seek, - F: Fn(UpdateIndexingStep) + Sync, - { - let fields_index = reader.index(); - let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?; - let mapping = create_fields_mapping(&mut fields_ids_map, fields_index)?; - - let alternative_name = self - .index - .primary_key(self.rtxn)? - .or_else(|| find_primary_key(fields_index)) - .map(String::from); - - let (primary_key_id, primary_key_name) = compute_primary_key_pair( - self.index.primary_key(self.rtxn)?, - &mut fields_ids_map, - alternative_name, - self.autogenerate_docids, - )?; - +impl<'a, 'i> Transform<'a, 'i> { + pub fn new( + index: &'i Index, + indexer_settings: &'a IndexerConfig, + index_documents_method: IndexDocumentsMethod, + autogenerate_docids: bool, + ) -> Self { // We must choose the appropriate merge function for when two or more documents // with the same user id must be merged or fully replaced in the same batch. - let merge_function = match self.index_documents_method { + let merge_function = match index_documents_method { IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv, IndexDocumentsMethod::UpdateDocuments => merge_obkvs, }; // We initialize the sorter with the user indexing settings. - let mut sorter = create_sorter( + let sorter = create_sorter( merge_function, - self.chunk_compression_type, - self.chunk_compression_level, - self.max_nb_chunks, - self.max_memory, + indexer_settings.chunk_compression_type, + indexer_settings.chunk_compression_level, + indexer_settings.max_nb_chunks, + indexer_settings.max_memory, ); + Transform { + index, + indexer_settings, + autogenerate_docids, + sorter, + documents_count: 0, + index_documents_method, + } + } + + pub fn read_documents( + &mut self, + mut reader: DocumentBatchReader, + wtxn: &mut heed::RwTxn, + progress_callback: F, + ) -> Result + where + R: Read + Seek, + F: Fn(UpdateIndexingStep) + Sync, + { + let fields_index = reader.index(); + let mut fields_ids_map = self.index.fields_ids_map(wtxn)?; + let mapping = create_fields_mapping(&mut fields_ids_map, fields_index)?; + + let alternative_name = self + .index + .primary_key(wtxn)? + .or_else(|| find_primary_key(fields_index)) + .map(String::from); + + let (primary_key_id, primary_key_name) = compute_primary_key_pair( + self.index.primary_key(wtxn)?, + &mut fields_ids_map, + alternative_name, + self.autogenerate_docids, + )?; + let mut obkv_buffer = Vec::new(); let mut documents_count = 0; let mut external_id_buffer = Vec::new(); let mut field_buffer: Vec<(u16, &[u8])> = Vec::new(); while let Some((addition_index, document)) = reader.next_document_with_index()? { let mut field_buffer_cache = drop_and_reuse(field_buffer); - if self.log_every_n.map_or(false, |len| documents_count % len == 0) { + if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { progress_callback(UpdateIndexingStep::RemapDocumentAddition { documents_seen: documents_count, }); @@ -214,7 +228,7 @@ impl Transform<'_, '_> { } // We use the extracted/generated user id as the key for this document. - sorter.insert(&external_id.as_ref().as_bytes(), &obkv_buffer)?; + self.sorter.insert(&external_id.as_ref().as_bytes(), &obkv_buffer)?; documents_count += 1; progress_callback(UpdateIndexingStep::RemapDocumentAddition { @@ -230,38 +244,40 @@ impl Transform<'_, '_> { documents_seen: documents_count, }); + self.index.put_fields_ids_map(wtxn, &fields_ids_map)?; + self.index.put_primary_key(wtxn, &primary_key_name)?; + self.documents_count += documents_count; // Now that we have a valid sorter that contains the user id and the obkv we // give it to the last transforming function which returns the TransformOutput. - self.output_from_sorter( - sorter, - primary_key_name, - fields_ids_map, - documents_count, - progress_callback, - ) + Ok(documents_count) } /// Generate the `TransformOutput` based on the given sorter that can be generated from any /// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document /// id for the user side and the value must be an obkv where keys are valid fields ids. - fn output_from_sorter( + pub(crate) fn output_from_sorter( self, - sorter: grenad::Sorter, - primary_key: String, - fields_ids_map: FieldsIdsMap, - approximate_number_of_documents: usize, + wtxn: &mut heed::RwTxn, progress_callback: F, ) -> Result where F: Fn(UpdateIndexingStep) + Sync, { - let mut external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap(); - let documents_ids = self.index.documents_ids(self.rtxn)?; - let mut field_distribution = self.index.field_distribution(self.rtxn)?; + let primary_key = self + .index + .primary_key(&wtxn)? + .ok_or(Error::UserError(UserError::MissingPrimaryKey))? + .to_string(); + let fields_ids_map = self.index.fields_ids_map(wtxn)?; + let approximate_number_of_documents = self.documents_count; + + let mut external_documents_ids = self.index.external_documents_ids(wtxn).unwrap(); + let documents_ids = self.index.documents_ids(wtxn)?; + let mut field_distribution = self.index.field_distribution(wtxn)?; let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids); // consume sorter, in order to free the internal allocation, before creating a new one. - let mut iter = sorter.into_merger_iter()?; + let mut iter = self.sorter.into_merger_iter()?; // Once we have sort and deduplicated the documents we write them into a final file. let mut final_sorter = create_sorter( @@ -272,10 +288,10 @@ impl Transform<'_, '_> { Err(InternalError::IndexingMergingKeys { process: "documents" }.into()) } }, - self.chunk_compression_type, - self.chunk_compression_level, - self.max_nb_chunks, - self.max_memory, + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + self.indexer_settings.max_nb_chunks, + self.indexer_settings.max_memory, ); let mut new_external_documents_ids_builder = fst::MapBuilder::memory(); let mut replaced_documents_ids = RoaringBitmap::new(); @@ -285,7 +301,7 @@ impl Transform<'_, '_> { // While we write into final file we get or generate the internal documents ids. let mut documents_count = 0; while let Some((external_id, update_obkv)) = iter.next()? { - if self.log_every_n.map_or(false, |len| documents_count % len == 0) { + if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) { progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments { documents_seen: documents_count, total_documents: approximate_number_of_documents, @@ -299,7 +315,7 @@ impl Transform<'_, '_> { replaced_documents_ids.insert(docid); let key = BEU32::new(docid); - let base_obkv = self.index.documents.get(&self.rtxn, &key)?.ok_or( + let base_obkv = self.index.documents.get(wtxn, &key)?.ok_or( InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None, @@ -359,8 +375,11 @@ impl Transform<'_, '_> { // We create a final writer to write the new documents in order from the sorter. let file = tempfile::tempfile()?; - let mut writer = - create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?; + let mut writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + file, + )?; // Once we have written all the documents into the final sorter, we write the documents // into this writer, extract the file and reset the seek to be able to read it again. @@ -392,22 +411,28 @@ impl Transform<'_, '_> { // TODO this can be done in parallel by using the rayon `ThreadPool`. pub fn remap_index_documents( self, - primary_key: String, + wtxn: &mut heed::RwTxn, old_fields_ids_map: FieldsIdsMap, new_fields_ids_map: FieldsIdsMap, ) -> Result { - let field_distribution = self.index.field_distribution(self.rtxn)?; - let external_documents_ids = self.index.external_documents_ids(self.rtxn)?; - let documents_ids = self.index.documents_ids(self.rtxn)?; + // There already has been a document addition, the primary key should be set by now. + let primary_key = + self.index.primary_key(wtxn)?.ok_or(UserError::MissingPrimaryKey)?.to_string(); + let field_distribution = self.index.field_distribution(wtxn)?; + let external_documents_ids = self.index.external_documents_ids(wtxn)?; + let documents_ids = self.index.documents_ids(wtxn)?; let documents_count = documents_ids.len() as usize; // We create a final writer to write the new documents in order from the sorter. let file = tempfile::tempfile()?; - let mut writer = - create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?; + let mut writer = create_writer( + self.indexer_settings.chunk_compression_type, + self.indexer_settings.chunk_compression_level, + file, + )?; let mut obkv_buffer = Vec::new(); - for result in self.index.documents.iter(self.rtxn)? { + for result in self.index.documents.iter(wtxn)? { let (docid, obkv) = result?; let docid = docid.get(); diff --git a/milli/src/update/indexer_config.rs b/milli/src/update/indexer_config.rs new file mode 100644 index 000000000..af7211f90 --- /dev/null +++ b/milli/src/update/indexer_config.rs @@ -0,0 +1,29 @@ +use grenad::CompressionType; +use rayon::ThreadPool; + +#[derive(Debug)] +pub struct IndexerConfig { + pub log_every_n: Option, + pub max_nb_chunks: Option, + pub documents_chunk_size: Option, + pub max_memory: Option, + pub chunk_compression_type: CompressionType, + pub chunk_compression_level: Option, + pub thread_pool: Option, + pub max_positions_per_attributes: Option, +} + +impl Default for IndexerConfig { + fn default() -> Self { + Self { + log_every_n: None, + max_nb_chunks: None, + documents_chunk_size: None, + max_memory: None, + chunk_compression_type: CompressionType::None, + chunk_compression_level: None, + thread_pool: None, + max_positions_per_attributes: None, + } + } +} diff --git a/milli/src/update/mod.rs b/milli/src/update/mod.rs index 3dd8abd28..965ed4fd2 100644 --- a/milli/src/update/mod.rs +++ b/milli/src/update/mod.rs @@ -2,9 +2,11 @@ pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult}; pub use self::facets::Facets; -pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDocumentsMethod}; +pub use self::index_documents::{ + DocumentAdditionResult, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, +}; +pub use self::indexer_config::IndexerConfig; pub use self::settings::{Setting, Settings}; -pub use self::update_builder::UpdateBuilder; pub use self::update_step::UpdateIndexingStep; pub use self::word_prefix_docids::WordPrefixDocids; pub use self::word_prefix_pair_proximity_docids::WordPrefixPairProximityDocids; @@ -16,8 +18,8 @@ mod clear_documents; mod delete_documents; mod facets; mod index_documents; +mod indexer_config; mod settings; -mod update_builder; mod update_step; mod word_prefix_docids; mod word_prefix_pair_proximity_docids; diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index fff5eb0fa..91ef187f5 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -2,15 +2,15 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::result::Result as StdResult; use chrono::Utc; -use grenad::CompressionType; use itertools::Itertools; use meilisearch_tokenizer::{Analyzer, AnalyzerConfig}; -use rayon::ThreadPool; use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use super::index_documents::{IndexDocumentsConfig, Transform}; +use super::IndexerConfig; use crate::criterion::Criterion; use crate::error::UserError; -use crate::update::index_documents::{IndexDocumentsMethod, Transform}; +use crate::update::index_documents::IndexDocumentsMethod; use crate::update::{ClearDocuments, IndexDocuments, UpdateIndexingStep}; use crate::{FieldsIdsMap, Index, Result}; @@ -77,14 +77,8 @@ impl<'de, T: Deserialize<'de>> Deserialize<'de> for Setting { pub struct Settings<'a, 't, 'u, 'i> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, - pub(crate) log_every_n: Option, - pub(crate) max_nb_chunks: Option, - pub(crate) max_memory: Option, - pub(crate) documents_chunk_size: Option, - pub(crate) chunk_compression_type: CompressionType, - pub(crate) chunk_compression_level: Option, - pub(crate) thread_pool: Option<&'a ThreadPool>, - pub(crate) max_positions_per_attributes: Option, + + indexer_config: &'a IndexerConfig, searchable_fields: Setting>, displayed_fields: Setting>, @@ -98,17 +92,14 @@ pub struct Settings<'a, 't, 'u, 'i> { } impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { - pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> { + pub fn new( + wtxn: &'t mut heed::RwTxn<'i, 'u>, + index: &'i Index, + indexer_config: &'a IndexerConfig, + ) -> Settings<'a, 't, 'u, 'i> { Settings { wtxn, index, - log_every_n: None, - max_nb_chunks: None, - max_memory: None, - documents_chunk_size: None, - chunk_compression_type: CompressionType::None, - chunk_compression_level: None, - thread_pool: None, searchable_fields: Setting::NotSet, displayed_fields: Setting::NotSet, filterable_fields: Setting::NotSet, @@ -118,14 +109,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { distinct_field: Setting::NotSet, synonyms: Setting::NotSet, primary_key: Setting::NotSet, - max_positions_per_attributes: None, + indexer_config, } } - pub fn log_every_n(&mut self, n: usize) { - self.log_every_n = Some(n); - } - pub fn reset_searchable_fields(&mut self) { self.searchable_fields = Setting::Reset; } @@ -210,25 +197,16 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { return Ok(()); } - let transform = Transform { - rtxn: &self.wtxn, - index: self.index, - log_every_n: self.log_every_n, - chunk_compression_type: self.chunk_compression_type, - chunk_compression_level: self.chunk_compression_level, - max_nb_chunks: self.max_nb_chunks, - max_memory: self.max_memory, - index_documents_method: IndexDocumentsMethod::ReplaceDocuments, - autogenerate_docids: false, - }; - - // There already has been a document addition, the primary key should be set by now. - let primary_key = - self.index.primary_key(&self.wtxn)?.ok_or(UserError::MissingPrimaryKey)?; + let transform = Transform::new( + &self.index, + &self.indexer_config, + IndexDocumentsMethod::ReplaceDocuments, + false, + ); // We remap the documents fields based on the new `FieldsIdsMap`. let output = transform.remap_index_documents( - primary_key.to_string(), + self.wtxn, old_fields_ids_map, fields_ids_map.clone(), )?; @@ -238,16 +216,14 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { // We index the generated `TransformOutput` which must contain // all the documents with fields in the newly defined searchable order. - let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index); - indexing_builder.log_every_n = self.log_every_n; - indexing_builder.max_nb_chunks = self.max_nb_chunks; - indexing_builder.max_memory = self.max_memory; - indexing_builder.documents_chunk_size = self.documents_chunk_size; - indexing_builder.chunk_compression_type = self.chunk_compression_type; - indexing_builder.chunk_compression_level = self.chunk_compression_level; - indexing_builder.thread_pool = self.thread_pool; - indexing_builder.max_positions_per_attributes = self.max_positions_per_attributes; - indexing_builder.execute_raw(output, &cb)?; + let indexing_builder = IndexDocuments::new( + self.wtxn, + self.index, + &self.indexer_config, + IndexDocumentsConfig::default(), + &cb, + ); + indexing_builder.execute_raw(output)?; Ok(()) } @@ -535,13 +511,17 @@ mod tests { { "id": 2, "name": "kevina", "age": 21}, { "id": 3, "name": "benoit", "age": 34 } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = IndexDocumentsConfig::default(); + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_searchable_fields(vec!["name".into()]); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -563,7 +543,7 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_searchable_fields(); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -592,15 +572,19 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // In the same transaction we change the displayed fields to be only the "age". // We also change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_displayed_fields(vec!["age".into()]); builder.set_searchable_fields(vec!["name".into()]); builder.execute(|_| ()).unwrap(); @@ -614,7 +598,7 @@ mod tests { // We change the searchable fields to be the "name" field only. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_searchable_fields(); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -639,9 +623,13 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set to `None` (default value). @@ -664,12 +652,16 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // In the same transaction we change the displayed fields to be only the age. - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_displayed_fields(vec!["age".into()]); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -682,7 +674,7 @@ mod tests { // We reset the fields ids to become `None`, the default value. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_displayed_fields(); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -700,9 +692,11 @@ mod tests { options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); + // Set the filterable fields to be the age. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_filterable_fields(hashset! { S("age") }); builder.execute(|_| ()).unwrap(); @@ -712,9 +706,12 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Check that the displayed fields are correctly set. @@ -749,9 +746,12 @@ mod tests { { "name": "benoit", "age": 35 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -771,10 +771,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set the filterable fields to be the age. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); // Don't display the generated `id` field. builder.set_displayed_fields(vec![S("name")]); builder.set_criteria(vec![S("age:asc")]); @@ -786,9 +787,12 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Run an empty query just to ensure that the search results are ordered. @@ -813,10 +817,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set the filterable fields to be the age. let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); // Don't display the generated `id` field. builder.set_displayed_fields(vec![S("name"), S("age")]); builder.set_distinct_field(S("age")); @@ -832,9 +837,12 @@ mod tests { { "name": "bernie", "age": 34 }, { "name": "ben", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Run an empty query just to ensure that the search results are ordered. @@ -859,9 +867,13 @@ mod tests { { "name": "kevina", "age": 21 }, { "name": "benoit", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // Ensure there is no stop_words by default @@ -884,12 +896,16 @@ mod tests { { "name": "kevina", "age": 21, "maxim": "Doggos are the best" }, { "name": "benoit", "age": 34, "maxim": "The crepes are really good" }, ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // In the same transaction we provide some stop_words - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); let set = btreeset! { "i".to_string(), "the".to_string(), "are".to_string() }; builder.set_stop_words(set.clone()); builder.execute(|_| ()).unwrap(); @@ -920,7 +936,7 @@ mod tests { // now we'll reset the stop_words and ensure it's None let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_stop_words(); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -958,12 +974,16 @@ mod tests { { "name": "kevina", "age": 21, "maxim": "Doggos are the best"}, { "name": "benoit", "age": 34, "maxim": "The crepes are really good"}, ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.enable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let config = IndexerConfig::default(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); // In the same transaction provide some synonyms - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_synonyms(hashmap! { "blini".to_string() => vec!["crepes".to_string()], "super like".to_string() => vec!["love".to_string()], @@ -987,7 +1007,7 @@ mod tests { // Reset the synonyms let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_synonyms(); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -1012,10 +1032,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set all the settings except searchable let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_displayed_fields(vec!["hello".to_string()]); builder.set_filterable_fields(hashset! { S("age"), S("toto") }); builder.set_criteria(vec!["toto:asc".to_string()]); @@ -1032,7 +1053,7 @@ mod tests { // We set toto and age as searchable to force reordering of the fields let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -1049,10 +1070,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set all the settings except searchable let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_displayed_fields(vec!["hello".to_string()]); // It is only Asc(toto), there is a facet database but it is denied to filter with toto. builder.set_criteria(vec!["toto:asc".to_string()]); @@ -1070,10 +1092,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set the primary key settings let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key(S("mykey")); builder.execute(|_| ()).unwrap(); @@ -1089,14 +1112,17 @@ mod tests { { "mykey": 6, "name": "bernie", "age": 34 }, { "mykey": 7, "name": "ben", "age": 34 } ]); - let mut builder = IndexDocuments::new(&mut wtxn, &index); - builder.disable_autogenerate_docids(); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // We now try to reset the primary key let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.reset_primary_key(); let err = builder.execute(|_| ()).unwrap_err(); @@ -1109,7 +1135,7 @@ mod tests { builder.execute().unwrap(); // ...we can change the primary key - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_primary_key(S("myid")); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -1121,10 +1147,11 @@ mod tests { let mut options = EnvOpenOptions::new(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path).unwrap(); + let config = IndexerConfig::default(); // Set the genres setting let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_filterable_fields(hashset! { S("genres") }); builder.execute(|_| ()).unwrap(); @@ -1147,8 +1174,12 @@ mod tests { "release_date": 819676800 } ]); - let builder = IndexDocuments::new(&mut wtxn, &index); - builder.execute(content, |_| ()).unwrap(); + let indexing_config = + IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ()); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); // We now try to reset the primary key diff --git a/milli/src/update/update_builder.rs b/milli/src/update/update_builder.rs deleted file mode 100644 index 6e892a356..000000000 --- a/milli/src/update/update_builder.rs +++ /dev/null @@ -1,130 +0,0 @@ -use grenad::CompressionType; -use rayon::ThreadPool; - -use super::{ClearDocuments, DeleteDocuments, Facets, IndexDocuments, Settings}; -use crate::{Index, Result}; - -pub struct UpdateBuilder<'a> { - pub(crate) log_every_n: Option, - pub(crate) max_nb_chunks: Option, - pub(crate) documents_chunk_size: Option, - pub(crate) max_memory: Option, - pub(crate) chunk_compression_type: CompressionType, - pub(crate) chunk_compression_level: Option, - pub(crate) thread_pool: Option<&'a ThreadPool>, - pub(crate) max_positions_per_attributes: Option, -} - -impl<'a> UpdateBuilder<'a> { - pub fn new() -> UpdateBuilder<'a> { - UpdateBuilder { - log_every_n: None, - max_nb_chunks: None, - documents_chunk_size: None, - max_memory: None, - chunk_compression_type: CompressionType::None, - chunk_compression_level: None, - thread_pool: None, - max_positions_per_attributes: None, - } - } - - pub fn log_every_n(&mut self, log_every_n: usize) { - self.log_every_n = Some(log_every_n); - } - - pub fn max_nb_chunks(&mut self, max_nb_chunks: usize) { - self.max_nb_chunks = Some(max_nb_chunks); - } - - pub fn max_memory(&mut self, max_memory: usize) { - self.max_memory = Some(max_memory); - } - - pub fn documents_chunk_size(&mut self, documents_chunk_size: usize) { - self.documents_chunk_size = Some(documents_chunk_size); - } - - pub fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) { - self.chunk_compression_type = chunk_compression_type; - } - - pub fn chunk_compression_level(&mut self, chunk_compression_level: u32) { - self.chunk_compression_level = Some(chunk_compression_level); - } - - pub fn thread_pool(&mut self, thread_pool: &'a ThreadPool) { - self.thread_pool = Some(thread_pool); - } - - pub fn max_positions_per_attributes(&mut self, max_positions_per_attributes: u32) { - self.max_positions_per_attributes = Some(max_positions_per_attributes); - } - - pub fn clear_documents<'t, 'u, 'i>( - self, - wtxn: &'t mut heed::RwTxn<'i, 'u>, - index: &'i Index, - ) -> ClearDocuments<'t, 'u, 'i> { - ClearDocuments::new(wtxn, index) - } - - pub fn delete_documents<'t, 'u, 'i>( - self, - wtxn: &'t mut heed::RwTxn<'i, 'u>, - index: &'i Index, - ) -> Result> { - DeleteDocuments::new(wtxn, index) - } - - pub fn index_documents<'t, 'u, 'i>( - self, - wtxn: &'t mut heed::RwTxn<'i, 'u>, - index: &'i Index, - ) -> IndexDocuments<'t, 'u, 'i, 'a> { - let mut builder = IndexDocuments::new(wtxn, index); - - builder.log_every_n = self.log_every_n; - builder.max_nb_chunks = self.max_nb_chunks; - builder.max_memory = self.max_memory; - builder.documents_chunk_size = self.documents_chunk_size; - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - builder.thread_pool = self.thread_pool; - builder.max_positions_per_attributes = self.max_positions_per_attributes; - - builder - } - - pub fn settings<'t, 'u, 'i>( - self, - wtxn: &'t mut heed::RwTxn<'i, 'u>, - index: &'i Index, - ) -> Settings<'a, 't, 'u, 'i> { - let mut builder = Settings::new(wtxn, index); - - builder.log_every_n = self.log_every_n; - builder.max_nb_chunks = self.max_nb_chunks; - builder.max_memory = self.max_memory; - builder.documents_chunk_size = self.documents_chunk_size; - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - builder.thread_pool = self.thread_pool; - builder.max_positions_per_attributes = self.max_positions_per_attributes; - - builder - } - - pub fn facets<'t, 'u, 'i>( - self, - wtxn: &'t mut heed::RwTxn<'i, 'u>, - index: &'i Index, - ) -> Facets<'t, 'u, 'i> { - let mut builder = Facets::new(wtxn, index); - - builder.chunk_compression_type = self.chunk_compression_type; - builder.chunk_compression_level = self.chunk_compression_level; - - builder - } -} diff --git a/milli/tests/search/distinct.rs b/milli/tests/search/distinct.rs index da7251389..631618f73 100644 --- a/milli/tests/search/distinct.rs +++ b/milli/tests/search/distinct.rs @@ -16,7 +16,8 @@ macro_rules! test_distinct { // update distinct attribute let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let config = milli::update::IndexerConfig::default(); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_distinct_field(S(stringify!($distinct))); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); diff --git a/milli/tests/search/mod.rs b/milli/tests/search/mod.rs index d1467fd72..31d53b666 100644 --- a/milli/tests/search/mod.rs +++ b/milli/tests/search/mod.rs @@ -7,7 +7,7 @@ use either::{Either, Left, Right}; use heed::EnvOpenOptions; use maplit::{hashmap, hashset}; use milli::documents::{DocumentBatchBuilder, DocumentBatchReader}; -use milli::update::{Settings, UpdateBuilder}; +use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; use milli::{AscDesc, Criterion, DocumentId, Index, Member}; use serde::Deserialize; use slice_group_by::GroupBy; @@ -31,8 +31,9 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); + let config = IndexerConfig::default(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); let criteria = criteria.iter().map(|c| c.to_string()).collect(); builder.set_criteria(criteria); @@ -54,10 +55,10 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { builder.execute(|_| ()).unwrap(); // index documents - let mut builder = UpdateBuilder::new(); - builder.max_memory(10 * 1024 * 1024); // 10MiB - let mut builder = builder.index_documents(&mut wtxn, &index); - builder.enable_autogenerate_docids(); + let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; + let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let mut cursor = Cursor::new(Vec::new()); let mut documents_builder = DocumentBatchBuilder::new(&mut cursor).unwrap(); let reader = Cursor::new(CONTENT.as_bytes()); @@ -73,7 +74,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { // index documents let content = DocumentBatchReader::from_reader(cursor).unwrap(); - builder.execute(content, |_| ()).unwrap(); + builder.add_documents(content).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); diff --git a/milli/tests/search/query_criteria.rs b/milli/tests/search/query_criteria.rs index 8968eff90..0dcbf660e 100644 --- a/milli/tests/search/query_criteria.rs +++ b/milli/tests/search/query_criteria.rs @@ -6,7 +6,7 @@ use heed::EnvOpenOptions; use itertools::Itertools; use maplit::hashset; use milli::documents::{DocumentBatchBuilder, DocumentBatchReader}; -use milli::update::{Settings, UpdateBuilder}; +use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult}; use rand::Rng; use Criterion::*; @@ -337,11 +337,12 @@ fn criteria_mixup() { ] }; + let config = IndexerConfig::default(); for criteria in criteria_mix { eprintln!("Testing with criteria order: {:?}", &criteria); //update criteria let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(criteria.iter().map(ToString::to_string).collect()); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap(); @@ -375,8 +376,9 @@ fn criteria_ascdesc() { let index = Index::new(options, &path).unwrap(); let mut wtxn = index.write_txn().unwrap(); + let config = IndexerConfig::default(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_sortable_fields(hashset! { S("name"), @@ -385,10 +387,9 @@ fn criteria_ascdesc() { builder.execute(|_| ()).unwrap(); // index documents - let mut builder = UpdateBuilder::new(); - builder.max_memory(10 * 1024 * 1024); // 10MiB - let mut builder = builder.index_documents(&mut wtxn, &index); - builder.enable_autogenerate_docids(); + let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; + let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; + let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()); let mut cursor = Cursor::new(Vec::new()); let mut batch_builder = DocumentBatchBuilder::new(&mut cursor).unwrap(); @@ -419,7 +420,8 @@ fn criteria_ascdesc() { let reader = DocumentBatchReader::from_reader(cursor).unwrap(); - builder.execute(reader, |_| ()).unwrap(); + builder.add_documents(reader).unwrap(); + builder.execute().unwrap(); wtxn.commit().unwrap(); @@ -430,7 +432,7 @@ fn criteria_ascdesc() { eprintln!("Testing with criterion: {:?}", &criterion); let mut wtxn = index.write_txn().unwrap(); - let mut builder = Settings::new(&mut wtxn, &index); + let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(vec![criterion.to_string()]); builder.execute(|_| ()).unwrap(); wtxn.commit().unwrap();