document batch support

reusable transform

rework update api

add indexer config

fix tests

review changes

Co-authored-by: Clément Renault <clement@meilisearch.com>

fmt
Authored by Marin Postma on 2021-12-08 14:12:07 +01:00; committed by mpostma.
parent 74962b2fd9
commit 0c84a40298
18 changed files with 912 additions and 803 deletions
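In short: the per-update UpdateBuilder is replaced by a reusable IndexerConfig (built once at startup and shared by every update) plus a per-update IndexDocumentsConfig, and document batches are now queued with add_documents before a single execute call. A minimal sketch of the reworked flow, distilled from the diff below — the `index` handle, the `payload` argument, and the simplified error handling are illustrative, not part of the commit:

    use std::io::Cursor;

    use milli::documents::DocumentBatchReader;
    use milli::update::{IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig};
    use milli::Index;

    // `payload` is an already-serialized document batch, as produced by the
    // documents_from_* helpers in this file.
    fn add_documents(index: &Index, payload: Vec<u8>) -> anyhow::Result<()> {
        // Built once (thread pool, memory budget, compression, ...) and reused
        // across every update; the defaults stand in for the real values here.
        let config = IndexerConfig::default();

        // Per-update options now live in their own config struct.
        let indexing_config = IndexDocumentsConfig {
            update_method: IndexDocumentsMethod::ReplaceDocuments,
            autogenerate_docids: true,
            ..Default::default()
        };

        let mut wtxn = index.write_txn()?;
        let mut builder = milli::update::IndexDocuments::new(
            &mut wtxn,
            index,
            &config,
            indexing_config,
            |_step| (), // progress callback, elided here
        );

        // Several batches can be queued before a single execute call.
        let documents = DocumentBatchReader::from_reader(Cursor::new(payload))?;
        builder.add_documents(documents)?;
        builder.execute()?;

        wtxn.commit()?;
        Ok(())
    }

Settings, clear-documents, and facets updates follow the same pattern through milli::update::Settings (which also takes the shared config), ClearDocuments, and milli::update::Facets.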


@@ -21,13 +21,14 @@ use heed::EnvOpenOptions;
 use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
 use milli::documents::DocumentBatchReader;
 use milli::update::UpdateIndexingStep::*;
-use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder};
+use milli::update::{
+    ClearDocuments, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Setting,
+};
 use milli::{
     obkv_to_json, CompressionType, Filter as MilliFilter, FilterCondition, Index, MatchingWords,
     SearchResult, SortError,
 };
 use once_cell::sync::OnceCell;
-use rayon::ThreadPool;
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
 use structopt::StructOpt;
@@ -44,7 +45,7 @@ use self::update_store::UpdateStore;
 #[global_allocator]
 static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
-static GLOBAL_THREAD_POOL: OnceCell<ThreadPool> = OnceCell::new();
+static GLOBAL_CONFIG: OnceCell<IndexerConfig> = OnceCell::new();
 
 #[derive(Debug, StructOpt)]
 /// The HTTP main server of the milli project.
@@ -327,7 +328,19 @@ async fn main() -> anyhow::Result<()> {
     // Setup the global thread pool
     let jobs = opt.indexer.indexing_jobs.unwrap_or(0);
     let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
-    GLOBAL_THREAD_POOL.set(pool).unwrap();
+
+    let config = IndexerConfig {
+        max_nb_chunks: opt.indexer.max_nb_chunks,
+        chunk_compression_level: opt.indexer.chunk_compression_level,
+        max_positions_per_attributes: opt.indexer.max_positions_per_attributes,
+        thread_pool: Some(pool),
+        log_every_n: Some(opt.indexer.log_every_n),
+        max_memory: Some(opt.indexer.max_memory.get_bytes() as usize),
+        chunk_compression_type: opt.indexer.chunk_compression_type.unwrap_or(CompressionType::None),
+        ..Default::default()
+    };
+
+    GLOBAL_CONFIG.set(config).unwrap();
 
     // Open the LMDB database.
     let index = Index::new(options, &opt.database)?;
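The config is stashed in a OnceCell so the update-handler closure in the next hunk can fetch it with GLOBAL_CONFIG.get() instead of cloning the indexer options into the closure. A small access sketch of that pattern (the helper name is illustrative):

    use milli::update::IndexerConfig;
    use once_cell::sync::OnceCell;

    static GLOBAL_CONFIG: OnceCell<IndexerConfig> = OnceCell::new();

    fn indexer_config() -> &'static IndexerConfig {
        // set() runs once in main(); get() is then cheap and lock-free.
        GLOBAL_CONFIG.get().expect("indexer config is set at startup")
    }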
@@ -342,209 +355,207 @@ async fn main() -> anyhow::Result<()> {
     let (update_status_sender, _) = broadcast::channel(100);
     let update_status_sender_cloned = update_status_sender.clone();
     let index_cloned = index.clone();
-    let indexer_opt_cloned = opt.indexer.clone();
     let update_store = UpdateStore::open(
         update_store_options,
         update_store_path,
         // the type hint is necessary: https://github.com/rust-lang/rust/issues/32600
         move |update_id, meta, content: &_| {
-            // We prepare the update by using the update builder.
-            let mut update_builder = UpdateBuilder::new();
-            if let Some(max_nb_chunks) = indexer_opt_cloned.max_nb_chunks {
-                update_builder.max_nb_chunks(max_nb_chunks);
-            }
-            if let Some(chunk_compression_level) = indexer_opt_cloned.chunk_compression_level {
-                update_builder.chunk_compression_level(chunk_compression_level);
-            }
-            if let Some(max_pos_per_attributes) = indexer_opt_cloned.max_positions_per_attributes {
-                update_builder.max_positions_per_attributes(max_pos_per_attributes);
-            }
-            update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap());
-            update_builder.log_every_n(indexer_opt_cloned.log_every_n);
-            update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize);
-            update_builder.chunk_compression_type(
-                indexer_opt_cloned.chunk_compression_type.unwrap_or(CompressionType::None),
-            );
-
             let before_update = Instant::now();
             // we extract the update type and execute the update itself.
-            let result: anyhow::Result<()> =
-                (|| match meta {
-                    UpdateMeta::DocumentsAddition { method, format, encoding } => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
-                        builder.enable_autogenerate_docids();
-
-                        match method.as_str() {
-                            "replace" => builder
-                                .index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
-                            "update" => builder
-                                .index_documents_method(IndexDocumentsMethod::UpdateDocuments),
-                            otherwise => panic!("invalid indexing method {:?}", otherwise),
-                        };
-
-                        let reader = match encoding.as_deref() {
-                            Some("gzip") => Box::new(GzDecoder::new(content)),
-                            None => Box::new(content) as Box<dyn io::Read>,
-                            otherwise => panic!("invalid encoding format {:?}", otherwise),
-                        };
-
-                        let documents = match format.as_str() {
-                            "csv" => documents_from_csv(reader)?,
-                            "json" => documents_from_json(reader)?,
-                            "jsonl" => documents_from_jsonl(reader)?,
-                            otherwise => panic!("invalid update format {:?}", otherwise),
-                        };
-
-                        let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;
-
-                        let result = builder.execute(documents, |indexing_step| {
-                            let (current, total) = match indexing_step {
-                                RemapDocumentAddition { documents_seen } => (documents_seen, None),
-                                ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
-                                    (documents_seen, Some(total_documents))
-                                }
-                                IndexDocuments { documents_seen, total_documents } => {
-                                    (documents_seen, Some(total_documents))
-                                }
-                                MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
-                                    (databases_seen, Some(total_databases))
-                                }
-                            };
-                            let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
-                                update_id,
-                                meta: UpdateMetaProgress::DocumentsAddition {
-                                    step: indexing_step.step(),
-                                    total_steps: indexing_step.number_of_steps(),
-                                    current,
-                                    total,
-                                },
-                            });
-                        });
-
-                        match result {
-                            Ok(_) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e.into()),
-                        }
-                    }
-                    UpdateMeta::ClearDocuments => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let builder = update_builder.clear_documents(&mut wtxn, &index_cloned);
-
-                        match builder.execute() {
-                            Ok(_count) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e.into()),
-                        }
-                    }
-                    UpdateMeta::Settings(settings) => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let mut builder = update_builder.settings(&mut wtxn, &index_cloned);
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.searchable_attributes {
-                            Setting::Set(searchable_attributes) => {
-                                builder.set_searchable_fields(searchable_attributes)
-                            }
-                            Setting::Reset => builder.reset_searchable_fields(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.displayed_attributes {
-                            Setting::Set(displayed_attributes) => {
-                                builder.set_displayed_fields(displayed_attributes)
-                            }
-                            Setting::Reset => builder.reset_displayed_fields(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.filterable_attributes {
-                            Setting::Set(filterable_attributes) => {
-                                builder.set_filterable_fields(filterable_attributes)
-                            }
-                            Setting::Reset => builder.reset_filterable_fields(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.sortable_attributes {
-                            Setting::Set(sortable_attributes) => {
-                                builder.set_sortable_fields(sortable_attributes)
-                            }
-                            Setting::Reset => builder.reset_sortable_fields(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.criteria {
-                            Setting::Set(criteria) => builder.set_criteria(criteria),
-                            Setting::Reset => builder.reset_criteria(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.stop_words {
-                            Setting::Set(stop_words) => builder.set_stop_words(stop_words),
-                            Setting::Reset => builder.reset_stop_words(),
-                            Setting::NotSet => (),
-                        }
-
-                        // We transpose the settings JSON struct into a real setting update.
-                        match settings.synonyms {
-                            Setting::Set(synonyms) => builder.set_synonyms(synonyms),
-                            Setting::Reset => builder.reset_synonyms(),
-                            Setting::NotSet => (),
-                        }
-
-                        let result = builder.execute(|indexing_step| {
-                            let (current, total) = match indexing_step {
-                                RemapDocumentAddition { documents_seen } => (documents_seen, None),
-                                ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
-                                    (documents_seen, Some(total_documents))
-                                }
-                                IndexDocuments { documents_seen, total_documents } => {
-                                    (documents_seen, Some(total_documents))
-                                }
-                                MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
-                                    (databases_seen, Some(total_databases))
-                                }
-                            };
-                            let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
-                                update_id,
-                                meta: UpdateMetaProgress::DocumentsAddition {
-                                    step: indexing_step.step(),
-                                    total_steps: indexing_step.number_of_steps(),
-                                    current,
-                                    total,
-                                },
-                            });
-                        });
-
-                        match result {
-                            Ok(_count) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e.into()),
-                        }
-                    }
-                    UpdateMeta::Facets(levels) => {
-                        // We must use the write transaction of the update here.
-                        let mut wtxn = index_cloned.write_txn()?;
-                        let mut builder = update_builder.facets(&mut wtxn, &index_cloned);
-                        if let Some(value) = levels.level_group_size {
-                            builder.level_group_size(value);
-                        }
-                        if let Some(value) = levels.min_level_size {
-                            builder.min_level_size(value);
-                        }
-                        match builder.execute() {
-                            Ok(()) => wtxn.commit().map_err(Into::into),
-                            Err(e) => Err(e.into()),
-                        }
-                    }
-                })();
+            let result: anyhow::Result<()> = (|| match meta {
+                UpdateMeta::DocumentsAddition { method, format, encoding } => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+
+                    let update_method = match method.as_str() {
+                        "replace" => IndexDocumentsMethod::ReplaceDocuments,
+                        "update" => IndexDocumentsMethod::UpdateDocuments,
+                        otherwise => panic!("invalid indexing method {:?}", otherwise),
+                    };
+
+                    let indexing_config = IndexDocumentsConfig {
+                        update_method,
+                        autogenerate_docids: true,
+                        ..Default::default()
+                    };
+
+                    let indexing_callback = |indexing_step| {
+                        let (current, total) = match indexing_step {
+                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
+                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
+                                (documents_seen, Some(total_documents))
+                            }
+                            IndexDocuments { documents_seen, total_documents } => {
+                                (documents_seen, Some(total_documents))
+                            }
+                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
+                                (databases_seen, Some(total_databases))
+                            }
+                        };
+                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
+                            update_id,
+                            meta: UpdateMetaProgress::DocumentsAddition {
+                                step: indexing_step.step(),
+                                total_steps: indexing_step.number_of_steps(),
+                                current,
+                                total,
+                            },
+                        });
+                    };
+
+                    let mut builder = milli::update::IndexDocuments::new(
+                        &mut wtxn,
+                        &index_cloned,
+                        GLOBAL_CONFIG.get().unwrap(),
+                        indexing_config,
+                        indexing_callback,
+                    );
+
+                    let reader = match encoding.as_deref() {
+                        Some("gzip") => Box::new(GzDecoder::new(content)),
+                        None => Box::new(content) as Box<dyn io::Read>,
+                        otherwise => panic!("invalid encoding format {:?}", otherwise),
+                    };
+
+                    let documents = match format.as_str() {
+                        "csv" => documents_from_csv(reader)?,
+                        "json" => documents_from_json(reader)?,
+                        "jsonl" => documents_from_jsonl(reader)?,
+                        otherwise => panic!("invalid update format {:?}", otherwise),
+                    };
+
+                    let documents = DocumentBatchReader::from_reader(Cursor::new(documents))?;
+                    builder.add_documents(documents)?;
+
+                    let result = builder.execute();
+
+                    match result {
+                        Ok(_) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                UpdateMeta::ClearDocuments => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+                    let builder = ClearDocuments::new(&mut wtxn, &index_cloned);
+
+                    match builder.execute() {
+                        Ok(_count) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                UpdateMeta::Settings(settings) => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+                    let mut builder = milli::update::Settings::new(
+                        &mut wtxn,
+                        &index_cloned,
+                        GLOBAL_CONFIG.get().unwrap(),
+                    );
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.searchable_attributes {
+                        Setting::Set(searchable_attributes) => {
+                            builder.set_searchable_fields(searchable_attributes)
+                        }
+                        Setting::Reset => builder.reset_searchable_fields(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.displayed_attributes {
+                        Setting::Set(displayed_attributes) => {
+                            builder.set_displayed_fields(displayed_attributes)
+                        }
+                        Setting::Reset => builder.reset_displayed_fields(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.filterable_attributes {
+                        Setting::Set(filterable_attributes) => {
+                            builder.set_filterable_fields(filterable_attributes)
+                        }
+                        Setting::Reset => builder.reset_filterable_fields(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.sortable_attributes {
+                        Setting::Set(sortable_attributes) => {
+                            builder.set_sortable_fields(sortable_attributes)
+                        }
+                        Setting::Reset => builder.reset_sortable_fields(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.criteria {
+                        Setting::Set(criteria) => builder.set_criteria(criteria),
+                        Setting::Reset => builder.reset_criteria(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.stop_words {
+                        Setting::Set(stop_words) => builder.set_stop_words(stop_words),
+                        Setting::Reset => builder.reset_stop_words(),
+                        Setting::NotSet => (),
+                    }
+
+                    // We transpose the settings JSON struct into a real setting update.
+                    match settings.synonyms {
+                        Setting::Set(synonyms) => builder.set_synonyms(synonyms),
+                        Setting::Reset => builder.reset_synonyms(),
+                        Setting::NotSet => (),
+                    }
+
+                    let result = builder.execute(|indexing_step| {
+                        let (current, total) = match indexing_step {
+                            RemapDocumentAddition { documents_seen } => (documents_seen, None),
+                            ComputeIdsAndMergeDocuments { documents_seen, total_documents } => {
+                                (documents_seen, Some(total_documents))
+                            }
+                            IndexDocuments { documents_seen, total_documents } => {
+                                (documents_seen, Some(total_documents))
+                            }
+                            MergeDataIntoFinalDatabase { databases_seen, total_databases } => {
+                                (databases_seen, Some(total_databases))
+                            }
+                        };
+                        let _ = update_status_sender_cloned.send(UpdateStatus::Progressing {
+                            update_id,
+                            meta: UpdateMetaProgress::DocumentsAddition {
+                                step: indexing_step.step(),
+                                total_steps: indexing_step.number_of_steps(),
+                                current,
+                                total,
+                            },
+                        });
+                    });
+
+                    match result {
+                        Ok(_count) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+                UpdateMeta::Facets(levels) => {
+                    // We must use the write transaction of the update here.
+                    let mut wtxn = index_cloned.write_txn()?;
+                    let mut builder = milli::update::Facets::new(&mut wtxn, &index_cloned);
+                    if let Some(value) = levels.level_group_size {
+                        builder.level_group_size(value);
+                    }
+                    if let Some(value) = levels.min_level_size {
+                        builder.min_level_size(value);
+                    }
+                    match builder.execute() {
+                        Ok(()) => wtxn.commit().map_err(Into::into),
+                        Err(e) => Err(e.into()),
+                    }
+                }
+            })();
 
             let meta = match result {
                 Ok(()) => {