diff --git a/Cargo.lock b/Cargo.lock index fd01352a9..e78372421 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3619,6 +3619,7 @@ dependencies = [ "csv", "deserr", "either", + "enum-iterator", "filter-parser", "flatten-serde-json", "fst", diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 622292e8a..07e18ef4d 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -101,6 +101,7 @@ thread_local = "1.1.8" allocator-api2 = "0.2.18" rustc-hash = "2.0.0" uell = "0.1.0" +enum-iterator = "2.1.0" [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index acf211d63..5394a6e86 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -18,6 +18,7 @@ use crate::update::new::indexer::document_changes::{ extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, }; use crate::update::new::ref_cell_ext::RefCellExt as _; +use crate::update::new::steps::Step; use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -337,7 +338,6 @@ fn truncate_str(s: &str) -> &str { } impl FacetedDocidsExtractor { - #[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] pub fn run_extraction< 'pl, @@ -354,9 +354,7 @@ impl FacetedDocidsExtractor { indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, sender: &FieldIdDocidFacetSender, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result>> where MSP: Fn() -> bool + Sync, @@ -386,9 +384,7 @@ impl FacetedDocidsExtractor { indexing_context, extractor_allocs, &datastore, - finished_steps, - total_steps, - step_name, + step, )?; } diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 3b2bd77ce..7364434ee 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -14,6 +14,7 @@ pub use searchable::*; pub use vectors::EmbeddingExtractor; use super::indexer::document_changes::{DocumentChanges, IndexingContext, Progress}; +use super::steps::Step; use super::thread_local::{FullySend, ThreadLocal}; use crate::update::GrenadParameters; use crate::Result; @@ -24,9 +25,7 @@ pub trait DocidsExtractor { document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result>> where MSP: Fn() -> bool + Sync, diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 9822570d0..f3d4afcb8 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -14,6 +14,7 @@ use crate::update::new::indexer::document_changes::{ extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, }; use crate::update::new::ref_cell_ext::RefCellExt as _; +use crate::update::new::steps::Step; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -249,9 +250,7 @@ impl WordDocidsExtractors { document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result> where MSP: Fn() -> bool + Sync, @@ -306,9 +305,7 @@ impl WordDocidsExtractors { indexing_context, extractor_allocs, &datastore, - finished_steps, - total_steps, - step_name, + step, )?; } diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index 2a9078d6e..b61dfcf92 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -16,6 +16,7 @@ use super::DocidsExtractor; use crate::update::new::indexer::document_changes::{ extract, DocumentChangeContext, DocumentChanges, Extractor, IndexingContext, Progress, }; +use crate::update::new::steps::Step; use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::update::GrenadParameters; @@ -60,9 +61,7 @@ pub trait SearchableExtractor: Sized + Sync { document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result>> where MSP: Fn() -> bool + Sync, @@ -115,9 +114,7 @@ pub trait SearchableExtractor: Sized + Sync { indexing_context, extractor_allocs, &datastore, - finished_steps, - total_steps, - step_name, + step, )?; } @@ -142,9 +139,7 @@ impl DocidsExtractor for T { document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result>> where MSP: Fn() -> bool + Sync, @@ -155,9 +150,7 @@ impl DocidsExtractor for T { document_changes, indexing_context, extractor_allocs, - finished_steps, - total_steps, - step_name, + step, ) } } diff --git a/crates/milli/src/update/new/facet_search_builder.rs b/crates/milli/src/update/new/facet_search_builder.rs index 0c924bff4..39e04a589 100644 --- a/crates/milli/src/update/new/facet_search_builder.rs +++ b/crates/milli/src/update/new/facet_search_builder.rs @@ -1,3 +1,4 @@ +use std::collections::hash_map::Entry; use std::collections::{BTreeSet, HashMap}; use charabia::normalizer::NormalizerOption; @@ -83,9 +84,9 @@ impl<'indexer> FacetSearchBuilder<'indexer> { } fn locales(&mut self, field_id: FieldId) -> Option<&[Language]> { - if !self.localized_field_ids.contains_key(&field_id) { + if let Entry::Vacant(e) = self.localized_field_ids.entry(field_id) { let Some(field_name) = self.global_fields_ids_map.name(field_id) else { - unreachable!("Field id {} not found in the global fields ids map", field_id); + unreachable!("Field id {field_id} not found in the global fields ids map"); }; let locales = self @@ -94,7 +95,7 @@ impl<'indexer> FacetSearchBuilder<'indexer> { .find(|rule| rule.match_str(field_name)) .map(|rule| rule.locales.clone()); - self.localized_field_ids.insert(field_id, locales); + e.insert(locales); } self.localized_field_ids.get(&field_id).unwrap().as_deref() diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index 308582002..4efebc586 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -8,6 +8,7 @@ use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; +use crate::update::new::steps::Step; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; @@ -191,7 +192,6 @@ where const CHUNK_SIZE: usize = 100; -#[allow(clippy::too_many_arguments)] pub fn extract< 'pl, // covariant lifetime of the underlying payload 'extractor, // invariant lifetime of extractor_alloc @@ -217,9 +217,7 @@ pub fn extract< }: IndexingContext<'fid, 'indexer, 'index, MSP, SP>, extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, - finished_steps: u16, - total_steps: u16, - step_name: &'static str, + step: Step, ) -> Result<()> where EX: Extractor<'extractor>, @@ -233,7 +231,7 @@ where extractor_alloc.0.reset(); } - let total_documents = document_changes.len(); + let total_documents = document_changes.len() as u32; let pi = document_changes.iter(CHUNK_SIZE); pi.enumerate().try_arc_for_each_try_init( @@ -253,14 +251,13 @@ where if (must_stop_processing)() { return Err(Arc::new(InternalError::AbortedIndexation.into())); } - let finished_documents = finished_documents * CHUNK_SIZE; + let finished_documents = (finished_documents * CHUNK_SIZE) as u32; - (send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: Some((finished_documents as u32, total_documents as u32)), - }); + (send_progress)(Progress::from_step_documents( + step, + finished_documents, + total_documents, + )); // Clean up and reuse the document-specific allocator context.doc_alloc.reset(); @@ -279,12 +276,7 @@ where }, )?; - (send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: Some((total_documents as u32, total_documents as u32)), - }); + (send_progress)(Progress::from_step_documents(step, total_documents, total_documents)); Ok(()) } @@ -295,3 +287,20 @@ pub struct Progress { pub step_name: &'static str, pub finished_total_documents: Option<(u32, u32)>, } + +impl Progress { + pub fn from_step(step: Step) -> Self { + Self { + finished_steps: step.finished_steps(), + total_steps: Step::total_steps(), + step_name: step.name(), + finished_total_documents: None, + } + } + pub fn from_step_documents(step: Step, finished_documents: u32, total_documents: u32) -> Self { + Self { + finished_total_documents: Some((finished_documents, total_documents)), + ..Progress::from_step(step) + } + } +} diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index 2e46be63d..fe3f08583 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -96,6 +96,7 @@ mod test { extract, DocumentChangeContext, Extractor, IndexingContext, }; use crate::update::new::indexer::DocumentDeletion; + use crate::update::new::steps::Step; use crate::update::new::thread_local::{MostlySend, ThreadLocal}; use crate::update::new::DocumentChange; use crate::DocumentId; @@ -175,9 +176,7 @@ mod test { context, &mut extractor_allocs, &datastore, - 0, - 1, - "test", + Step::ExtractingDocuments, ) .unwrap(); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 8998780fb..dfc3d9b02 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -20,6 +20,7 @@ use super::channel::*; use super::extract::*; use super::facet_search_builder::FacetSearchBuilder; use super::merger::FacetFieldIdsDelta; +use super::steps::Step; use super::thread_local::ThreadLocal; use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; use super::words_prefix_docids::{ @@ -51,80 +52,6 @@ mod document_operation; mod partial_dump; mod update_by_function; -mod steps { - pub const STEPS: &[&str] = &[ - "extracting documents", - "extracting facets", - "extracting words", - "extracting word proximity", - "extracting embeddings", - "writing geo points", - "writing to database", - "writing embeddings to database", - "waiting for extractors", - "post-processing facets", - "post-processing words", - "finalizing", - ]; - - const fn step(step: u16) -> (u16, &'static str) { - /// TODO: convert to an enum_iterator enum of steps - (step, STEPS[step as usize]) - } - - pub const fn total_steps() -> u16 { - STEPS.len() as u16 - } - - pub const fn extract_documents() -> (u16, &'static str) { - step(0) - } - - pub const fn extract_facets() -> (u16, &'static str) { - step(1) - } - - pub const fn extract_words() -> (u16, &'static str) { - step(2) - } - - pub const fn extract_word_proximity() -> (u16, &'static str) { - step(3) - } - - pub const fn extract_embeddings() -> (u16, &'static str) { - step(4) - } - - pub const fn extract_geo_points() -> (u16, &'static str) { - step(5) - } - - pub const fn write_db() -> (u16, &'static str) { - step(6) - } - - pub const fn write_embedding_db() -> (u16, &'static str) { - step(7) - } - - pub const fn waiting_extractors() -> (u16, &'static str) { - step(8) - } - - pub const fn post_processing_facets() -> (u16, &'static str) { - step(9) - } - - pub const fn post_processing_words() -> (u16, &'static str) { - step(10) - } - - pub const fn finalizing() -> (u16, &'static str) { - step(11) - } -} - /// This is the main function of this crate. /// /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. @@ -167,8 +94,6 @@ where send_progress, }; - let total_steps = steps::total_steps(); - let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; @@ -189,15 +114,13 @@ where let document_sender = extractor_sender.documents(); let document_extractor = DocumentsExtractor::new(&document_sender, embedders); let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - let (finished_steps, step_name) = steps::extract_documents(); + extract(document_changes, &document_extractor, indexing_context, &mut extractor_allocs, &datastore, - finished_steps, - total_steps, - step_name, + Step::ExtractingDocuments, )?; for document_extractor_data in datastore { @@ -218,8 +141,6 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "faceted"); let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_facets(); - facet_field_ids_delta = merge_and_send_facet_docids( FacetedDocidsExtractor::run_extraction( grenad_parameters, @@ -227,9 +148,7 @@ where indexing_context, &mut extractor_allocs, &extractor_sender.field_id_docid_facet_sender(), - finished_steps, - total_steps, - step_name, + Step::ExtractingFacets )?, FacetDatabases::new(index), index, @@ -240,7 +159,7 @@ where { let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_words(); + let WordDocidsCaches { word_docids, @@ -253,9 +172,7 @@ where document_changes, indexing_context, &mut extractor_allocs, - finished_steps, - total_steps, - step_name, + Step::ExtractingWords )?; // TODO Word Docids Merger @@ -336,16 +253,13 @@ where let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); let _entered = span.enter(); - let (finished_steps, step_name) = steps::extract_word_proximity(); let caches = ::run_extraction( grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, - finished_steps, - total_steps, - step_name, + Step::ExtractingWordProximity, )?; merge_and_send_docids( @@ -369,8 +283,7 @@ where let embedding_sender = extractor_sender.embeddings(); let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads()); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - let (finished_steps, step_name) = steps::extract_embeddings(); - extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?; + extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, Step::ExtractingEmbeddings)?; for config in &mut index_embeddings { 'data: for data in datastore.iter_mut() { @@ -392,16 +305,13 @@ where break 'geo; }; let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - let (finished_steps, step_name) = steps::extract_geo_points(); extract( document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, - finished_steps, - total_steps, - step_name, + Step::WritingGeoPoints )?; merge_and_send_rtree( @@ -431,8 +341,7 @@ where { let span = tracing::trace_span!(target: "indexing::documents::extract", "FINISH"); let _entered = span.enter(); - let (finished_steps, step_name) = steps::write_db(); - (indexing_context.send_progress)(Progress { finished_steps, total_steps, step_name, finished_total_documents: None }); + (indexing_context.send_progress)(Progress::from_step(Step::WritingToDatabase)); } Result::Ok(facet_field_ids_delta) @@ -513,13 +422,9 @@ where let span = tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); let _entered = span.enter(); - let (finished_steps, step_name) = steps::write_embedding_db(); - (indexing_context.send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: None, - }); + (indexing_context.send_progress)(Progress::from_step( + Step::WritingEmbeddingsToDatabase, + )); for (_embedder_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers @@ -540,46 +445,21 @@ where } } - let (finished_steps, step_name) = steps::waiting_extractors(); - (indexing_context.send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: None, - }); + (indexing_context.send_progress)(Progress::from_step(Step::WaitingForExtractors)); let facet_field_ids_delta = extractor_handle.join().unwrap()?; - let (finished_steps, step_name) = steps::post_processing_facets(); - (indexing_context.send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: None, - }); + (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingFacets)); compute_facet_search_database(index, wtxn, global_fields_ids_map)?; compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; - let (finished_steps, step_name) = steps::post_processing_words(); - (indexing_context.send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: None, - }); + (indexing_context.send_progress)(Progress::from_step(Step::PostProcessingWords)); if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { compute_prefix_database(index, wtxn, prefix_delta)?; } - - let (finished_steps, step_name) = steps::finalizing(); - (indexing_context.send_progress)(Progress { - finished_steps, - total_steps, - step_name, - finished_total_documents: None, - }); + (indexing_context.send_progress)(Progress::from_step(Step::Finalizing)); Ok(()) as Result<_> })?; diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index edbbdf497..140f4ccf0 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -17,6 +17,7 @@ pub mod indexer; mod merger; mod parallel_iterator_ext; mod ref_cell_ext; +pub(crate) mod steps; pub(crate) mod thread_local; mod top_level_map; pub mod vector_document; diff --git a/crates/milli/src/update/new/steps.rs b/crates/milli/src/update/new/steps.rs new file mode 100644 index 000000000..60a0c872b --- /dev/null +++ b/crates/milli/src/update/new/steps.rs @@ -0,0 +1,45 @@ +use enum_iterator::Sequence; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Sequence)] +#[repr(u16)] +pub enum Step { + ExtractingDocuments, + ExtractingFacets, + ExtractingWords, + ExtractingWordProximity, + ExtractingEmbeddings, + WritingGeoPoints, + WritingToDatabase, + WritingEmbeddingsToDatabase, + WaitingForExtractors, + PostProcessingFacets, + PostProcessingWords, + Finalizing, +} + +impl Step { + pub fn name(&self) -> &'static str { + match self { + Step::ExtractingDocuments => "extracting documents", + Step::ExtractingFacets => "extracting facets", + Step::ExtractingWords => "extracting words", + Step::ExtractingWordProximity => "extracting word proximity", + Step::ExtractingEmbeddings => "extracting embeddings", + Step::WritingGeoPoints => "writing geo points", + Step::WritingToDatabase => "writing to database", + Step::WritingEmbeddingsToDatabase => "writing embeddings to database", + Step::WaitingForExtractors => "waiting for extractors", + Step::PostProcessingFacets => "post-processing facets", + Step::PostProcessingWords => "post-processing words", + Step::Finalizing => "finalizing", + } + } + + pub fn finished_steps(self) -> u16 { + self as u16 + } + + pub const fn total_steps() -> u16 { + Self::CARDINALITY as u16 + } +}