diff --git a/benchmarks/benches/indexing.rs b/benchmarks/benches/indexing.rs index a409e1343..d567b3da1 100644 --- a/benchmarks/benches/indexing.rs +++ b/benchmarks/benches/indexing.rs @@ -59,7 +59,7 @@ fn setup_settings<'t>( let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect(); builder.set_sortable_fields(sortable_fields); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); } fn setup_index_with_settings<'t>( @@ -131,9 +131,15 @@ fn indexing_songs_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -169,9 +175,15 @@ fn reindexing_songs_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -185,9 +197,15 @@ fn reindexing_songs_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -225,9 +243,15 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -282,9 +306,15 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_1_2, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -298,18 +328,30 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_3_4, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); builder.execute().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS_4_4, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -345,9 +387,15 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); @@ -384,9 +432,15 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_SONGS, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -423,9 +477,15 @@ fn indexing_wiki(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -461,9 +521,15 @@ fn reindexing_wiki(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -478,9 +544,15 @@ fn reindexing_wiki(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -518,9 +590,15 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -576,9 +654,15 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_1_2, "csv"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -594,9 +678,15 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_3_4, "csv"); @@ -606,9 +696,15 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_WIKI_ARTICLES_4_4, "csv"); @@ -646,9 +742,15 @@ fn indexing_movies_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -683,9 +785,15 @@ fn reindexing_movies_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -699,9 +807,15 @@ fn reindexing_movies_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -738,9 +852,15 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -794,9 +914,15 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { // as we don't care about the time it takes. let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_1_2, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -811,9 +937,15 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_3_4, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -821,9 +953,15 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) { builder.execute().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::MOVIES_4_4, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -883,9 +1021,15 @@ fn indexing_nested_movies_default(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -945,9 +1089,15 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -1008,9 +1158,15 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::NESTED_MOVIES, "json"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -1046,9 +1202,15 @@ fn indexing_geo(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -1084,9 +1246,15 @@ fn reindexing_geo(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -1101,9 +1269,15 @@ fn reindexing_geo(c: &mut Criterion) { let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); let mut wtxn = index.write_txn().unwrap(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -1141,9 +1315,15 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) { let config = IndexerConfig::default(); let mut wtxn = index.write_txn().unwrap(); let indexing_config = IndexDocumentsConfig::default(); - let builder = - IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()) - .unwrap(); + let builder = IndexDocuments::new( + &mut wtxn, + &index, + &config, + indexing_config, + |_| (), + || false, + ) + .unwrap(); let documents = utils::documents_from(datasets_paths::SMOL_ALL_COUNTRIES, "jsonl"); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); diff --git a/benchmarks/benches/utils.rs b/benchmarks/benches/utils.rs index a240ce299..511b3b8d5 100644 --- a/benchmarks/benches/utils.rs +++ b/benchmarks/benches/utils.rs @@ -86,7 +86,7 @@ pub fn base_setup(conf: &Conf) -> Index { (conf.configure)(&mut builder); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); wtxn.commit().unwrap(); let config = IndexerConfig::default(); @@ -96,7 +96,8 @@ pub fn base_setup(conf: &Conf) -> Index { update_method: IndexDocumentsMethod::ReplaceDocuments, ..Default::default() }; - let builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); + let builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); let documents = documents_from(conf.dataset, conf.dataset_format); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); diff --git a/cli/src/main.rs b/cli/src/main.rs index a633e9fa7..dd5489ebc 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -239,7 +239,7 @@ impl Performer for DocumentAddition { if let Some(primary) = self.primary { let mut builder = update::Settings::new(&mut txn, &index, &config); builder.set_primary_key(primary); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); } let indexing_config = IndexDocumentsConfig { @@ -260,6 +260,7 @@ impl Performer for DocumentAddition { &config, indexing_config, |step| indexing_callback(step, &bars), + || false, ) .unwrap(); let (addition, user_error) = addition.add_documents(reader)?; @@ -517,7 +518,7 @@ impl Performer for SettingsUpdate { bars.push(bar); } - update.execute(|step| indexing_callback(step, &bars))?; + update.execute(|step| indexing_callback(step, &bars), || false)?; txn.commit()?; Ok(()) diff --git a/milli/src/error.rs b/milli/src/error.rs index d3f0a179f..bd691ab1d 100644 --- a/milli/src/error.rs +++ b/milli/src/error.rs @@ -56,6 +56,8 @@ pub enum InternalError { Store(#[from] MdbError), #[error(transparent)] Utf8(#[from] str::Utf8Error), + #[error("An indexation process was explicitly aborted.")] + AbortedIndexation, } #[derive(Error, Debug)] diff --git a/milli/src/index.rs b/milli/src/index.rs index 7c5e92d05..63568e3b0 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1188,6 +1188,7 @@ pub(crate) mod tests { use tempfile::TempDir; use crate::documents::DocumentsBatchReader; + use crate::error::{Error, InternalError}; use crate::index::{DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS}; use crate::update::{self, IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings}; use crate::{db_snap, Index}; @@ -1237,6 +1238,7 @@ pub(crate) mod tests { &self.indexer_config, self.index_documents_config.clone(), |_| (), + || false, ) .unwrap(); let (builder, user_error) = builder.add_documents(documents).unwrap(); @@ -1273,11 +1275,45 @@ pub(crate) mod tests { ) -> Result<(), crate::error::Error> { let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config); update(&mut builder); - builder.execute(drop)?; + builder.execute(drop, || false)?; Ok(()) } } + #[test] + fn aborting_indexation() { + use std::sync::atomic::AtomicBool; + use std::sync::atomic::Ordering::Relaxed; + + let index = TempIndex::new(); + let mut wtxn = index.inner.write_txn().unwrap(); + + let should_abort = AtomicBool::new(false); + let builder = IndexDocuments::new( + &mut wtxn, + &index.inner, + &index.indexer_config, + index.index_documents_config.clone(), + |_| (), + || should_abort.load(Relaxed), + ) + .unwrap(); + + let (builder, user_error) = builder + .add_documents(documents!([ + { "id": 1, "name": "kevin" }, + { "id": 2, "name": "bob", "age": 20 }, + { "id": 2, "name": "bob", "age": 20 }, + ])) + .unwrap(); + user_error.unwrap(); + + should_abort.store(true, Relaxed); + let err = builder.execute().unwrap_err(); + + assert!(matches!(err, Error::InternalError(InternalError::AbortedIndexation))); + } + #[test] fn initial_field_distribution() { let index = TempIndex::new(); diff --git a/milli/src/search/distinct/mod.rs b/milli/src/search/distinct/mod.rs index 1a9c56cf3..b6ed26917 100644 --- a/milli/src/search/distinct/mod.rs +++ b/milli/src/search/distinct/mod.rs @@ -89,7 +89,7 @@ mod test { let config = IndexerConfig::default(); let mut update = Settings::new(&mut txn, &index, &config); update.set_distinct_field(distinct.to_string()); - update.execute(|_| ()).unwrap(); + update.execute(|_| (), || false).unwrap(); // add documents to the index let config = IndexerConfig::default(); @@ -98,7 +98,8 @@ mod test { ..Default::default() }; let addition = - IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()).unwrap(); + IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| (), || false) + .unwrap(); let reader = crate::documents::DocumentsBatchReader::from_reader(Cursor::new(JSON.as_slice())) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 10a831ddf..a121d3ae0 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -32,7 +32,7 @@ pub use self::helpers::{ use self::helpers::{grenad_obkv_into_chunks, GrenadParameters}; pub use self::transform::{Transform, TransformOutput}; use crate::documents::{obkv_to_object, DocumentsBatchReader}; -use crate::error::UserError; +use crate::error::{Error, InternalError, UserError}; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ self, IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep, WordPrefixDocids, @@ -70,13 +70,14 @@ impl Default for IndexDocumentsMethod { } } -pub struct IndexDocuments<'t, 'u, 'i, 'a, F> { +pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> { wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, config: IndexDocumentsConfig, indexer_config: &'a IndexerConfig, transform: Option>, - progress: F, + progress: FP, + should_abort: FA, added_documents: u64, } @@ -90,17 +91,19 @@ pub struct IndexDocumentsConfig { pub autogenerate_docids: bool, } -impl<'t, 'u, 'i, 'a, F> IndexDocuments<'t, 'u, 'i, 'a, F> +impl<'t, 'u, 'i, 'a, FP, FA> IndexDocuments<'t, 'u, 'i, 'a, FP, FA> where - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { pub fn new( wtxn: &'t mut heed::RwTxn<'i, 'u>, index: &'i Index, indexer_config: &'a IndexerConfig, config: IndexDocumentsConfig, - progress: F, - ) -> Result> { + progress: FP, + should_abort: FA, + ) -> Result> { let transform = Some(Transform::new( wtxn, index, @@ -114,6 +117,7 @@ where config, indexer_config, progress, + should_abort, wtxn, index, added_documents: 0, @@ -148,12 +152,13 @@ where Err(user_error) => return Ok((self, Err(user_error))), }; - let indexed_documents = self - .transform - .as_mut() - .expect("Invalid document addition state") - .read_documents(enriched_documents_reader, self.wtxn, &self.progress)? - as u64; + let indexed_documents = + self.transform.as_mut().expect("Invalid document addition state").read_documents( + enriched_documents_reader, + self.wtxn, + &self.progress, + &self.should_abort, + )? as u64; self.added_documents += indexed_documents; @@ -197,7 +202,8 @@ where #[logging_timer::time("IndexDocuments::{}")] pub fn execute_raw(self, output: TransformOutput) -> Result where - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { let TransformOutput { primary_key, @@ -346,6 +352,10 @@ where }); for result in lmdb_writer_rx { + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + let typed_chunk = match result? { TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => { let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? }; @@ -422,17 +432,26 @@ where word_position_docids: Option>, ) -> Result<()> where - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { // Merged databases are already been indexed, we start from this count; let mut databases_seen = MERGED_DATABASE_COUNT; + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, total_databases: TOTAL_POSTING_DATABASE_COUNT, }); + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + let previous_words_prefixes_fst = self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?; @@ -446,6 +465,10 @@ where } builder.execute()?; + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?; // We retrieve the common words between the previous and new prefix word fst. @@ -473,6 +496,10 @@ where total_databases: TOTAL_POSTING_DATABASE_COUNT, }); + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + if let Some(word_docids) = word_docids { execute_word_prefix_docids( self.wtxn, @@ -499,6 +526,10 @@ where )?; } + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, @@ -521,6 +552,10 @@ where )?; } + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, @@ -548,6 +583,10 @@ where )?; } + if (self.should_abort)() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + databases_seen += 1; (self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase { databases_seen, diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index 3786c5bcb..7c9a912b3 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -138,15 +138,17 @@ impl<'a, 'i> Transform<'a, 'i> { }) } - pub fn read_documents( + pub fn read_documents( &mut self, reader: EnrichedDocumentsBatchReader, wtxn: &mut heed::RwTxn, - progress_callback: F, + progress_callback: FP, + should_abort: FA, ) -> Result where R: Read + Seek, - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { let (mut cursor, fields_index) = reader.into_cursor_and_fields_index(); @@ -165,6 +167,10 @@ impl<'a, 'i> Transform<'a, 'i> { while let Some(enriched_document) = cursor.next_enriched_document()? { let EnrichedDocument { document, document_id } = enriched_document; + if should_abort() { + return Err(Error::InternalError(InternalError::AbortedIndexation)); + } + // drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer // does not keep references from the cursor between loop iterations let mut field_buffer_cache = drop_and_reuse(field_buffer); diff --git a/milli/src/update/settings.rs b/milli/src/update/settings.rs index b3b1420f8..f82a57cbc 100644 --- a/milli/src/update/settings.rs +++ b/milli/src/update/settings.rs @@ -266,9 +266,15 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { self.pagination_max_total_hits = Setting::Reset; } - fn reindex(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> Result<()> + fn reindex( + &mut self, + progress_callback: &FP, + should_abort: &FA, + old_fields_ids_map: FieldsIdsMap, + ) -> Result<()> where - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { let fields_ids_map = self.index.fields_ids_map(self.wtxn)?; // if the settings are set before any document update, we don't need to do anything, and @@ -302,7 +308,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { self.index, self.indexer_config, IndexDocumentsConfig::default(), - &cb, + &progress_callback, + &should_abort, )?; indexing_builder.execute_raw(output)?; @@ -657,9 +664,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { Ok(()) } - pub fn execute(mut self, progress_callback: F) -> Result<()> + pub fn execute(mut self, progress_callback: FP, should_abort: FA) -> Result<()> where - F: Fn(UpdateIndexingStep) + Sync, + FP: Fn(UpdateIndexingStep) + Sync, + FA: Fn() -> bool + Sync, { self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?; @@ -695,7 +703,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> { || searchable_updated || exact_attributes_updated { - self.reindex(&progress_callback, old_fields_ids_map)?; + self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?; } Ok(()) diff --git a/milli/tests/search/distinct.rs b/milli/tests/search/distinct.rs index 64dd16f09..c2b7e2c1e 100644 --- a/milli/tests/search/distinct.rs +++ b/milli/tests/search/distinct.rs @@ -19,7 +19,7 @@ macro_rules! test_distinct { let config = milli::update::IndexerConfig::default(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_distinct_field(S(stringify!($distinct))); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/milli/tests/search/facet_distribution.rs b/milli/tests/search/facet_distribution.rs index 83d692d7f..e2f89f2db 100644 --- a/milli/tests/search/facet_distribution.rs +++ b/milli/tests/search/facet_distribution.rs @@ -23,13 +23,14 @@ fn test_facet_distribution_with_no_facet_values() { S("genres"), S("tags"), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); + let builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); let mut documents_builder = DocumentsBatchBuilder::new(Vec::new()); let reader = Cursor::new( r#"{ diff --git a/milli/tests/search/mod.rs b/milli/tests/search/mod.rs index 4ec1aeb83..c8b01648c 100644 --- a/milli/tests/search/mod.rs +++ b/milli/tests/search/mod.rs @@ -57,13 +57,14 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { S("america") => vec![S("the united states")], }); builder.set_searchable_fields(vec![S("title"), S("description")]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); + let builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); let mut documents_builder = DocumentsBatchBuilder::new(Vec::new()); let reader = Cursor::new(CONTENT.as_bytes()); diff --git a/milli/tests/search/query_criteria.rs b/milli/tests/search/query_criteria.rs index f873f56f7..90b4d6362 100644 --- a/milli/tests/search/query_criteria.rs +++ b/milli/tests/search/query_criteria.rs @@ -345,7 +345,7 @@ fn criteria_mixup() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(criteria.iter().map(ToString::to_string).collect()); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); wtxn.commit().unwrap(); let mut rtxn = index.read_txn().unwrap(); @@ -385,12 +385,13 @@ fn criteria_ascdesc() { S("name"), S("age"), }); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); // index documents let config = IndexerConfig { max_memory: Some(10 * 1024 * 1024), ..Default::default() }; let indexing_config = IndexDocumentsConfig { autogenerate_docids: true, ..Default::default() }; - let builder = IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| ()).unwrap(); + let builder = + IndexDocuments::new(&mut wtxn, &index, &config, indexing_config, |_| (), || false).unwrap(); let mut batch_builder = DocumentsBatchBuilder::new(Vec::new()); @@ -436,7 +437,7 @@ fn criteria_ascdesc() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(vec![criterion.to_string()]); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); wtxn.commit().unwrap(); let mut rtxn = index.read_txn().unwrap(); diff --git a/milli/tests/search/typo_tolerance.rs b/milli/tests/search/typo_tolerance.rs index 7dc6b0c4f..c939186e5 100644 --- a/milli/tests/search/typo_tolerance.rs +++ b/milli/tests/search/typo_tolerance.rs @@ -40,7 +40,7 @@ fn test_typo_tolerance_one_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_one_typo(4); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -86,7 +86,7 @@ fn test_typo_tolerance_two_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_two_typos(7); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -127,7 +127,8 @@ fn test_typo_disabled_on_word() { let mut txn = index.write_txn().unwrap(); let config = IndexerConfig::default(); let indexing_config = IndexDocumentsConfig::default(); - let builder = IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()).unwrap(); + let builder = + IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| (), || false).unwrap(); let (builder, user_error) = builder.add_documents(documents).unwrap(); user_error.unwrap(); @@ -156,7 +157,7 @@ fn test_typo_disabled_on_word() { // `zealand` doesn't allow typos anymore exact_words.insert("zealand".to_string()); builder.set_exact_words(exact_words); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); let mut search = Search::new(&txn, &index); search.query("zealand"); @@ -194,7 +195,7 @@ fn test_disable_typo_on_attribute() { let mut builder = Settings::new(&mut txn, &index, &config); // disable typos on `description` builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect()); - builder.execute(|_| ()).unwrap(); + builder.execute(|_| (), || false).unwrap(); let mut search = Search::new(&txn, &index); search.query("antebelum");