430: Document batch support r=Kerollmops a=MarinPostma

This PR adds support for document batches in milli. It changes the API of the `IndexDocuments` builder by adding an `add_documents` method. The update API also changes slightly: the `UpdateBuilder` is renamed to `IndexerConfig` and is passed to the update builders, which makes it easier to pass around structs that need access to the indexer config rather than extracting the fields each time. This change touches many function signatures and simplifies them.

The change is not exhaustive and may require another PR to propagate it to the whole codebase; I restricted it to what was necessary for this PR.
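For reference, here is a minimal sketch of the new flow, mirroring the call sites updated in this diff (the `add_batch` helper and its setup are hypothetical; the `milli` names are the ones introduced here):

```rust
use std::io::Cursor;

use milli::documents::DocumentBatchReader;
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
use milli::Index;

// Hypothetical helper: `index` is an open index and `batch` a serialized
// document batch, both prepared elsewhere.
fn add_batch(index: &Index, batch: Vec<u8>) {
    // Indexer-wide settings (threads, memory, compression) now live in
    // `IndexerConfig`, which replaces the old `UpdateBuilder`.
    let config = IndexerConfig::default();
    // Per-update options, e.g. the update method or docid autogeneration.
    let indexing_config = IndexDocumentsConfig::default();

    let mut wtxn = index.write_txn().unwrap();
    let mut builder =
        IndexDocuments::new(&mut wtxn, index, &config, indexing_config, |_| ());

    // Several batches can now be queued with `add_documents` before a
    // single `execute` finalizes the update.
    let documents = DocumentBatchReader::from_reader(Cursor::new(batch)).unwrap();
    builder.add_documents(documents).unwrap();
    builder.execute().unwrap();
    wtxn.commit().unwrap();
}
```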


Co-authored-by: Marin Postma <postma.marin@protonmail.com>
bors[bot] 2022-01-19 13:32:59 +00:00 committed by GitHub
commit 8433516d85
18 changed files with 912 additions and 803 deletions

View File

@ -6,7 +6,7 @@ use std::path::Path;
use criterion::{criterion_group, criterion_main, Criterion};
use heed::EnvOpenOptions;
use milli::update::UpdateBuilder;
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings};
use milli::Index;
#[cfg(target_os = "linux")]
@ -39,9 +39,9 @@ fn indexing_songs_default(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("id".to_owned());
let displayed_fields =
@ -66,12 +66,15 @@ fn indexing_songs_default(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut wtxn = index.write_txn().unwrap();
let builder = update_builder.index_documents(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index.prepare_for_closing().wait();
@ -88,9 +91,9 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("id".to_owned());
let displayed_fields =
@ -112,12 +115,16 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut wtxn = index.write_txn().unwrap();
let builder = update_builder.index_documents(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index.prepare_for_closing().wait();
@ -134,9 +141,9 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("id".to_owned());
let displayed_fields =
@ -154,12 +161,15 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut wtxn = index.write_txn().unwrap();
let builder = update_builder.index_documents(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index.prepare_for_closing().wait();
@ -176,9 +186,9 @@ fn indexing_wiki(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("id".to_owned());
let displayed_fields =
@ -195,13 +205,16 @@ fn indexing_wiki(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.index_documents(&mut wtxn, &index);
builder.enable_autogenerate_docids();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index.prepare_for_closing().wait();
@ -218,9 +231,9 @@ fn indexing_movies_default(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("id".to_owned());
let displayed_fields = ["title", "poster", "overview", "release_date", "genres"]
@ -242,12 +255,15 @@ fn indexing_movies_default(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut wtxn = index.write_txn().unwrap();
let builder = update_builder.index_documents(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::MOVIES, "json");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index.prepare_for_closing().wait();
@ -264,9 +280,9 @@ fn indexing_geo(c: &mut Criterion) {
move || {
let index = setup_index();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key("geonameid".to_owned());
let displayed_fields =
@ -293,12 +309,15 @@ fn indexing_geo(c: &mut Criterion) {
index
},
move |index| {
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut wtxn = index.write_txn().unwrap();
let builder = update_builder.index_documents(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl");
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();

View File

@ -8,7 +8,9 @@ use std::path::Path;
use criterion::BenchmarkId;
use heed::EnvOpenOptions;
use milli::documents::DocumentBatchReader;
use milli::update::{IndexDocumentsMethod, Settings, UpdateBuilder};
use milli::update::{
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings,
};
use milli::{Filter, Index};
use serde_json::{Map, Value};
@ -65,9 +67,9 @@ pub fn base_setup(conf: &Conf) -> Index {
options.max_readers(10);
let index = Index::new(options, conf.database_name).unwrap();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.settings(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
if let Some(primary_key) = conf.primary_key {
builder.set_primary_key(primary_key.to_string());
@ -87,16 +89,19 @@ pub fn base_setup(conf: &Conf) -> Index {
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
let update_builder = UpdateBuilder::new();
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = update_builder.index_documents(&mut wtxn, &index);
if let None = conf.primary_key {
builder.enable_autogenerate_docids();
}
let indexing_config = IndexDocumentsConfig {
autogenerate_docids: conf.primary_key.is_none(),
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = documents_from(conf.dataset, conf.dataset_format);
builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments);
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
index

View File

@ -9,6 +9,7 @@ use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use milli::update::UpdateIndexingStep::{
ComputeIdsAndMergeDocuments, IndexDocuments, MergeDataIntoFinalDatabase, RemapDocumentAddition,
};
use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
use structopt::StructOpt;
#[cfg(target_os = "linux")]
@ -122,18 +123,18 @@ impl DocumentAddition {
println!("Adding {} documents to the index.", reader.len());
let mut txn = index.env.write_txn()?;
let mut addition = milli::update::IndexDocuments::new(&mut txn, &index);
if self.update_documents {
addition.index_documents_method(milli::update::IndexDocumentsMethod::UpdateDocuments);
}
addition.log_every_n(100);
if self.autogen_docids {
addition.enable_autogenerate_docids()
}
let config = milli::update::IndexerConfig { log_every_n: Some(100), ..Default::default() };
let update_method = if self.update_documents {
IndexDocumentsMethod::UpdateDocuments
} else {
IndexDocumentsMethod::ReplaceDocuments
};
let indexing_config = IndexDocumentsConfig {
update_method,
autogenerate_docids: self.autogen_docids,
..Default::default()
};
let mut bars = Vec::new();
let progesses = MultiProgress::new();
for _ in 0..4 {
@ -141,12 +142,20 @@ impl DocumentAddition {
let bar = progesses.add(bar);
bars.push(bar);
}
let mut addition = milli::update::IndexDocuments::new(
&mut txn,
&index,
&config,
indexing_config,
|step| indexing_callback(step, &bars),
);
addition.add_documents(reader)?;
std::thread::spawn(move || {
progesses.join().unwrap();
});
let result = addition.execute(reader, |step| indexing_callback(step, &bars))?;
let result = addition.execute()?;
txn.commit()?;
@ -293,8 +302,9 @@ impl SettingsUpdate {
fn perform(&self, index: milli::Index) -> Result<()> {
let mut txn = index.env.write_txn()?;
let mut update = milli::update::Settings::new(&mut txn, &index);
update.log_every_n(100);
let config = IndexerConfig { log_every_n: Some(100), ..Default::default() };
let mut update = milli::update::Settings::new(&mut txn, &index, &config);
if let Some(ref filterable_attributes) = self.filterable_attributes {
if !filterable_attributes.is_empty() {

View File

@ -21,13 +21,14 @@ use heed::EnvOpenOptions;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use milli::documents::DocumentBatchReader;
use milli::update::UpdateIndexingStep::*;
use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
use milli::update::{
ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
};
use milli::{
obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, Index, MatchingWords,
SearchResult, SortError,
};
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use structopt::StructOpt;
@ -44,7 +45,7 @@ use self::update_store::UpdateStore;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
static GLOBAL_CONFIG: OnceCell<IndexerConfig> = OnceCell::new();
#[derive(Debug, StructOpt)]
/// The HTTP main server of the milli project.
@ -327,7 +328,19 @@ async fn main() -> anyhow::Result<()> {
// Setup the global thread pool
let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
GLOBAL_THREAD_POOL.set(pool).unwrap();
let config = IndexerConfig {
max_nb_chunks: opt.indexer.max_nb_chunks,
chunk_compression_level: opt.indexer.chunk_compression_level,
max_positions_per_attributes: opt.indexer.max_positions_per_attributes,
thread_pool: Some(pool),
log_every_n: Some(opt.indexer.log_every_n),
max_memory: Some(opt.indexer.max_memory.get_bytes() as usize),
chunk_compression_type: opt.indexer.chunk_compression_type.unwrap_or(CompressionType::None),
..Default::default()
};
GLOBAL_CONFIG.set(config).unwrap();
// Open the LMDB database.
let index = Index::new(options, &opt.database)?;
@ -342,209 +355,207 @@ async fn main() -> anyhow::Result<()> {
let (update_status_sender, _) = broadcast::channel(100);
let update_status_sender_cloned = update_status_sender.clone();
let index_cloned = index.clone();
let indexer_opt_cloned = opt.indexer.clone();
let update_store = UpdateStore::open(
update_store_options,
update_store_path,
// the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
move |update_id, meta, content: &_| {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new();
if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks);
}
if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level {
update_builder.chunk_compression_level(chunk_compression_level);
}
if let Some(max_pos_per_attributes) = indexer_opt_cloned.max_positions_per_attributes {
update_builder.max_positions_per_attributes(max_pos_per_attributes);
}
update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap());
update_builder.log_every_n(indexer_opt_cloned.log_every_n);
update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize);
update_builder.chunk_compression_type(
indexer_opt_cloned.chunk_compression_type.unwrap_or(CompressionType::None),
);
let before_update = Instant::now();
// we extract the update type and execute the update itself.
let result: anyhow::Result<()> =
(|| match meta {
UpdateMeta::DocumentsAddition { method, format, encoding } => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
builder.enable_autogenerate_docids();
let result: anyhow::Result<()> = (|| match meta {
UpdateMeta::DocumentsAddition { method, format, encoding } => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let update_method = match method.as_str() {
"replace" => IndexDocumentsMethod::ReplaceDocuments,
"update" => IndexDocumentsMethod::UpdateDocuments,
otherwise => panic!("invalid indexing method {:?}", otherwise),
};
let indexing_config = IndexDocumentsConfig {
update_method,
autogenerate_docids: true,
..Default::default()
};
match method.as_str() {
"replace" => builder
.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
"update" => builder
.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
otherwise => panic!("invalid indexing method {:?}", otherwise),
let indexing_callback = |indexing_step| {
let (current, total) = match indexing_step {
RemapDocumentAddition { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
};
let reader = match encoding.as_deref() {
Some("gzip") => Box::new(GzDecoder::new(content)),
None => Box::new(content) as Box<dyn io::Read>,
otherwise => panic!("invalid encoding format {:?}", otherwise),
};
let documents = match format.as_str() {
"csv" => documents_from_csv(reader)?,
"json" => documents_from_json(reader)?,
"jsonl" => documents_from_jsonl(reader)?,
otherwise => panic!("invalid update format {:?}", otherwise),
};
let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;
let result = builder.execute(documents, |indexing_step| {
let (current, total) = match indexing_step {
RemapDocumentAddition { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
};
match result {
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
let mut builder = milli::update::IndexDocuments::new(
&mut wtxn,
&index_cloned,
GLOBAL_CONFIG.get().unwrap(),
indexing_config,
indexing_callback,
);
let reader = match encoding.as_deref() {
Some("gzip") => Box::new(GzDecoder::new(content)),
None => Box::new(content) as Box<dyn io::Read>,
otherwise => panic!("invalid encoding format {:?}", otherwise),
};
let documents = match format.as_str() {
"csv" => documents_from_csv(reader)?,
"json" => documents_from_json(reader)?,
"jsonl" => documents_from_jsonl(reader)?,
otherwise => panic!("invalid update format {:?}", otherwise),
};
let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;
builder.add_documents(documents)?;
let result = builder.execute();
match result {
Ok(_) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
UpdateMeta::ClearDocuments => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let builder = update_builder.clear_documents(&mut wtxn, &index_cloned);
}
UpdateMeta::ClearDocuments => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let builder = ClearDocuments::new(&mut wtxn, &index_cloned);
match builder.execute() {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
match builder.execute() {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
UpdateMeta::Settings(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
}
UpdateMeta::Settings(settings) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = milli::update::Settings::new(
&mut wtxn,
&index_cloned,
GLOBAL_CONFIG.get().unwrap(),
);
// We transpose the settings JSON struct into a real setting update.
match settings.searchable_attributes {
Setting::Set(searchable_attributes) => {
builder.set_searchable_fields(searchable_attributes)
// We transpose the settings JSON struct into a real setting update.
match settings.searchable_attributes {
Setting::Set(searchable_attributes) => {
builder.set_searchable_fields(searchable_attributes)
}
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.displayed_attributes {
Setting::Set(displayed_attributes) => {
builder.set_displayed_fields(displayed_attributes)
}
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.filterable_attributes {
Setting::Set(filterable_attributes) => {
builder.set_filterable_fields(filterable_attributes)
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.sortable_attributes {
Setting::Set(sortable_attributes) => {
builder.set_sortable_fields(sortable_attributes)
}
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.criteria {
Setting::Set(criteria) => builder.set_criteria(criteria),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.stop_words {
Setting::Set(stop_words) => builder.set_stop_words(stop_words),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.synonyms {
Setting::Set(synonyms) => builder.set_synonyms(synonyms),
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
let result = builder.execute(|indexing_step| {
let (current, total) = match indexing_step {
RemapDocumentAddition { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.displayed_attributes {
Setting::Set(displayed_attributes) => {
builder.set_displayed_fields(displayed_attributes)
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.filterable_attributes {
Setting::Set(filterable_attributes) => {
builder.set_filterable_fields(filterable_attributes)
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.sortable_attributes {
Setting::Set(sortable_attributes) => {
builder.set_sortable_fields(sortable_attributes)
}
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.criteria {
Setting::Set(criteria) => builder.set_criteria(criteria),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.stop_words {
Setting::Set(stop_words) => builder.set_stop_words(stop_words),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
// We transpose the settings JSON struct into a real setting update.
match settings.synonyms {
Setting::Set(synonyms) => builder.set_synonyms(synonyms),
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
let result = builder.execute(|indexing_step| {
let (current, total) = match indexing_step {
RemapDocumentAddition { documents_seen } => (documents_seen, None),
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
IndexDocuments { documents_seen, total_documents } => {
(documents_seen, Some(total_documents))
}
MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
(databases_seen, Some(total_databases))
}
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
};
let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
update_id,
meta: UpdateMetaProgress::DocumentsAddition {
step: indexing_step.step(),
total_steps: indexing_step.number_of_steps(),
current,
total,
},
});
});
match result {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
match result {
Ok(_count) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
UpdateMeta::Facets(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.facets(&mut wtxn, &index_cloned);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
}
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
UpdateMeta::Facets(levels) => {
// We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?;
let mut builder = milli::update::Facets::new(&mut wtxn, &index_cloned);
if let Some(value) = levels.level_group_size {
builder.level_group_size(value);
}
})();
if let Some(value) = levels.min_level_size {
builder.min_level_size(value);
}
match builder.execute() {
Ok(()) => wtxn.commit().map_err(Into::into),
Err(e) => Err(e.into()),
}
}
})();
let meta = match result {
Ok(()) => {

View File

@ -867,7 +867,7 @@ pub(crate) mod tests {
use maplit::btreemap;
use tempfile::TempDir;
use crate::update::IndexDocuments;
use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
use crate::Index;
pub(crate) struct TempIndex {
@ -908,8 +908,13 @@ pub(crate) mod tests {
{ "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();
@ -927,13 +932,15 @@ pub(crate) mod tests {
// we add all the documents a second time. we are supposed to get the same
// field_distribution in the end
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
let content = documents!([
{ "id": 1, "name": "kevin" },
{ "id": 2, "name": "bob", "age": 20 },
{ "id": 2, "name": "bob", "age": 20 }
]);
builder.execute(content, |_| ()).unwrap();
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();
@ -955,8 +962,10 @@ pub(crate) mod tests {
]);
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();

View File

@ -38,7 +38,9 @@ mod test {
use crate::documents::{DocumentBatchBuilder, DocumentBatchReader};
use crate::index::tests::TempIndex;
use crate::index::Index;
use crate::update::{IndexDocumentsMethod, UpdateBuilder};
use crate::update::{
IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings,
};
use crate::{DocumentId, FieldId, BEU32};
static JSON: Lazy<Vec<u8>> = Lazy::new(generate_documents);
@ -84,19 +86,24 @@ mod test {
let mut txn = index.write_txn().unwrap();
// set distinct and faceted attributes for the index.
let builder = UpdateBuilder::new();
let mut update = builder.settings(&mut txn, &index);
let config = IndexerConfig::default();
let mut update = Settings::new(&mut txn, &index, &config);
update.set_distinct_field(distinct.to_string());
update.execute(|_| ()).unwrap();
// add documents to the index
let builder = UpdateBuilder::new();
let mut addition = builder.index_documents(&mut txn, &index);
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut addition = IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ());
addition.index_documents_method(IndexDocumentsMethod::ReplaceDocuments);
let reader =
crate::documents::DocumentBatchReader::from_reader(Cursor::new(&*JSON)).unwrap();
addition.execute(reader, |_| ()).unwrap();
addition.add_documents(reader).unwrap();
addition.execute().unwrap();
let fields_map = index.fields_ids_map(&txn).unwrap();
let fid = fields_map.id(&distinct).unwrap();

View File

@ -450,7 +450,7 @@ mod tests {
use maplit::hashset;
use super::*;
use crate::update::Settings;
use crate::update::{IndexerConfig, Settings};
use crate::Index;
#[test]
@ -461,8 +461,9 @@ mod tests {
let index = Index::new(options, &path).unwrap();
// Set the filterable fields to be the channel.
let config = IndexerConfig::default();
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_searchable_fields(vec![S("PrIcE")]); // to keep the fields order
builder.set_filterable_fields(hashset! { S("PrIcE") });
builder.execute(|_| ()).unwrap();
@ -563,9 +564,10 @@ mod tests {
));
drop(rtxn);
let config = IndexerConfig::default();
// Set the filterable fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_searchable_fields(vec![S("title")]);
builder.set_filterable_fields(hashset! { S("title") });
builder.execute(|_| ()).unwrap();
@ -593,9 +595,10 @@ mod tests {
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the filterable fields to be the channel.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_searchable_fields(vec![S("_geo"), S("price")]); // to keep the fields order
builder.set_filterable_fields(hashset! { S("_geo"), S("price") });
builder.execute(|_| ()).unwrap();

View File

@ -77,7 +77,7 @@ mod tests {
use heed::EnvOpenOptions;
use super::*;
use crate::update::IndexDocuments;
use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig};
#[test]
fn clear_documents() {
@ -92,7 +92,11 @@ mod tests {
{ "id": 1, "name": "kevina" },
{ "id": 2, "name": "benoit", "country": "France", "_geo": { "lng": 42, "lat": 35 } }
]);
IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap();
let indexing_config = IndexDocumentsConfig::default();
let config = IndexerConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// Clear all documents from the database.
let builder = ClearDocuments::new(&mut wtxn, &index);

View File

@ -580,7 +580,7 @@ mod tests {
use maplit::hashset;
use super::*;
use crate::update::{IndexDocuments, Settings};
use crate::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings};
use crate::Filter;
#[test]
@ -596,8 +596,11 @@ mod tests {
{ "id": 1, "name": "kevina", "array": ["I", "am", "fine"] },
{ "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// delete those documents, ids are synchronous therefore 0, 1, and 2.
let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
@ -626,8 +629,12 @@ mod tests {
{ "mysuperid": 1, "name": "kevina" },
{ "mysuperid": 2, "name": "benoit" }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// Delete not all of the documents but some of them.
let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
@ -646,7 +653,8 @@ mod tests {
let index = Index::new(options, &path).unwrap();
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key(S("docid"));
builder.set_filterable_fields(hashset! { S("label") });
builder.execute(|_| ()).unwrap();
@ -673,8 +681,12 @@ mod tests {
{"docid":"1_68","label":"design"},
{"docid":"1_69","label":"geometry"}
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// Delete not all of the documents but some of them.
let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
@ -696,7 +708,8 @@ mod tests {
let index = Index::new(options, &path).unwrap();
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key(S("id"));
builder.set_filterable_fields(hashset!(S("_geo")));
builder.set_sortable_fields(hashset!(S("_geo")));
@ -726,7 +739,11 @@ mod tests {
]);
let external_ids_to_delete = ["5", "6", "7", "12", "17", "19"];
IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
let external_document_ids = index.external_documents_ids(&wtxn).unwrap();
let ids_to_delete: Vec<u32> = external_ids_to_delete

View File

@ -7,13 +7,9 @@ use std::collections::HashSet;
use std::io::{Read, Seek};
use std::iter::FromIterator;
use std::num::{NonZeroU32, NonZeroUsize};
use std::time::Instant;
use chrono::Utc;
use crossbeam_channel::{Receiver, Sender};
use grenad::{self, CompressionType};
use log::{debug, info};
use rayon::ThreadPool;
use log::debug;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
use typed_chunk::{write_typed_chunk_into_index, TypedChunk};
@ -26,8 +22,8 @@ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::DocumentBatchReader;
use crate::update::{
Facets, UpdateBuilder, UpdateIndexingStep, WordPrefixDocids, WordPrefixPairProximityDocids,
WordPrefixPositionDocids, WordsPrefixesFst,
self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
WordPrefixPairProximityDocids, WordPrefixPositionDocids, WordsPrefixesFst,
};
use crate::{Index, Result};
@ -55,120 +51,116 @@ pub enum IndexDocumentsMethod {
UpdateDocuments,
}
impl Default for IndexDocumentsMethod {
fn default() -> Self {
Self::ReplaceDocuments
}
}
#[derive(Debug, Copy, Clone)]
pub enum WriteMethod {
Append,
GetMergePut,
}
pub struct IndexDocuments<'t, 'u, 'i, 'a> {
pub struct IndexDocuments<'t, 'u, 'i, 'a, F> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) log_every_n: Option<usize>,
pub(crate) documents_chunk_size: Option<usize>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
pub(crate) max_positions_per_attributes: Option<u32>,
facet_level_group_size: Option<NonZeroUsize>,
facet_min_level_size: Option<NonZeroUsize>,
words_prefix_threshold: Option<u32>,
max_prefix_length: Option<usize>,
words_positions_level_group_size: Option<NonZeroU32>,
words_positions_min_level_size: Option<NonZeroU32>,
update_method: IndexDocumentsMethod,
autogenerate_docids: bool,
config: IndexDocumentsConfig,
indexer_config: &'a IndexerConfig,
transform: Option<Transform<'a, 'i>>,
progress: F,
added_documents: u64,
}
impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
#[derive(Default, Debug, Clone)]
pub struct IndexDocumentsConfig {
pub facet_level_group_size: Option<NonZeroUsize>,
pub facet_min_level_size: Option<NonZeroUsize>,
pub words_prefix_threshold: Option<u32>,
pub max_prefix_length: Option<usize>,
pub words_positions_level_group_size: Option<NonZeroU32>,
pub words_positions_min_level_size: Option<NonZeroU32>,
pub update_method: IndexDocumentsMethod,
pub autogenerate_docids: bool,
}
impl<'t, 'u, 'i, 'a, F> IndexDocuments<'t, 'u, 'i, 'a, F>
where
F: Fn(UpdateIndexingStep) + Sync,
{
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> IndexDocuments<'t, 'u, 'i, 'a> {
indexer_config: &'a IndexerConfig,
config: IndexDocumentsConfig,
progress: F,
) -> IndexDocuments<'t, 'u, 'i, 'a, F> {
let transform = Some(Transform::new(
&index,
indexer_config,
config.update_method,
config.autogenerate_docids,
));
IndexDocuments {
transform,
config,
indexer_config,
progress,
wtxn,
index,
log_every_n: None,
documents_chunk_size: None,
max_nb_chunks: None,
max_memory: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
thread_pool: None,
facet_level_group_size: None,
facet_min_level_size: None,
words_prefix_threshold: None,
max_prefix_length: None,
words_positions_level_group_size: None,
words_positions_min_level_size: None,
update_method: IndexDocumentsMethod::ReplaceDocuments,
autogenerate_docids: false,
max_positions_per_attributes: None,
added_documents: 0,
}
}
pub fn log_every_n(&mut self, n: usize) {
self.log_every_n = Some(n);
}
pub fn index_documents_method(&mut self, method: IndexDocumentsMethod) {
self.update_method = method;
}
pub fn enable_autogenerate_docids(&mut self) {
self.autogenerate_docids = true;
}
pub fn disable_autogenerate_docids(&mut self) {
self.autogenerate_docids = false;
}
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute<R, F>(
self,
reader: DocumentBatchReader<R>,
progress_callback: F,
) -> Result<DocumentAdditionResult>
/// Adds a batch of documents to the current builder.
///
/// Since the documents are progressively added to the writer, a failure will cause a stale
/// builder, and the builder must be discarded.
///
/// Returns the number of documents added to the builder.
pub fn add_documents<R>(&mut self, reader: DocumentBatchReader<R>) -> Result<u64>
where
R: Read + Seek,
F: Fn(UpdateIndexingStep) + Sync,
{
// Early return when there is no document to add
if reader.is_empty() {
return Ok(DocumentAdditionResult {
indexed_documents: 0,
number_of_documents: self.index.number_of_documents(self.wtxn)?,
});
return Ok(0);
}
self.index.set_updated_at(self.wtxn, &Utc::now())?;
let before_transform = Instant::now();
let transform = Transform {
rtxn: &self.wtxn,
index: self.index,
log_every_n: self.log_every_n,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
max_nb_chunks: self.max_nb_chunks,
max_memory: self.max_memory,
index_documents_method: self.update_method,
autogenerate_docids: self.autogenerate_docids,
};
let indexed_documents = self
.transform
.as_mut()
.expect("Invalid document addition state")
.read_documents(reader, self.wtxn, &self.progress)?
as u64;
let output = transform.read_documents(reader, &progress_callback)?;
self.added_documents += indexed_documents;
Ok(indexed_documents)
}
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute(mut self) -> Result<DocumentAdditionResult> {
if self.added_documents == 0 {
let number_of_documents = self.index.number_of_documents(self.wtxn)?;
return Ok(DocumentAdditionResult { indexed_documents: 0, number_of_documents });
}
let output = self
.transform
.take()
.expect("Invalid document addition state")
.output_from_sorter(self.wtxn, &self.progress)?;
let indexed_documents = output.documents_count as u64;
info!("Update transformed in {:.02?}", before_transform.elapsed());
let number_of_documents = self.execute_raw(output, progress_callback)?;
let number_of_documents = self.execute_raw(output)?;
Ok(DocumentAdditionResult { indexed_documents, number_of_documents })
}
/// Returns the total number of documents in the index after the update.
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_raw<F>(self, output: TransformOutput, progress_callback: F) -> Result<u64>
pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
where
F: Fn(UpdateIndexingStep) + Sync,
{
@ -188,8 +180,8 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
self.index.put_fields_ids_map(self.wtxn, &fields_ids_map)?;
let backup_pool;
let pool = match self.thread_pool {
Some(pool) => pool,
let pool = match self.indexer_config.thread_pool {
Some(ref pool) => pool,
#[cfg(not(test))]
None => {
// We initialize a backup pool with the default
@ -237,22 +229,21 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
};
let stop_words = self.index.stop_words(self.wtxn)?;
// let stop_words = stop_words.as_ref();
// Run extraction pipeline in parallel.
pool.install(|| {
let params = GrenadParameters {
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
max_memory: self.max_memory,
max_nb_chunks: self.max_nb_chunks, // default value, may be chosen.
chunk_compression_type: self.indexer_config.chunk_compression_type,
chunk_compression_level: self.indexer_config.chunk_compression_level,
max_memory: self.indexer_config.max_memory,
max_nb_chunks: self.indexer_config.max_nb_chunks, // default value, may be chosen.
};
// split the obkv file into several chunks
let chunk_iter = grenad_obkv_into_chunks(
documents_file,
params.clone(),
self.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB
self.indexer_config.documents_chunk_size.unwrap_or(1024 * 1024 * 128), // 128MiB
);
let result = chunk_iter.map(|chunk_iter| {
@ -266,7 +257,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
primary_key_id,
geo_field_id,
stop_words,
self.max_positions_per_attributes,
self.indexer_config.max_positions_per_attributes,
)
});
@ -281,17 +272,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
// We delete the documents that this document addition replaces. This way we are
// able to simply insert all the documents even if they already exist in the database.
if !replaced_documents_ids.is_empty() {
let update_builder = UpdateBuilder {
log_every_n: self.log_every_n,
max_nb_chunks: self.max_nb_chunks,
max_memory: self.max_memory,
documents_chunk_size: self.documents_chunk_size,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
thread_pool: self.thread_pool,
max_positions_per_attributes: self.max_positions_per_attributes,
};
let mut deletion_builder = update_builder.delete_documents(self.wtxn, self.index)?;
let mut deletion_builder = update::DeleteDocuments::new(self.wtxn, self.index)?;
debug!("documents to delete {:?}", replaced_documents_ids);
deletion_builder.delete_documents(&replaced_documents_ids);
let deleted_documents_count = deletion_builder.execute()?;
@ -303,7 +284,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
let mut final_documents_ids = RoaringBitmap::new();
let mut databases_seen = 0;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
@ -314,7 +295,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
if !docids.is_empty() {
final_documents_ids |= docids;
let documents_seen_count = final_documents_ids.len();
progress_callback(UpdateIndexingStep::IndexDocuments {
(self.progress)(UpdateIndexingStep::IndexDocuments {
documents_seen: documents_seen_count as usize,
total_documents: documents_count,
});
@ -325,7 +306,7 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
}
if is_merged_database {
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
@ -344,98 +325,95 @@ impl<'t, 'u, 'i, 'a> IndexDocuments<'t, 'u, 'i, 'a> {
let all_documents_ids = index_documents_ids | new_documents_ids | replaced_documents_ids;
self.index.put_documents_ids(self.wtxn, &all_documents_ids)?;
self.execute_prefix_databases(progress_callback)?;
self.execute_prefix_databases()?;
Ok(all_documents_ids.len())
}
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_prefix_databases<F>(self, progress_callback: F) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
{
pub fn execute_prefix_databases(self) -> Result<()> {
// Merged databases have already been indexed; we start from this count.
let mut databases_seen = MERGED_DATABASE_COUNT;
// Run the facets update operation.
let mut builder = Facets::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
if let Some(value) = self.facet_level_group_size {
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
if let Some(value) = self.config.facet_level_group_size {
builder.level_group_size(value);
}
if let Some(value) = self.facet_min_level_size {
if let Some(value) = self.config.facet_min_level_size {
builder.min_level_size(value);
}
builder.execute()?;
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen: databases_seen,
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the words prefixes update operation.
let mut builder = WordsPrefixesFst::new(self.wtxn, self.index);
if let Some(value) = self.words_prefix_threshold {
if let Some(value) = self.config.words_prefix_threshold {
builder.threshold(value);
}
if let Some(value) = self.max_prefix_length {
if let Some(value) = self.config.max_prefix_length {
builder.max_prefix_length(value);
}
builder.execute()?;
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen: databases_seen,
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the word prefix docids update operation.
let mut builder = WordPrefixDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute()?;
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen: databases_seen,
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the word prefix pair proximity docids update operation.
let mut builder = WordPrefixPairProximityDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
builder.execute()?;
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen: databases_seen,
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
// Run the words prefix position docids update operation.
let mut builder = WordPrefixPositionDocids::new(self.wtxn, self.index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
if let Some(value) = self.words_positions_level_group_size {
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
builder.chunk_compression_level = self.indexer_config.chunk_compression_level;
builder.max_nb_chunks = self.indexer_config.max_nb_chunks;
builder.max_memory = self.indexer_config.max_memory;
if let Some(value) = self.config.words_positions_level_group_size {
builder.level_group_size(value);
}
if let Some(value) = self.words_positions_min_level_size {
if let Some(value) = self.config.words_positions_min_level_size {
builder.min_level_size(value);
}
builder.execute()?;
databases_seen += 1;
progress_callback(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen: databases_seen,
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
@ -469,8 +447,13 @@ mod tests {
{ "id": 2, "name": "kevina" },
{ "id": 3, "name": "benoit" }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -482,8 +465,10 @@ mod tests {
// Second we send 1 document with id 1, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap();
let content = documents!([ { "id": 1, "name": "updated kevin" } ]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -499,8 +484,8 @@ mod tests {
{ "id": 2, "name": "updated kevina" },
{ "id": 3, "name": "updated benoit" }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -525,9 +510,15 @@ mod tests {
{ "id": 1, "name": "kevina" },
{ "id": 1, "name": "benoit" }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::UpdateDocuments,
..Default::default()
};
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is only 1 document now.
@ -551,9 +542,9 @@ mod tests {
// Second we send 1 document with id 1, to force it to be merged with the previous one.
let mut wtxn = index.write_txn().unwrap();
let content = documents!([ { "id": 1, "age": 25 } ]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
builder.execute(content, |_| ()).unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 1 document.
@ -590,8 +581,10 @@ mod tests {
{ "name": "kevina" },
{ "name": "benoit" }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
assert!(builder.execute(content, |_| ()).is_err());
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
assert!(builder.add_documents(content).is_err());
wtxn.commit().unwrap();
// Check that there is no document.
@ -615,9 +608,13 @@ mod tests {
{ "name": "kevina" },
{ "name": "benoit" }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -633,8 +630,9 @@ mod tests {
// Second we send 1 document with the generated uuid, to erase the previous ones.
let mut wtxn = index.write_txn().unwrap();
let content = documents!([ { "name": "updated kevin", "id": kevin_uuid } ]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is **always** 3 documents.
@ -670,8 +668,11 @@ mod tests {
{ "id": 2, "name": "kevina" },
{ "id": 3, "name": "benoit" }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 3 documents now.
@ -683,9 +684,11 @@ mod tests {
// Second we send 1 document without specifying the id.
let mut wtxn = index.write_txn().unwrap();
let content = documents!([ { "name": "new kevin" } ]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 4 documents now.
@ -705,8 +708,11 @@ mod tests {
// First we send 0 documents and only headers.
let mut wtxn = index.write_txn().unwrap();
let content = documents!([]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there are no documents.
@ -727,16 +733,20 @@ mod tests {
let mut wtxn = index.write_txn().unwrap();
// There is a space in the document id.
let content = documents!([ { "id": "brume bleue", "name": "kevin" } ]);
let builder = IndexDocuments::new(&mut wtxn, &index);
assert!(builder.execute(content, |_| ()).is_err());
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
assert!(builder.add_documents(content).is_err());
wtxn.commit().unwrap();
// Then we send 1 document with a valid id.
let mut wtxn = index.write_txn().unwrap();
// This time the document id is valid.
let content = documents!([ { "id": 32, "name": "kevin" } ]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 1 document now.
@ -760,8 +770,11 @@ mod tests {
{ "id": 1, "name": "kevina", "array": ["I", "am", "fine"] },
{ "id": 2, "name": "benoit", "array_of_object": [{ "wow": "amazing" }] }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there is 1 document now.
@ -799,14 +812,22 @@ mod tests {
{ "id": 4, "title": "Harry Potter and the Half-Blood Prince", "author": "J. K. Rowling", "genre": "fantasy" },
{ "id": 42, "title": "The Hitchhiker's Guide to the Galaxy", "author": "Douglas Adams", "_geo": { "lat": 35, "lng": 23 } }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments);
builder.execute(documents, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::ReplaceDocuments,
..Default::default()
};
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let mut wtxn = index.write_txn().unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments);
let indexing_config = IndexDocumentsConfig {
update_method: IndexDocumentsMethod::UpdateDocuments,
..Default::default()
};
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let documents = documents!([
{
"id": 2,
@ -815,7 +836,8 @@ mod tests {
}
]);
builder.execute(documents, |_| ()).unwrap();
builder.add_documents(documents).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
}
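
The two `IndexDocumentsMethod` variants exercised above differ in how a batch treats a document id that already exists: `ReplaceDocuments` keeps only the latest version of the document, while `UpdateDocuments` merges the new fields into the stored ones (see the merge functions selected in `Transform::new` further down). A minimal sketch of the batched API, assuming an open `Index` named `index`; the document contents are illustrative:

let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig {
    update_method: IndexDocumentsMethod::UpdateDocuments,
    ..Default::default()
};
let mut wtxn = index.write_txn().unwrap();
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
// Several batches can be queued before a single `execute` call.
builder.add_documents(documents!([ { "id": 1, "name": "kevin" } ])).unwrap();
builder.add_documents(documents!([ { "id": 1, "age": 25 } ])).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// With `UpdateDocuments`, document 1 now carries both `name` and `age`.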
@ -833,7 +855,12 @@ mod tests {
{ "objectId": 1, "title": "Alice In Wonderland", "comment": "A weird book" },
{ "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } }
]);
IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
assert_eq!(index.primary_key(&wtxn).unwrap(), Some("objectId"));
@ -848,15 +875,22 @@ mod tests {
let content = documents!([
{ "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } }
]);
IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
let external_documents_ids = index.external_documents_ids(&wtxn).unwrap();
assert!(external_documents_ids.get("30").is_some());
let content = documents!([
{ "objectId": 30, "title": "Hamlet", "_geo": { "lat": 12, "lng": 89 } }
]);
IndexDocuments::new(&mut wtxn, &index).execute(content, |_| ()).unwrap();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
}
@ -886,8 +920,12 @@ mod tests {
cursor.set_position(0);
let content = DocumentBatchReader::from_reader(cursor).unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
}
@ -916,8 +954,12 @@ mod tests {
cursor.set_position(0);
let content = DocumentBatchReader::from_reader(cursor).unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
@ -969,8 +1011,12 @@ mod tests {
},
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
}
@ -990,8 +1036,12 @@ mod tests {
]);
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there are 4 documents now.
@ -1002,8 +1052,12 @@ mod tests {
let content = documents!([]);
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there are 4 documents now.
@ -1019,8 +1073,12 @@ mod tests {
]);
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that there are 4 documents now.
@ -1042,8 +1100,12 @@ mod tests {
]);
let mut wtxn = index.write_txn().unwrap();
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();


@ -5,7 +5,6 @@ use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::time::Instant;
use grenad::CompressionType;
use itertools::Itertools;
use log::info;
use roaring::RoaringBitmap;
@ -14,7 +13,7 @@ use serde_json::{Map, Value};
use super::helpers::{
create_sorter, create_writer, keep_latest_obkv, merge_obkvs, merge_two_obkvs, MergeFn,
};
use super::IndexDocumentsMethod;
use super::{IndexDocumentsMethod, IndexerConfig};
use crate::documents::{DocumentBatchReader, DocumentsBatchIndex};
use crate::error::{Error, InternalError, UserError};
use crate::index::db_name;
@ -40,16 +39,14 @@ pub struct TransformOutput {
/// Outputs the new `FieldsIdsMap`, the new `UsersIdsDocumentsIds` map, the new documents ids,
/// the replaced documents ids, the number of documents in this update and the file
/// containing all those documents.
pub struct Transform<'t, 'i> {
pub rtxn: &'t heed::RoTxn<'i>,
pub struct Transform<'a, 'i> {
pub index: &'i Index,
pub log_every_n: Option<usize>,
pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>,
pub max_nb_chunks: Option<usize>,
pub max_memory: Option<usize>,
pub index_documents_method: IndexDocumentsMethod,
indexer_settings: &'a IndexerConfig,
pub autogenerate_docids: bool,
pub index_documents_method: IndexDocumentsMethod,
sorter: grenad::Sorter<MergeFn>,
documents_count: usize,
}
/// Create a mapping between the field ids found in the document batch and the one that were
@ -84,56 +81,73 @@ fn find_primary_key(index: &DocumentsBatchIndex) -> Option<&str> {
.map(String::as_str)
}
impl Transform<'_, '_> {
pub fn read_documents<R, F>(
self,
mut reader: DocumentBatchReader<R>,
progress_callback: F,
) -> Result<TransformOutput>
where
R: Read + Seek,
F: Fn(UpdateIndexingStep) + Sync,
{
let fields_index = reader.index();
let mut fields_ids_map = self.index.fields_ids_map(self.rtxn)?;
let mapping = create_fields_mapping(&mut fields_ids_map, fields_index)?;
let alternative_name = self
.index
.primary_key(self.rtxn)?
.or_else(|| find_primary_key(fields_index))
.map(String::from);
let (primary_key_id, primary_key_name) = compute_primary_key_pair(
self.index.primary_key(self.rtxn)?,
&mut fields_ids_map,
alternative_name,
self.autogenerate_docids,
)?;
impl<'a, 'i> Transform<'a, 'i> {
pub fn new(
index: &'i Index,
indexer_settings: &'a IndexerConfig,
index_documents_method: IndexDocumentsMethod,
autogenerate_docids: bool,
) -> Self {
// We must choose the appropriate merge function for when two or more documents
// with the same user id must be merged or fully replaced in the same batch.
let merge_function = match self.index_documents_method {
let merge_function = match index_documents_method {
IndexDocumentsMethod::ReplaceDocuments => keep_latest_obkv,
IndexDocumentsMethod::UpdateDocuments => merge_obkvs,
};
// We initialize the sorter with the user indexing settings.
let mut sorter = create_sorter(
let sorter = create_sorter(
merge_function,
self.chunk_compression_type,
self.chunk_compression_level,
self.max_nb_chunks,
self.max_memory,
indexer_settings.chunk_compression_type,
indexer_settings.chunk_compression_level,
indexer_settings.max_nb_chunks,
indexer_settings.max_memory,
);
Transform {
index,
indexer_settings,
autogenerate_docids,
sorter,
documents_count: 0,
index_documents_method,
}
}
pub fn read_documents<R, F>(
&mut self,
mut reader: DocumentBatchReader<R>,
wtxn: &mut heed::RwTxn,
progress_callback: F,
) -> Result<usize>
where
R: Read + Seek,
F: Fn(UpdateIndexingStep) + Sync,
{
let fields_index = reader.index();
let mut fields_ids_map = self.index.fields_ids_map(wtxn)?;
let mapping = create_fields_mapping(&mut fields_ids_map, fields_index)?;
let alternative_name = self
.index
.primary_key(wtxn)?
.or_else(|| find_primary_key(fields_index))
.map(String::from);
let (primary_key_id, primary_key_name) = compute_primary_key_pair(
self.index.primary_key(wtxn)?,
&mut fields_ids_map,
alternative_name,
self.autogenerate_docids,
)?;
let mut obkv_buffer = Vec::new();
let mut documents_count = 0;
let mut external_id_buffer = Vec::new();
let mut field_buffer: Vec<(u16, &[u8])> = Vec::new();
while let Some((addition_index, document)) = reader.next_document_with_index()? {
let mut field_buffer_cache = drop_and_reuse(field_buffer);
if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
documents_seen: documents_count,
});
@ -214,7 +228,7 @@ impl Transform<'_, '_> {
}
// We use the extracted/generated user id as the key for this document.
sorter.insert(&external_id.as_ref().as_bytes(), &obkv_buffer)?;
self.sorter.insert(&external_id.as_ref().as_bytes(), &obkv_buffer)?;
documents_count += 1;
progress_callback(UpdateIndexingStep::RemapDocumentAddition {
@ -230,38 +244,40 @@ impl Transform<'_, '_> {
documents_seen: documents_count,
});
self.index.put_fields_ids_map(wtxn, &fields_ids_map)?;
self.index.put_primary_key(wtxn, &primary_key_name)?;
self.documents_count += documents_count;
// Now that we have a valid sorter that contains the user id and the obkv we
// give it to the last transforming function which returns the TransformOutput.
self.output_from_sorter(
sorter,
primary_key_name,
fields_ids_map,
documents_count,
progress_callback,
)
Ok(documents_count)
}
/// Generate the `TransformOutput` based on the given sorter that can be generated from any
/// format like CSV, JSON or JSON stream. This sorter must contain a key that is the document
/// id for the user side and the value must be an obkv where keys are valid fields ids.
fn output_from_sorter<F>(
pub(crate) fn output_from_sorter<F>(
self,
sorter: grenad::Sorter<MergeFn>,
primary_key: String,
fields_ids_map: FieldsIdsMap,
approximate_number_of_documents: usize,
wtxn: &mut heed::RwTxn,
progress_callback: F,
) -> Result<TransformOutput>
where
F: Fn(UpdateIndexingStep) + Sync,
{
let mut external_documents_ids = self.index.external_documents_ids(self.rtxn).unwrap();
let documents_ids = self.index.documents_ids(self.rtxn)?;
let mut field_distribution = self.index.field_distribution(self.rtxn)?;
let primary_key = self
.index
.primary_key(&wtxn)?
.ok_or(Error::UserError(UserError::MissingPrimaryKey))?
.to_string();
let fields_ids_map = self.index.fields_ids_map(wtxn)?;
let approximate_number_of_documents = self.documents_count;
let mut external_documents_ids = self.index.external_documents_ids(wtxn).unwrap();
let documents_ids = self.index.documents_ids(wtxn)?;
let mut field_distribution = self.index.field_distribution(wtxn)?;
let mut available_documents_ids = AvailableDocumentsIds::from_documents_ids(&documents_ids);
// Consume the sorter to free its internal allocation before creating a new one.
let mut iter = sorter.into_merger_iter()?;
let mut iter = self.sorter.into_merger_iter()?;
// Once we have sorted and deduplicated the documents, we write them into a final file.
let mut final_sorter = create_sorter(
@ -272,10 +288,10 @@ impl Transform<'_, '_> {
Err(InternalError::IndexingMergingKeys { process: "documents" }.into())
}
},
self.chunk_compression_type,
self.chunk_compression_level,
self.max_nb_chunks,
self.max_memory,
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
self.indexer_settings.max_nb_chunks,
self.indexer_settings.max_memory,
);
let mut new_external_documents_ids_builder = fst::MapBuilder::memory();
let mut replaced_documents_ids = RoaringBitmap::new();
@ -285,7 +301,7 @@ impl Transform<'_, '_> {
// While we write into the final file, we get or generate the internal document ids.
let mut documents_count = 0;
while let Some((external_id, update_obkv)) = iter.next()? {
if self.log_every_n.map_or(false, |len| documents_count % len == 0) {
if self.indexer_settings.log_every_n.map_or(false, |len| documents_count % len == 0) {
progress_callback(UpdateIndexingStep::ComputeIdsAndMergeDocuments {
documents_seen: documents_count,
total_documents: approximate_number_of_documents,
@ -299,7 +315,7 @@ impl Transform<'_, '_> {
replaced_documents_ids.insert(docid);
let key = BEU32::new(docid);
let base_obkv = self.index.documents.get(&self.rtxn, &key)?.ok_or(
let base_obkv = self.index.documents.get(wtxn, &key)?.ok_or(
InternalError::DatabaseMissingEntry {
db_name: db_name::DOCUMENTS,
key: None,
@ -359,8 +375,11 @@ impl Transform<'_, '_> {
// We create a final writer to write the new documents in order from the sorter.
let file = tempfile::tempfile()?;
let mut writer =
create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?;
let mut writer = create_writer(
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
file,
)?;
// Once we have written all the documents into the final sorter, we write the documents
// into this writer, extract the file and reset the seek to be able to read it again.
@ -392,22 +411,28 @@ impl Transform<'_, '_> {
// TODO this can be done in parallel by using the rayon `ThreadPool`.
pub fn remap_index_documents(
self,
primary_key: String,
wtxn: &mut heed::RwTxn,
old_fields_ids_map: FieldsIdsMap,
new_fields_ids_map: FieldsIdsMap,
) -> Result<TransformOutput> {
let field_distribution = self.index.field_distribution(self.rtxn)?;
let external_documents_ids = self.index.external_documents_ids(self.rtxn)?;
let documents_ids = self.index.documents_ids(self.rtxn)?;
// There has already been a document addition, so the primary key should be set by now.
let primary_key =
self.index.primary_key(wtxn)?.ok_or(UserError::MissingPrimaryKey)?.to_string();
let field_distribution = self.index.field_distribution(wtxn)?;
let external_documents_ids = self.index.external_documents_ids(wtxn)?;
let documents_ids = self.index.documents_ids(wtxn)?;
let documents_count = documents_ids.len() as usize;
// We create a final writer to write the new documents in order from the sorter.
let file = tempfile::tempfile()?;
let mut writer =
create_writer(self.chunk_compression_type, self.chunk_compression_level, file)?;
let mut writer = create_writer(
self.indexer_settings.chunk_compression_type,
self.indexer_settings.chunk_compression_level,
file,
)?;
let mut obkv_buffer = Vec::new();
for result in self.index.documents.iter(self.rtxn)? {
for result in self.index.documents.iter(wtxn)? {
let (docid, obkv) = result?;
let docid = docid.get();


@ -0,0 +1,29 @@
use grenad::CompressionType;
use rayon::ThreadPool;
#[derive(Debug)]
pub struct IndexerConfig {
pub log_every_n: Option<usize>,
pub max_nb_chunks: Option<usize>,
pub documents_chunk_size: Option<usize>,
pub max_memory: Option<usize>,
pub chunk_compression_type: CompressionType,
pub chunk_compression_level: Option<u32>,
pub thread_pool: Option<ThreadPool>,
pub max_positions_per_attributes: Option<u32>,
}
impl Default for IndexerConfig {
fn default() -> Self {
Self {
log_every_n: None,
max_nb_chunks: None,
documents_chunk_size: None,
max_memory: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
thread_pool: None,
max_positions_per_attributes: None,
}
}
}
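
Because `IndexerConfig` is a plain struct with a `Default` impl, callers can override just the knobs they need and lend one instance to every builder. A short sketch, with illustrative values, showing the same borrowed config driving both a settings update and a document indexing builder:

let config = IndexerConfig {
    max_memory: Some(10 * 1024 * 1024), // 10 MiB indexing budget
    log_every_n: Some(100_000),
    ..Default::default()
};
let mut wtxn = index.write_txn().unwrap();
let mut settings = Settings::new(&mut wtxn, &index, &config);
settings.set_primary_key("id".to_owned());
settings.execute(|_| ()).unwrap();
// The config is only borrowed, so it can be reused for the next builder.
let mut builder =
    IndexDocuments::new(&mut wtxn, &index, &config, IndexDocumentsConfig::default(), |_| ());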


@ -2,9 +2,11 @@ pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments;
pub use self::delete_documents::{DeleteDocuments, DocumentDeletionResult};
pub use self::facets::Facets;
pub use self::index_documents::{DocumentAdditionResult, IndexDocuments, IndexDocumentsMethod};
pub use self::index_documents::{
DocumentAdditionResult, IndexDocuments, IndexDocumentsConfig, IndexDocumentsMethod,
};
pub use self::indexer_config::IndexerConfig;
pub use self::settings::{Setting, Settings};
pub use self::update_builder::UpdateBuilder;
pub use self::update_step::UpdateIndexingStep;
pub use self::word_prefix_docids::WordPrefixDocids;
pub use self::word_prefix_pair_proximity_docids::WordPrefixPairProximityDocids;
@ -16,8 +18,8 @@ mod clear_documents;
mod delete_documents;
mod facets;
mod index_documents;
mod indexer_config;
mod settings;
mod update_builder;
mod update_step;
mod word_prefix_docids;
mod word_prefix_pair_proximity_docids;


@ -2,15 +2,15 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::result::Result as StdResult;
use chrono::Utc;
use grenad::CompressionType;
use itertools::Itertools;
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
use rayon::ThreadPool;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use super::index_documents::{IndexDocumentsConfig, Transform};
use super::IndexerConfig;
use crate::criterion::Criterion;
use crate::error::UserError;
use crate::update::index_documents::{IndexDocumentsMethod, Transform};
use crate::update::index_documents::IndexDocumentsMethod;
use crate::update::{ClearDocuments, IndexDocuments, UpdateIndexingStep};
use crate::{FieldsIdsMap, Index, Result};
@ -77,14 +77,8 @@ impl<'de, T: Deserialize<'de>> Deserialize<'de> for Setting<T> {
pub struct Settings<'a, 't, 'u, 'i> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
pub(crate) log_every_n: Option<usize>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) max_memory: Option<usize>,
pub(crate) documents_chunk_size: Option<usize>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
pub(crate) max_positions_per_attributes: Option<u32>,
indexer_config: &'a IndexerConfig,
searchable_fields: Setting<Vec<String>>,
displayed_fields: Setting<Vec<String>>,
@ -98,17 +92,14 @@ pub struct Settings<'a, 't, 'u, 'i> {
}
impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
pub fn new(wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index) -> Settings<'a, 't, 'u, 'i> {
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
indexer_config: &'a IndexerConfig,
) -> Settings<'a, 't, 'u, 'i> {
Settings {
wtxn,
index,
log_every_n: None,
max_nb_chunks: None,
max_memory: None,
documents_chunk_size: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
thread_pool: None,
searchable_fields: Setting::NotSet,
displayed_fields: Setting::NotSet,
filterable_fields: Setting::NotSet,
@ -118,14 +109,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
distinct_field: Setting::NotSet,
synonyms: Setting::NotSet,
primary_key: Setting::NotSet,
max_positions_per_attributes: None,
indexer_config,
}
}
pub fn log_every_n(&mut self, n: usize) {
self.log_every_n = Some(n);
}
pub fn reset_searchable_fields(&mut self) {
self.searchable_fields = Setting::Reset;
}
@ -210,25 +197,16 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
return Ok(());
}
let transform = Transform {
rtxn: &self.wtxn,
index: self.index,
log_every_n: self.log_every_n,
chunk_compression_type: self.chunk_compression_type,
chunk_compression_level: self.chunk_compression_level,
max_nb_chunks: self.max_nb_chunks,
max_memory: self.max_memory,
index_documents_method: IndexDocumentsMethod::ReplaceDocuments,
autogenerate_docids: false,
};
// There has already been a document addition, so the primary key should be set by now.
let primary_key =
self.index.primary_key(&self.wtxn)?.ok_or(UserError::MissingPrimaryKey)?;
let transform = Transform::new(
&self.index,
&self.indexer_config,
IndexDocumentsMethod::ReplaceDocuments,
false,
);
// We remap the documents fields based on the new `FieldsIdsMap`.
let output = transform.remap_index_documents(
primary_key.to_string(),
self.wtxn,
old_fields_ids_map,
fields_ids_map.clone(),
)?;
@ -238,16 +216,14 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
// We index the generated `TransformOutput` which must contain
// all the documents with fields in the newly defined searchable order.
let mut indexing_builder = IndexDocuments::new(self.wtxn, self.index);
indexing_builder.log_every_n = self.log_every_n;
indexing_builder.max_nb_chunks = self.max_nb_chunks;
indexing_builder.max_memory = self.max_memory;
indexing_builder.documents_chunk_size = self.documents_chunk_size;
indexing_builder.chunk_compression_type = self.chunk_compression_type;
indexing_builder.chunk_compression_level = self.chunk_compression_level;
indexing_builder.thread_pool = self.thread_pool;
indexing_builder.max_positions_per_attributes = self.max_positions_per_attributes;
indexing_builder.execute_raw(output, &cb)?;
let indexing_builder = IndexDocuments::new(
self.wtxn,
self.index,
&self.indexer_config,
IndexDocumentsConfig::default(),
&cb,
);
indexing_builder.execute_raw(output)?;
Ok(())
}
@ -535,13 +511,17 @@ mod tests {
{ "id": 2, "name": "kevina", "age": 21},
{ "id": 3, "name": "benoit", "age": 34 }
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config = IndexDocumentsConfig::default();
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -563,7 +543,7 @@ mod tests {
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -592,15 +572,19 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// In the same transaction we change the displayed fields to be only the "age".
// We also change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_displayed_fields(vec!["age".into()]);
builder.set_searchable_fields(vec!["name".into()]);
builder.execute(|_| ()).unwrap();
@ -614,7 +598,7 @@ mod tests {
// We change the searchable fields to be the "name" field only.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_searchable_fields();
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -639,9 +623,13 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set to `None` (default value).
@ -664,12 +652,16 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// In the same transaction we change the displayed fields to be only the age.
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_displayed_fields(vec!["age".into()]);
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -682,7 +674,7 @@ mod tests {
// We reset the fields ids to become `None`, the default value.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_displayed_fields();
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -700,9 +692,11 @@ mod tests {
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the filterable fields to be the age.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_filterable_fields(hashset! { S("age") });
builder.execute(|_| ()).unwrap();
@ -712,9 +706,12 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Check that the displayed fields are correctly set.
@ -749,9 +746,12 @@ mod tests {
{ "name": "benoit", "age": 35 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();
@ -771,10 +771,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the displayed fields and the criteria.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
// Don't display the generated `id` field.
builder.set_displayed_fields(vec![S("name")]);
builder.set_criteria(vec![S("age:asc")]);
@ -786,9 +787,12 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Run an empty query just to ensure that the search results are ordered.
@ -813,10 +817,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the displayed fields and the distinct field.
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
// Don't display the generated `id` field.
builder.set_displayed_fields(vec![S("name"), S("age")]);
builder.set_distinct_field(S("age"));
@ -832,9 +837,12 @@ mod tests {
{ "name": "bernie", "age": 34 },
{ "name": "ben", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Run an empty query just to ensure that the search results are ordered.
@ -859,9 +867,13 @@ mod tests {
{ "name": "kevina", "age": 21 },
{ "name": "benoit", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// Ensure there are no stop_words by default
@ -884,12 +896,16 @@ mod tests {
{ "name": "kevina", "age": 21, "maxim": "Doggos are the best" },
{ "name": "benoit", "age": 34, "maxim": "The crepes are really good" },
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// In the same transaction we provide some stop_words
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
let set = btreeset! { "i".to_string(), "the".to_string(), "are".to_string() };
builder.set_stop_words(set.clone());
builder.execute(|_| ()).unwrap();
@ -920,7 +936,7 @@ mod tests {
// Now we'll reset the stop_words and ensure it's None
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_stop_words();
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -958,12 +974,16 @@ mod tests {
{ "name": "kevina", "age": 21, "maxim": "Doggos are the best"},
{ "name": "benoit", "age": 34, "maxim": "The crepes are really good"},
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.enable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let config = IndexerConfig::default();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
// In the same transaction we provide some synonyms
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_synonyms(hashmap! {
"blini".to_string() => vec!["crepes".to_string()],
"super like".to_string() => vec!["love".to_string()],
@ -987,7 +1007,7 @@ mod tests {
// Reset the synonyms
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_synonyms();
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -1012,10 +1032,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set all the settings except searchable
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_displayed_fields(vec!["hello".to_string()]);
builder.set_filterable_fields(hashset! { S("age"), S("toto") });
builder.set_criteria(vec!["toto:asc".to_string()]);
@ -1032,7 +1053,7 @@ mod tests {
// We set toto and age as searchable to force reordering of the fields
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_searchable_fields(vec!["toto".to_string(), "age".to_string()]);
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -1049,10 +1070,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set all the settings except searchable
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_displayed_fields(vec!["hello".to_string()]);
// It is only Asc(toto); there is a facet database, but filtering on toto is not allowed.
builder.set_criteria(vec!["toto:asc".to_string()]);
@ -1070,10 +1092,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the primary key settings
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key(S("mykey"));
builder.execute(|_| ()).unwrap();
@ -1089,14 +1112,17 @@ mod tests {
{ "mykey": 6, "name": "bernie", "age": 34 },
{ "mykey": 7, "name": "ben", "age": 34 }
]);
let mut builder = IndexDocuments::new(&mut wtxn, &index);
builder.disable_autogenerate_docids();
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// We now try to reset the primary key
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.reset_primary_key();
let err = builder.execute(|_| ()).unwrap_err();
@ -1109,7 +1135,7 @@ mod tests {
builder.execute().unwrap();
// ...we can change the primary key
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_primary_key(S("myid"));
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -1121,10 +1147,11 @@ mod tests {
let mut options = EnvOpenOptions::new();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path).unwrap();
let config = IndexerConfig::default();
// Set the genres setting
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_filterable_fields(hashset! { S("genres") });
builder.execute(|_| ()).unwrap();
@ -1147,8 +1174,12 @@ mod tests {
"release_date": 819676800
}
]);
let builder = IndexDocuments::new(&mut wtxn, &index);
builder.execute(content, |_| ()).unwrap();
let indexing_config =
IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder =
IndexDocuments::new(&mut wtxn, &index, &config, indexing_config.clone(), |_| ());
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
// We now try to reset the primary key


@ -1,130 +0,0 @@
use grenad::CompressionType;
use rayon::ThreadPool;
use super::{ClearDocuments, DeleteDocuments, Facets, IndexDocuments, Settings};
use crate::{Index, Result};
pub struct UpdateBuilder<'a> {
pub(crate) log_every_n: Option<usize>,
pub(crate) max_nb_chunks: Option<usize>,
pub(crate) documents_chunk_size: Option<usize>,
pub(crate) max_memory: Option<usize>,
pub(crate) chunk_compression_type: CompressionType,
pub(crate) chunk_compression_level: Option<u32>,
pub(crate) thread_pool: Option<&'a ThreadPool>,
pub(crate) max_positions_per_attributes: Option<u32>,
}
impl<'a> UpdateBuilder<'a> {
pub fn new() -> UpdateBuilder<'a> {
UpdateBuilder {
log_every_n: None,
max_nb_chunks: None,
documents_chunk_size: None,
max_memory: None,
chunk_compression_type: CompressionType::None,
chunk_compression_level: None,
thread_pool: None,
max_positions_per_attributes: None,
}
}
pub fn log_every_n(&mut self, log_every_n: usize) {
self.log_every_n = Some(log_every_n);
}
pub fn max_nb_chunks(&mut self, max_nb_chunks: usize) {
self.max_nb_chunks = Some(max_nb_chunks);
}
pub fn max_memory(&mut self, max_memory: usize) {
self.max_memory = Some(max_memory);
}
pub fn documents_chunk_size(&mut self, documents_chunk_size: usize) {
self.documents_chunk_size = Some(documents_chunk_size);
}
pub fn chunk_compression_type(&mut self, chunk_compression_type: CompressionType) {
self.chunk_compression_type = chunk_compression_type;
}
pub fn chunk_compression_level(&mut self, chunk_compression_level: u32) {
self.chunk_compression_level = Some(chunk_compression_level);
}
pub fn thread_pool(&mut self, thread_pool: &'a ThreadPool) {
self.thread_pool = Some(thread_pool);
}
pub fn max_positions_per_attributes(&mut self, max_positions_per_attributes: u32) {
self.max_positions_per_attributes = Some(max_positions_per_attributes);
}
pub fn clear_documents<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> ClearDocuments<'t, 'u, 'i> {
ClearDocuments::new(wtxn, index)
}
pub fn delete_documents<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> Result<DeleteDocuments<'t, 'u, 'i>> {
DeleteDocuments::new(wtxn, index)
}
pub fn index_documents<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> IndexDocuments<'t, 'u, 'i, 'a> {
let mut builder = IndexDocuments::new(wtxn, index);
builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.documents_chunk_size = self.documents_chunk_size;
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.thread_pool = self.thread_pool;
builder.max_positions_per_attributes = self.max_positions_per_attributes;
builder
}
pub fn settings<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> Settings<'a, 't, 'u, 'i> {
let mut builder = Settings::new(wtxn, index);
builder.log_every_n = self.log_every_n;
builder.max_nb_chunks = self.max_nb_chunks;
builder.max_memory = self.max_memory;
builder.documents_chunk_size = self.documents_chunk_size;
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder.thread_pool = self.thread_pool;
builder.max_positions_per_attributes = self.max_positions_per_attributes;
builder
}
pub fn facets<'t, 'u, 'i>(
self,
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
) -> Facets<'t, 'u, 'i> {
let mut builder = Facets::new(wtxn, index);
builder.chunk_compression_type = self.chunk_compression_type;
builder.chunk_compression_level = self.chunk_compression_level;
builder
}
}
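
For callers migrating off `UpdateBuilder`, the mapping is mechanical: each setter becomes a field on `IndexerConfig`, per-update options such as `autogenerate_docids` and `update_method` move to `IndexDocumentsConfig`, and the builders are constructed directly. A before/after sketch based on the test changes below:

// Before:
// let mut builder = UpdateBuilder::new();
// builder.max_memory(10 * 1024 * 1024);
// let mut builder = builder.index_documents(&mut wtxn, &index);
// builder.enable_autogenerate_docids();

// After:
let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() };
let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());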


@ -16,7 +16,8 @@ macro_rules! test_distinct {
// update distinct attribute
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let config = milli::update::IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_distinct_field(S(stringify!($distinct)));
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();


@ -7,7 +7,7 @@ use either::{Either, Left, Right};
use heed::EnvOpenOptions;
use maplit::{hashmap, hashset};
use milli::documents::{DocumentBatchBuilder, DocumentBatchReader};
use milli::update::{Settings, UpdateBuilder};
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings};
use milli::{AscDesc, Criterion, DocumentId, Index, Member};
use serde::Deserialize;
use slice_group_by::GroupBy;
@ -31,8 +31,9 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
let index = Index::new(options, &path).unwrap();
let mut wtxn = index.write_txn().unwrap();
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
let criteria = criteria.iter().map(|c| c.to_string()).collect();
builder.set_criteria(criteria);
@ -54,10 +55,10 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
builder.execute(|_| ()).unwrap();
// index documents
let mut builder = UpdateBuilder::new();
builder.max_memory(10 * 1024 * 1024); // 10MiB
let mut builder = builder.index_documents(&mut wtxn, &index);
builder.enable_autogenerate_docids();
let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() };
let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let mut cursor = Cursor::new(Vec::new());
let mut documents_builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
let reader = Cursor::new(CONTENT.as_bytes());
@ -73,7 +74,8 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
// index documents
let content = DocumentBatchReader::from_reader(cursor).unwrap();
builder.execute(content, |_| ()).unwrap();
builder.add_documents(content).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();


@ -6,7 +6,7 @@ use heed::EnvOpenOptions;
use itertools::Itertools;
use maplit::hashset;
use milli::documents::{DocumentBatchBuilder, DocumentBatchReader};
use milli::update::{Settings, UpdateBuilder};
use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings};
use milli::{AscDesc, Criterion, Index, Member, Search, SearchResult};
use rand::Rng;
use Criterion::*;
@ -337,11 +337,12 @@ fn criteria_mixup() {
]
};
let config = IndexerConfig::default();
for criteria in criteria_mix {
eprintln!("Testing with criteria order: {:?}", &criteria);
//update criteria
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(criteria.iter().map(ToString::to_string).collect());
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();
@ -375,8 +376,9 @@ fn criteria_ascdesc() {
let index = Index::new(options, &path).unwrap();
let mut wtxn = index.write_txn().unwrap();
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_sortable_fields(hashset! {
S("name"),
@ -385,10 +387,9 @@ fn criteria_ascdesc() {
builder.execute(|_| ()).unwrap();
// index documents
let mut builder = UpdateBuilder::new();
builder.max_memory(10 * 1024 * 1024); // 10MiB
let mut builder = builder.index_documents(&mut wtxn, &index);
builder.enable_autogenerate_docids();
let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() };
let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() };
let mut builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ());
let mut cursor = Cursor::new(Vec::new());
let mut batch_builder = DocumentBatchBuilder::new(&mut cursor).unwrap();
@ -419,7 +420,8 @@ fn criteria_ascdesc() {
let reader = DocumentBatchReader::from_reader(cursor).unwrap();
builder.execute(reader, |_| ()).unwrap();
builder.add_documents(reader).unwrap();
builder.execute().unwrap();
wtxn.commit().unwrap();
@ -430,7 +432,7 @@ fn criteria_ascdesc() {
eprintln!("Testing with criterion: {:?}", &criterion);
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index);
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(vec![criterion.to_string()]);
builder.execute(|_| ()).unwrap();
wtxn.commit().unwrap();