From b7d55763473a6762d473790b16ea322023f5c8ed Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Sat, 1 Mar 2025 23:46:04 +0100 Subject: [PATCH] benchmarks and fuzzers --- crates/benchmarks/benches/indexing.rs | 1049 ++++++----------------- crates/benchmarks/benches/utils.rs | 81 +- crates/fuzzers/src/bin/fuzz-indexing.rs | 9 + 3 files changed, 330 insertions(+), 809 deletions(-) diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 7c1783a1a..85e277eff 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -1,17 +1,15 @@ mod datasets_paths; mod utils; +use utils::{index_delete_documents, index_documents}; use std::fs::{create_dir_all, remove_dir_all}; use std::path::Path; -use bumpalo::Bump; use criterion::{criterion_group, criterion_main, Criterion}; use milli::documents::PrimaryKey; use milli::heed::{EnvOpenOptions, RwTxn}; -use milli::progress::Progress; use milli::update::new::indexer; use milli::update::{IndexDocumentsMethod, IndexerConfig, Settings}; -use milli::vector::EmbeddingConfigs; use milli::Index; use rand::seq::SliceRandom; use rand_chacha::rand_core::SeedableRng; @@ -136,40 +134,22 @@ fn indexing_songs_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -203,40 +183,22 @@ fn reindexing_songs_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -248,40 +210,21 @@ fn reindexing_songs_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); indexer.add_documents(&documents).unwrap(); - - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -317,40 +260,21 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); indexer.add_documents(&documents).unwrap(); - - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -394,40 +318,22 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_1_2, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -439,40 +345,22 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_3_4, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -480,40 +368,23 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_4_4, "csv"); indexer.add_documents(&documents).unwrap(); + let config = IndexerConfig::default(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -547,7 +418,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); @@ -555,33 +426,15 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -615,40 +468,22 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -682,40 +517,22 @@ fn indexing_wiki(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -748,40 +565,22 @@ fn reindexing_wiki(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -793,40 +592,22 @@ fn reindexing_wiki(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -861,40 +642,22 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -937,7 +700,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); @@ -945,33 +708,15 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_1_2, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -983,7 +728,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); @@ -991,41 +736,24 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_3_4, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); + let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); @@ -1033,33 +761,15 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_4_4, "csv"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -1093,40 +803,22 @@ fn indexing_movies_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -1159,40 +851,22 @@ fn reindexing_movies_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -1204,40 +878,22 @@ fn reindexing_movies_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -1272,40 +928,22 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); indexer.add_documents(&documents).unwrap(); - let indexer_alloc = Bump::new(); - let (document_changes, _operation_stats, primary_key) = indexer - .into_changes( - &indexer_alloc, - &index, - &rtxn, - None, - &mut new_fields_ids_map, - &|| false, - Progress::default(), - ) - .unwrap(); - - indexer::index( - &mut wtxn, + index_documents( + indexer, &index, - &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), - config.grenad_parameters(), - &db_fields_ids_map, + &rtxn, new_fields_ids_map, - primary_key, - &document_changes, - EmbeddingConfigs::default(), - &|| false, - &Progress::default(), - ) - .unwrap(); + &mut wtxn, + config, + db_fields_ids_map, + ); wtxn.commit().unwrap(); drop(rtxn); @@ -1337,23 +975,15 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec Index { let mut wtxn = index.write_txn().unwrap(); let rtxn = index.read_txn().unwrap(); let db_fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); - let mut new_fields_ids_map = db_fields_ids_map.clone(); + let new_fields_ids_map = db_fields_ids_map.clone(); let documents = documents_from(conf.dataset, conf.dataset_format); let mut indexer = indexer::DocumentOperation::new(IndexDocumentsMethod::ReplaceDocuments); indexer.add_documents(&documents).unwrap(); + index_documents( + indexer, + &index, + &rtxn, + new_fields_ids_map, + &mut wtxn, + config, + db_fields_ids_map, + ); + + wtxn.commit().unwrap(); + drop(rtxn); + + index +} + +pub fn index_documents( + indexer: indexer::DocumentOperation, + index: &Index, + rtxn: &milli::heed::RoTxn, + mut new_fields_ids_map: milli::FieldsIdsMap, + wtxn: &mut RwTxn, + config: IndexerConfig, + db_fields_ids_map: milli::FieldsIdsMap, +) { let indexer_alloc = Bump::new(); + let thread_count = + std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap()); + let thread_pool = scoped_thread_pool::ThreadPool::new(thread_count, "index".into()); let (document_changes, _operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, - &index, - &rtxn, + index, + rtxn, None, &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( - &mut wtxn, - &index, + wtxn, + index, + &thread_pool, &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), config.grenad_parameters(), &db_fields_ids_map, @@ -129,11 +163,38 @@ pub fn base_setup(conf: &Conf) -> Index { &Progress::default(), ) .unwrap(); +} - wtxn.commit().unwrap(); - drop(rtxn); - - index +pub fn index_delete_documents( + indexer: indexer::DocumentDeletion, + primary_key: PrimaryKey, + wtxn: &mut RwTxn, + index: &Index, + config: &IndexerConfig, + db_fields_ids_map: milli::FieldsIdsMap, + new_fields_ids_map: milli::FieldsIdsMap, +) { + let indexer_alloc = Bump::new(); + let thread_count = + std::thread::available_parallelism().unwrap_or(NonZeroUsize::new(1).unwrap()); + let thread_pool = scoped_thread_pool::ThreadPool::new(thread_count, "index".into()); + let document_changes = + indexer.into_changes(&indexer_alloc, primary_key, &thread_pool, CHUNK_SIZE); + indexer::index( + wtxn, + index, + &thread_pool, + &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), + config.grenad_parameters(), + &db_fields_ids_map, + new_fields_ids_map, + Some(primary_key), + &document_changes, + EmbeddingConfigs::default(), + &|| false, + &Progress::default(), + ) + .unwrap(); } pub fn run_benches(c: &mut criterion::Criterion, confs: &[Conf]) { diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index 1216083ca..612d51d74 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -12,6 +12,7 @@ use milli::documents::mmap_from_objects; use milli::heed::EnvOpenOptions; use milli::progress::Progress; use milli::update::new::indexer; +use milli::update::new::indexer::document_changes::CHUNK_SIZE; use milli::update::{IndexDocumentsMethod, IndexerConfig}; use milli::vector::EmbeddingConfigs; use milli::Index; @@ -121,6 +122,11 @@ fn main() { } } + let thread_pool = + scoped_thread_pool::ThreadPool::with_available_parallelism( + "index".into(), + ); + let (document_changes, _operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, @@ -130,12 +136,15 @@ fn main() { &mut new_fields_ids_map, &|| false, Progress::default(), + &thread_pool, + CHUNK_SIZE, ) .unwrap(); indexer::index( &mut wtxn, &index, + &thread_pool, &milli::ThreadPoolNoAbortBuilder::new().build().unwrap(), indexer_config.grenad_parameters(), &db_fields_ids_map,