diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 66ed6cbfb..7e0484e39 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -28,7 +28,7 @@ use crate::{DocumentId, FieldId, Index, Result, MAX_FACET_VALUE_LENGTH}; pub struct FacetedExtractorData<'a, 'b> { attributes_to_extract: &'a [&'a str], sender: &'a FieldIdDocidFacetSender<'a, 'b>, - grenad_parameters: GrenadParameters, + grenad_parameters: &'a GrenadParameters, buckets: usize, } @@ -374,7 +374,6 @@ fn truncate_str(s: &str) -> &str { impl FacetedDocidsExtractor { #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract::faceted")] pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -398,7 +397,7 @@ impl FacetedDocidsExtractor { let extractor = FacetedExtractorData { attributes_to_extract: &attributes_to_extract, - grenad_parameters, + grenad_parameters: indexing_context.grenad_parameters, buckets: rayon::current_num_threads(), sender, }; diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs index 4bcb918e4..aa0a3d333 100644 --- a/crates/milli/src/update/new/extract/mod.rs +++ b/crates/milli/src/update/new/extract/mod.rs @@ -18,12 +18,10 @@ pub use vectors::EmbeddingExtractor; use super::indexer::document_changes::{DocumentChanges, IndexingContext}; use super::steps::IndexingStep; use super::thread_local::{FullySend, ThreadLocal}; -use crate::update::GrenadParameters; use crate::Result; pub trait DocidsExtractor { fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 952ee91e4..49259cd64 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -208,7 +208,7 @@ impl<'extractor> WordDocidsCaches<'extractor> { pub struct WordDocidsExtractorData<'a> { tokenizer: &'a DocumentTokenizer<'a>, - grenad_parameters: GrenadParameters, + grenad_parameters: &'a GrenadParameters, buckets: usize, } @@ -240,7 +240,6 @@ pub struct WordDocidsExtractors; impl WordDocidsExtractors { pub fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -288,7 +287,7 @@ impl WordDocidsExtractors { let extractor = WordDocidsExtractorData { tokenizer: &document_tokenizer, - grenad_parameters, + grenad_parameters: indexing_context.grenad_parameters, buckets: rayon::current_num_threads(), }; diff --git a/crates/milli/src/update/new/extract/searchable/mod.rs b/crates/milli/src/update/new/extract/searchable/mod.rs index c4240196a..7c949a3ce 100644 --- a/crates/milli/src/update/new/extract/searchable/mod.rs +++ b/crates/milli/src/update/new/extract/searchable/mod.rs @@ -24,7 +24,7 @@ use crate::{Index, Result, MAX_POSITION_PER_ATTRIBUTE}; pub struct SearchableExtractorData<'a, EX: SearchableExtractor> { tokenizer: &'a DocumentTokenizer<'a>, - grenad_parameters: GrenadParameters, + grenad_parameters: &'a GrenadParameters, buckets: usize, _ex: PhantomData, } @@ -57,7 +57,6 @@ impl<'a, 'extractor, EX: SearchableExtractor + Sync> Extractor<'extractor> pub trait SearchableExtractor: Sized + Sync { fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -96,7 +95,7 @@ pub trait SearchableExtractor: Sized + Sync { let extractor_data: SearchableExtractorData = SearchableExtractorData { tokenizer: &document_tokenizer, - grenad_parameters, + grenad_parameters: indexing_context.grenad_parameters, buckets: rayon::current_num_threads(), _ex: PhantomData, }; @@ -134,7 +133,6 @@ pub trait SearchableExtractor: Sized + Sync { impl DocidsExtractor for T { fn run_extraction<'pl, 'fid, 'indexer, 'index, 'extractor, DC: DocumentChanges<'pl>, MSP>( - grenad_parameters: GrenadParameters, document_changes: &DC, indexing_context: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, @@ -143,12 +141,6 @@ impl DocidsExtractor for T { where MSP: Fn() -> bool + Sync, { - Self::run_extraction( - grenad_parameters, - document_changes, - indexing_context, - extractor_allocs, - step, - ) + Self::run_extraction(document_changes, indexing_context, extractor_allocs, step) } } diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index a45fcee85..f77ac7658 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -12,6 +12,7 @@ use crate::progress::{AtomicDocumentStep, Progress}; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; +use crate::update::GrenadParameters; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; pub struct DocumentChangeContext< @@ -145,6 +146,7 @@ pub struct IndexingContext< pub fields_ids_map_store: &'indexer ThreadLocal>>>, pub must_stop_processing: &'indexer MSP, pub progress: &'indexer Progress, + pub grenad_parameters: &'indexer GrenadParameters, } impl< @@ -207,6 +209,7 @@ pub fn extract< fields_ids_map_store, must_stop_processing, progress, + grenad_parameters: _, }: IndexingContext<'fid, 'indexer, 'index, MSP>, extractor_allocs: &'extractor mut ThreadLocal>, datastore: &'data ThreadLocal, diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index b42a6c859..03f763f18 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -166,6 +166,7 @@ mod test { fields_ids_map_store: &fields_ids_map_store, must_stop_processing: &(|| false), progress: &Progress::default(), + grenad_parameters: &Default::default(), }; for _ in 0..3 { diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 090c1eb8e..8f14fa7ed 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -13,7 +13,7 @@ use serde_json::Deserializer; use super::super::document_change::DocumentChange; use super::document_changes::{DocumentChangeContext, DocumentChanges}; -use super::retrieve_or_guess_primary_key; +use super::guess_primary_key::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::progress::{AtomicPayloadStep, Progress}; use crate::update::new::document::Versions; diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs new file mode 100644 index 000000000..53fd8a89b --- /dev/null +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -0,0 +1,309 @@ +use std::collections::BTreeMap; +use std::sync::atomic::AtomicBool; +use std::sync::OnceLock; + +use bumpalo::Bump; +use roaring::RoaringBitmap; +use tracing::Span; + +use super::super::channel::*; +use super::super::extract::*; +use super::super::steps::IndexingStep; +use super::super::thread_local::{FullySend, ThreadLocal}; +use super::super::FacetFieldIdsDelta; +use super::document_changes::{extract, DocumentChanges, IndexingContext}; +use crate::index::IndexEmbeddingConfig; +use crate::proximity::ProximityPrecision; +use crate::update::new::extract::EmbeddingExtractor; +use crate::update::new::merger::merge_and_send_rtree; +use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; +use crate::vector::EmbeddingConfigs; +use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; + +#[allow(clippy::too_many_arguments)] +pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( + document_changes: &DC, + indexing_context: IndexingContext, + indexer_span: Span, + extractor_sender: ExtractorBbqueueSender, + embedders: &EmbeddingConfigs, + extractor_allocs: &'extractor mut ThreadLocal>, + finished_extraction: &AtomicBool, + field_distribution: &mut BTreeMap, + mut index_embeddings: Vec, + document_ids: &mut RoaringBitmap, +) -> Result<(FacetFieldIdsDelta, Vec)> +where + DC: DocumentChanges<'pl>, + MSP: Fn() -> bool + Sync, +{ + let span = + tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); + + let index = indexing_context.index; + let rtxn = index.read_txn()?; + + // document but we need to create a function that collects and compresses documents. + let document_sender = extractor_sender.documents(); + let document_extractor = DocumentsExtractor::new(document_sender, embedders); + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + { + let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); + let _entered = span.enter(); + extract( + document_changes, + &document_extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::ExtractingDocuments, + )?; + } + { + let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); + let _entered = span.enter(); + for document_extractor_data in datastore { + let document_extractor_data = document_extractor_data.0.into_inner(); + for (field, delta) in document_extractor_data.field_distribution_delta { + let current = field_distribution.entry(field).or_default(); + // adding the delta should never cause a negative result, as we are removing fields that previously existed. + *current = current.saturating_add_signed(delta); + } + document_extractor_data.docids_delta.apply_to(document_ids); + } + + field_distribution.retain(|_, v| *v != 0); + } + + let facet_field_ids_delta; + + { + let caches = { + let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); + let _entered = span.enter(); + + FacetedDocidsExtractor::run_extraction( + document_changes, + indexing_context, + extractor_allocs, + &extractor_sender.field_id_docid_facet_sender(), + IndexingStep::ExtractingFacets, + )? + }; + + { + let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); + let _entered = span.enter(); + + facet_field_ids_delta = merge_and_send_facet_docids( + caches, + FacetDatabases::new(index), + index, + extractor_sender.facet_docids(), + )?; + } + } + + { + let WordDocidsCaches { + word_docids, + word_fid_docids, + exact_word_docids, + word_position_docids, + fid_word_count_docids, + } = { + let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); + let _entered = span.enter(); + + WordDocidsExtractors::run_extraction( + document_changes, + indexing_context, + extractor_allocs, + IndexingStep::ExtractingWords, + )? + }; + + { + let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); + let _entered = span.enter(); + merge_and_send_docids( + word_docids, + index.word_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); + let _entered = span.enter(); + merge_and_send_docids( + word_fid_docids, + index.word_fid_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); + let _entered = span.enter(); + merge_and_send_docids( + exact_word_docids, + index.exact_word_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); + let _entered = span.enter(); + merge_and_send_docids( + word_position_docids, + index.word_position_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + + { + let span = + tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); + let _entered = span.enter(); + merge_and_send_docids( + fid_word_count_docids, + index.field_id_word_count_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + } + + // run the proximity extraction only if the precision is by word + // this works only if the settings didn't change during this transaction. + let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); + if proximity_precision == ProximityPrecision::ByWord { + let caches = { + let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); + let _entered = span.enter(); + + ::run_extraction( + document_changes, + indexing_context, + extractor_allocs, + IndexingStep::ExtractingWordProximity, + )? + }; + + { + let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); + let _entered = span.enter(); + + merge_and_send_docids( + caches, + index.word_pair_proximity_docids.remap_types(), + index, + extractor_sender.docids::(), + &indexing_context.must_stop_processing, + )?; + } + } + + 'vectors: { + if index_embeddings.is_empty() { + break 'vectors; + } + + let embedding_sender = extractor_sender.embeddings(); + let extractor = EmbeddingExtractor::new( + embedders, + embedding_sender, + field_distribution, + request_threads(), + ); + let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + { + let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); + let _entered = span.enter(); + + extract( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::ExtractingEmbeddings, + )?; + } + { + let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors"); + let _entered = span.enter(); + + for config in &mut index_embeddings { + 'data: for data in datastore.iter_mut() { + let data = &mut data.get_mut().0; + let Some(deladd) = data.remove(&config.name) else { + continue 'data; + }; + deladd.apply_to(&mut config.user_provided); + } + } + } + } + + 'geo: { + let Some(extractor) = GeoExtractor::new(&rtxn, index, *indexing_context.grenad_parameters)? + else { + break 'geo; + }; + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + + { + let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); + let _entered = span.enter(); + + extract( + document_changes, + &extractor, + indexing_context, + extractor_allocs, + &datastore, + IndexingStep::WritingGeoPoints, + )?; + } + + merge_and_send_rtree( + datastore, + &rtxn, + index, + extractor_sender.geo(), + &indexing_context.must_stop_processing, + )?; + } + indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); + finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); + + Result::Ok((facet_field_ids_delta, index_embeddings)) +} + +fn request_threads() -> &'static ThreadPoolNoAbort { + static REQUEST_THREADS: OnceLock = OnceLock::new(); + + REQUEST_THREADS.get_or_init(|| { + ThreadPoolNoAbortBuilder::new() + .num_threads(crate::vector::REQUEST_PARALLELISM) + .thread_name(|index| format!("embedding-request-{index}")) + .build() + .unwrap() + }) +} diff --git a/crates/milli/src/update/new/indexer/guess_primary_key.rs b/crates/milli/src/update/new/indexer/guess_primary_key.rs new file mode 100644 index 000000000..f0eb82b8d --- /dev/null +++ b/crates/milli/src/update/new/indexer/guess_primary_key.rs @@ -0,0 +1,85 @@ +use bumparaw_collections::RawMap; +use heed::RoTxn; +use rustc_hash::FxBuildHasher; + +use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; +use crate::update::new::StdResult; +use crate::{FieldsIdsMap, Index, Result, UserError}; + +/// Returns the primary key that has already been set for this index or the +/// one we will guess by searching for the first key that contains "id" as a substring, +/// and whether the primary key changed +pub fn retrieve_or_guess_primary_key<'a>( + rtxn: &'a RoTxn<'a>, + index: &Index, + new_fields_ids_map: &mut FieldsIdsMap, + primary_key_from_op: Option<&'a str>, + first_document: Option>, +) -> Result, bool), UserError>> { + // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it. + + // do we have an existing declared primary key? + let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? { + // did we request a primary key in the operation? + match primary_key_from_op { + // we did, and it is different from the DB one + Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => { + return Ok(Err(UserError::PrimaryKeyCannotBeChanged( + primary_key_from_db.to_string(), + ))); + } + _ => (primary_key_from_db, false), + } + } else { + // no primary key in the DB => let's set one + // did we request a primary key in the operation? + let primary_key = if let Some(primary_key_from_op) = primary_key_from_op { + // set primary key from operation + primary_key_from_op + } else { + // guess primary key + let first_document = match first_document { + Some(document) => document, + // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found + None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), + }; + + let guesses: Result> = first_document + .keys() + .filter_map(|name| { + let Some(_) = new_fields_ids_map.insert(name) else { + return Some(Err(UserError::AttributeLimitReached.into())); + }; + name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name)) + }) + .collect(); + + let mut guesses = guesses?; + + // sort the keys in lexicographical order, so that fields are always in the same order. + guesses.sort_unstable(); + + match guesses.as_slice() { + [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), + [name] => { + tracing::info!("Primary key was not specified in index. Inferred to '{name}'"); + *name + } + multiple => { + return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { + candidates: multiple + .iter() + .map(|candidate| candidate.to_string()) + .collect(), + })) + } + } + }; + (primary_key, true) + }; + + match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) { + Ok(primary_key) => Ok(Ok((primary_key, has_changed))), + Err(err) => Ok(Err(err)), + } +} diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index a850c0d03..1cf83f2d2 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,59 +1,37 @@ -use std::cmp::Ordering; use std::sync::atomic::AtomicBool; -use std::sync::{OnceLock, RwLock}; +use std::sync::RwLock; use std::thread::{self, Builder}; use big_s::S; -use bumparaw_collections::RawMap; -use document_changes::{extract, DocumentChanges, IndexingContext}; +use document_changes::{DocumentChanges, IndexingContext}; pub use document_deletion::DocumentDeletion; pub use document_operation::{DocumentOperation, PayloadStats}; use hashbrown::HashMap; -use heed::types::{Bytes, DecodeIgnore, Str}; -use heed::{RoTxn, RwTxn}; -use itertools::{merge_join_by, EitherOrBoth}; +use heed::RwTxn; pub use partial_dump::PartialDump; -use rand::SeedableRng as _; -use rustc_hash::FxBuildHasher; -use time::OffsetDateTime; pub use update_by_function::UpdateByFunction; +use write::{build_vectors, update_index, write_to_db}; use super::channel::*; -use super::extract::*; -use super::facet_search_builder::FacetSearchBuilder; -use super::merger::FacetFieldIdsDelta; use super::steps::IndexingStep; use super::thread_local::ThreadLocal; -use super::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; -use super::words_prefix_docids::{ - compute_word_prefix_docids, compute_word_prefix_fid_docids, compute_word_prefix_position_docids, -}; -use super::StdResult; -use crate::documents::{PrimaryKey, DEFAULT_PRIMARY_KEY}; -use crate::facet::FacetType; +use crate::documents::PrimaryKey; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; -use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; use crate::progress::Progress; -use crate::proximity::ProximityPrecision; -use crate::update::del_add::DelAdd; -use crate::update::new::extract::EmbeddingExtractor; -use crate::update::new::merger::merge_and_send_rtree; -use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids; -use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; -use crate::update::settings::InnerIndexSettings; -use crate::update::{FacetsUpdateBulk, GrenadParameters}; -use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings}; -use crate::{ - Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort, - ThreadPoolNoAbortBuilder, UserError, -}; +use crate::update::GrenadParameters; +use crate::vector::{ArroyWrapper, EmbeddingConfigs}; +use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; pub(crate) mod de; pub mod document_changes; mod document_deletion; mod document_operation; +mod extract; +mod guess_primary_key; mod partial_dump; +mod post_processing; mod update_by_function; +mod write; /// This is the main function of this crate. /// @@ -107,7 +85,7 @@ where }, ); - let (extractor_sender, mut writer_receiver) = pool + let (extractor_sender, writer_receiver) = pool .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) .unwrap(); @@ -126,9 +104,10 @@ where fields_ids_map_store: &fields_ids_map_store, must_stop_processing, progress, + grenad_parameters: &grenad_parameters, }; - let mut index_embeddings = index.embedding_configs(wtxn)?; + let index_embeddings = index.embedding_configs(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?; let mut document_ids = index.documents_ids(wtxn)?; @@ -139,261 +118,28 @@ where // prevent moving the field_distribution and document_ids in the inner closure... let field_distribution = &mut field_distribution; let document_ids = &mut document_ids; - let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { - pool.install(move || { - let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); - let _entered = span.enter(); - - let rtxn = index.read_txn()?; - - // document but we need to create a function that collects and compresses documents. - let document_sender = extractor_sender.documents(); - let document_extractor = DocumentsExtractor::new(document_sender, embedders); - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - { - let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents"); - let _entered = span.enter(); - extract( + let extractor_handle = + Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { + pool.install(move || { + extract::extract_all( document_changes, - &document_extractor, indexing_context, + indexer_span, + extractor_sender, + embedders, &mut extractor_allocs, - &datastore, - IndexingStep::ExtractingDocuments, - )?; - } - { - let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "documents"); - let _entered = span.enter(); - for document_extractor_data in datastore { - let document_extractor_data = document_extractor_data.0.into_inner(); - for (field, delta) in document_extractor_data.field_distribution_delta { - let current = field_distribution.entry(field).or_default(); - // adding the delta should never cause a negative result, as we are removing fields that previously existed. - *current = current.saturating_add_signed(delta); - } - document_extractor_data.docids_delta.apply_to(document_ids); - } - - field_distribution.retain(|_, v| *v != 0); - } - - let facet_field_ids_delta; - - { - let caches = { - let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "faceted"); - let _entered = span.enter(); - - FacetedDocidsExtractor::run_extraction( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - &extractor_sender.field_id_docid_facet_sender(), - IndexingStep::ExtractingFacets - )? - }; - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", parent: &indexer_span, "faceted"); - let _entered = span.enter(); - - facet_field_ids_delta = merge_and_send_facet_docids( - caches, - FacetDatabases::new(index), - index, - extractor_sender.facet_docids(), - )?; - } - } - - { - let WordDocidsCaches { - word_docids, - word_fid_docids, - exact_word_docids, - word_position_docids, - fid_word_count_docids, - } = { - let span = tracing::trace_span!(target: "indexing::documents::extract", "word_docids"); - let _entered = span.enter(); - - WordDocidsExtractors::run_extraction( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - IndexingStep::ExtractingWords - )? - }; - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_docids"); - let _entered = span.enter(); - merge_and_send_docids( - word_docids, - index.word_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_fid_docids"); - let _entered = span.enter(); - merge_and_send_docids( - word_fid_docids, - index.word_fid_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "exact_word_docids"); - let _entered = span.enter(); - merge_and_send_docids( - exact_word_docids, - index.exact_word_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_position_docids"); - let _entered = span.enter(); - merge_and_send_docids( - word_position_docids, - index.word_position_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "fid_word_count_docids"); - let _entered = span.enter(); - merge_and_send_docids( - fid_word_count_docids, - index.field_id_word_count_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - } - - // run the proximity extraction only if the precision is by word - // this works only if the settings didn't change during this transaction. - let proximity_precision = index.proximity_precision(&rtxn)?.unwrap_or_default(); - if proximity_precision == ProximityPrecision::ByWord { - let caches = { - let span = tracing::trace_span!(target: "indexing::documents::extract", "word_pair_proximity_docids"); - let _entered = span.enter(); - - ::run_extraction( - grenad_parameters, - document_changes, - indexing_context, - &mut extractor_allocs, - IndexingStep::ExtractingWordProximity, - )? - }; - - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "word_pair_proximity_docids"); - let _entered = span.enter(); - - merge_and_send_docids( - caches, - index.word_pair_proximity_docids.remap_types(), - index, - extractor_sender.docids::(), - &indexing_context.must_stop_processing, - )?; - } - } - - 'vectors: { - if index_embeddings.is_empty() { - break 'vectors; - } - - let embedding_sender = extractor_sender.embeddings(); - let extractor = EmbeddingExtractor::new(embedders, embedding_sender, field_distribution, request_threads()); - let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "vectors"); - let _entered = span.enter(); - - extract( - document_changes, - &extractor, - indexing_context, - &mut extractor_allocs, - &datastore, - IndexingStep::ExtractingEmbeddings, - )?; - } - { - let span = tracing::trace_span!(target: "indexing::documents::merge", "vectors"); - let _entered = span.enter(); - - for config in &mut index_embeddings { - 'data: for data in datastore.iter_mut() { - let data = &mut data.get_mut().0; - let Some(deladd) = data.remove(&config.name) else { continue 'data; }; - deladd.apply_to(&mut config.user_provided); - } - } - } - } - - 'geo: { - let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else { - break 'geo; - }; - let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); - - { - let span = tracing::trace_span!(target: "indexing::documents::extract", "geo"); - let _entered = span.enter(); - - extract( - document_changes, - &extractor, - indexing_context, - &mut extractor_allocs, - &datastore, - IndexingStep::WritingGeoPoints - )?; - } - - merge_and_send_rtree( - datastore, - &rtxn, - index, - extractor_sender.geo(), - &indexing_context.must_stop_processing, - )?; - } - indexing_context.progress.update_progress(IndexingStep::WritingToDatabase); - finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); - - Result::Ok((facet_field_ids_delta, index_embeddings)) - }).unwrap() - })?; + finished_extraction, + field_distribution, + index_embeddings, + document_ids, + ) + }) + .unwrap() + })?; let global_fields_ids_map = GlobalFieldsIdsMap::new(&new_fields_ids_map); let vector_arroy = index.vector_arroy; - let indexer_span = tracing::Span::current(); let arroy_writers: Result> = embedders .inner_as_ref() .iter() @@ -415,114 +161,30 @@ where }) .collect(); - // Used by by the ArroySetVector to copy the embedding into an - // aligned memory area, required by arroy to accept a new vector. - let mut aligned_embedding = Vec::new(); let mut arroy_writers = arroy_writers?; - { - let span = tracing::trace_span!(target: "indexing::write_db", "all"); - let _entered = span.enter(); - - let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); - let mut _entered_post_merge = None; - - while let Some(action) = writer_receiver.recv_action() { - if _entered_post_merge.is_none() - && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) - { - _entered_post_merge = Some(span.enter()); - } - - match action { - ReceiverAction::WakeUp => (), - ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => { - let database_name = database.database_name(); - let database = database.database(index); - if let Err(error) = database.put(wtxn, &key, &value) { - return Err(Error::InternalError(InternalError::StorePut { - database_name, - key: bstr::BString::from(&key[..]), - value_length: value.len(), - error, - })); - } - } - ReceiverAction::LargeVectors(large_vectors) => { - let LargeVectors { docid, embedder_id, .. } = large_vectors; - let (_, _, writer, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - let mut embeddings = Embeddings::new(*dimensions); - for embedding in large_vectors.read_embeddings(*dimensions) { - embeddings.push(embedding.to_vec()).unwrap(); - } - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_items(wtxn, docid, &embeddings)?; - } - } - - // Every time the is a message in the channel we search - // for new entries in the BBQueue buffers. - write_from_bbqueue( - &mut writer_receiver, - index, - wtxn, - &arroy_writers, - &mut aligned_embedding, - )?; - } - - // Once the extractor/writer channel is closed - // we must process the remaining BBQueue messages. - write_from_bbqueue( - &mut writer_receiver, - index, - wtxn, - &arroy_writers, - &mut aligned_embedding, - )?; - } + write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors); let (facet_field_ids_delta, index_embeddings) = extractor_handle.join().unwrap()?; - 'vectors: { - let span = - tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build"); - let _entered = span.enter(); + indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); - if index_embeddings.is_empty() { - break 'vectors; - } + build_vectors( + index, + wtxn, + index_embeddings, + &mut arroy_writers, + &indexing_context.must_stop_processing, + )?; - indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); - let mut rng = rand::rngs::StdRng::seed_from_u64(42); - for (_index, (_embedder_name, _embedder, writer, dimensions)) in &mut arroy_writers { - let dimensions = *dimensions; - writer.build_and_quantize( - wtxn, - &mut rng, - dimensions, - false, - &indexing_context.must_stop_processing, - )?; - } - - index.put_embedding_configs(wtxn, index_embeddings)?; - } - - indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); - if index.facet_search(wtxn)? { - compute_facet_search_database(index, wtxn, global_fields_ids_map)?; - } - - compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; - - indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); - if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { - compute_prefix_database(index, wtxn, prefix_delta, grenad_parameters)?; - } + post_processing::post_process( + indexing_context, + wtxn, + global_fields_ids_map, + facet_field_ids_delta, + )?; indexing_context.progress.update_progress(IndexingStep::Finalizing); @@ -533,321 +195,15 @@ where drop(fields_ids_map_store); let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap(); - index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; - - if let Some(new_primary_key) = new_primary_key { - index.put_primary_key(wtxn, new_primary_key.name())?; - } - - // used to update the localized and weighted maps while sharing the update code with the settings pipeline. - let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?; - inner_index_settings.recompute_facets(wtxn, index)?; - inner_index_settings.recompute_searchables(wtxn, index)?; - index.put_field_distribution(wtxn, &field_distribution)?; - index.put_documents_ids(wtxn, &document_ids)?; - index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; + update_index( + index, + wtxn, + new_fields_ids_map, + new_primary_key, + embedders, + field_distribution, + document_ids, + )?; Ok(()) } - -/// A function dedicated to manage all the available BBQueue frames. -/// -/// It reads all the available frames, do the corresponding database operations -/// and stops when no frame are available. -fn write_from_bbqueue( - writer_receiver: &mut WriterBbqueueReceiver<'_>, - index: &Index, - wtxn: &mut RwTxn<'_>, - arroy_writers: &HashMap, - aligned_embedding: &mut Vec, -) -> crate::Result<()> { - while let Some(frame_with_header) = writer_receiver.recv_frame() { - match frame_with_header.header() { - EntryHeader::DbOperation(operation) => { - let database_name = operation.database.database_name(); - let database = operation.database.database(index); - let frame = frame_with_header.frame(); - match operation.key_value(frame) { - (key, Some(value)) => { - if let Err(error) = database.put(wtxn, key, value) { - return Err(Error::InternalError(InternalError::StorePut { - database_name, - key: key.into(), - value_length: value.len(), - error, - })); - } - } - (key, None) => match database.delete(wtxn, key) { - Ok(false) => { - unreachable!("We tried to delete an unknown key: {key:?}") - } - Ok(_) => (), - Err(error) => { - return Err(Error::InternalError(InternalError::StoreDeletion { - database_name, - key: key.into(), - error, - })); - } - }, - } - } - EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { - for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { - let dimensions = *dimensions; - writer.del_items(wtxn, dimensions, docid)?; - } - } - EntryHeader::ArroySetVectors(asvs) => { - let ArroySetVectors { docid, embedder_id, .. } = asvs; - let frame = frame_with_header.frame(); - let (_, _, writer, dimensions) = - arroy_writers.get(&embedder_id).expect("requested a missing embedder"); - let mut embeddings = Embeddings::new(*dimensions); - let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding); - embeddings.append(all_embeddings.to_vec()).unwrap(); - writer.del_items(wtxn, *dimensions, docid)?; - writer.add_items(wtxn, docid, &embeddings)?; - } - } - } - - Ok(()) -} - -#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] -fn compute_prefix_database( - index: &Index, - wtxn: &mut RwTxn, - prefix_delta: PrefixDelta, - grenad_parameters: GrenadParameters, -) -> Result<()> { - let PrefixDelta { modified, deleted } = prefix_delta; - // Compute word prefix docids - compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute exact word prefix docids - compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute word prefix fid docids - compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; - // Compute word prefix position docids - compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) -} - -#[tracing::instrument(level = "trace", skip_all, target = "indexing")] -fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result> { - let rtxn = index.read_txn()?; - let words_fst = index.words_fst(&rtxn)?; - let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; - let prefix_settings = index.prefix_settings(&rtxn)?; - word_fst_builder.with_prefix_settings(prefix_settings); - - let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::(); - let current_words = index.word_docids.iter(wtxn)?.remap_data_type::(); - for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { - (Ok((l, _)), Ok((r, _))) => l.cmp(r), - (Err(_), _) | (_, Err(_)) => Ordering::Equal, - }) { - match eob { - EitherOrBoth::Both(lhs, rhs) => { - let (word, lhs_bytes) = lhs?; - let (_, rhs_bytes) = rhs?; - if lhs_bytes != rhs_bytes { - word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; - } - } - EitherOrBoth::Left(result) => { - let (word, _) = result?; - word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; - } - EitherOrBoth::Right(result) => { - let (word, _) = result?; - word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; - } - } - } - - let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; - index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; - if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { - index.main.remap_types::().put( - wtxn, - WORDS_PREFIXES_FST_KEY, - &prefixes_fst_mmap, - )?; - Ok(Some(prefix_delta)) - } else { - Ok(None) - } -} - -#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")] -fn compute_facet_search_database( - index: &Index, - wtxn: &mut RwTxn, - global_fields_ids_map: GlobalFieldsIdsMap, -) -> Result<()> { - let rtxn = index.read_txn()?; - let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; - let mut facet_search_builder = FacetSearchBuilder::new( - global_fields_ids_map, - localized_attributes_rules.unwrap_or_default(), - ); - - let previous_facet_id_string_docids = index - .facet_id_string_docids - .iter(&rtxn)? - .remap_data_type::() - .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); - let current_facet_id_string_docids = index - .facet_id_string_docids - .iter(wtxn)? - .remap_data_type::() - .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); - for eob in merge_join_by( - previous_facet_id_string_docids, - current_facet_id_string_docids, - |lhs, rhs| match (lhs, rhs) { - (Ok((l, _)), Ok((r, _))) => l.cmp(r), - (Err(_), _) | (_, Err(_)) => Ordering::Equal, - }, - ) { - match eob { - EitherOrBoth::Both(lhs, rhs) => { - let (_, _) = lhs?; - let (_, _) = rhs?; - } - EitherOrBoth::Left(result) => { - let (key, _) = result?; - facet_search_builder.register_from_key(DelAdd::Deletion, key)?; - } - EitherOrBoth::Right(result) => { - let (key, _) = result?; - facet_search_builder.register_from_key(DelAdd::Addition, key)?; - } - } - } - - facet_search_builder.merge_and_write(index, wtxn, &rtxn) -} - -#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] -fn compute_facet_level_database( - index: &Index, - wtxn: &mut RwTxn, - facet_field_ids_delta: FacetFieldIdsDelta, -) -> Result<()> { - if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { - let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); - let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_string_ids, - FacetType::String, - ) - .execute(wtxn)?; - } - if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { - let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); - let _entered = span.enter(); - FacetsUpdateBulk::new_not_updating_level_0( - index, - modified_facet_number_ids, - FacetType::Number, - ) - .execute(wtxn)?; - } - - Ok(()) -} - -/// Returns the primary key that has already been set for this index or the -/// one we will guess by searching for the first key that contains "id" as a substring, -/// and whether the primary key changed -/// TODO move this elsewhere -pub fn retrieve_or_guess_primary_key<'a>( - rtxn: &'a RoTxn<'a>, - index: &Index, - new_fields_ids_map: &mut FieldsIdsMap, - primary_key_from_op: Option<&'a str>, - first_document: Option>, -) -> Result, bool), UserError>> { - // make sure that we have a declared primary key, either fetching it from the index or attempting to guess it. - - // do we have an existing declared primary key? - let (primary_key, has_changed) = if let Some(primary_key_from_db) = index.primary_key(rtxn)? { - // did we request a primary key in the operation? - match primary_key_from_op { - // we did, and it is different from the DB one - Some(primary_key_from_op) if primary_key_from_op != primary_key_from_db => { - return Ok(Err(UserError::PrimaryKeyCannotBeChanged( - primary_key_from_db.to_string(), - ))); - } - _ => (primary_key_from_db, false), - } - } else { - // no primary key in the DB => let's set one - // did we request a primary key in the operation? - let primary_key = if let Some(primary_key_from_op) = primary_key_from_op { - // set primary key from operation - primary_key_from_op - } else { - // guess primary key - let first_document = match first_document { - Some(document) => document, - // previous indexer when no pk is set + we send an empty payload => index_primary_key_no_candidate_found - None => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), - }; - - let guesses: Result> = first_document - .keys() - .filter_map(|name| { - let Some(_) = new_fields_ids_map.insert(name) else { - return Some(Err(UserError::AttributeLimitReached.into())); - }; - name.to_lowercase().ends_with(DEFAULT_PRIMARY_KEY).then_some(Ok(name)) - }) - .collect(); - - let mut guesses = guesses?; - - // sort the keys in lexicographical order, so that fields are always in the same order. - guesses.sort_unstable(); - - match guesses.as_slice() { - [] => return Ok(Err(UserError::NoPrimaryKeyCandidateFound)), - [name] => { - tracing::info!("Primary key was not specified in index. Inferred to '{name}'"); - *name - } - multiple => { - return Ok(Err(UserError::MultiplePrimaryKeyCandidatesFound { - candidates: multiple - .iter() - .map(|candidate| candidate.to_string()) - .collect(), - })) - } - } - }; - (primary_key, true) - }; - - match PrimaryKey::new_or_insert(primary_key, new_fields_ids_map) { - Ok(primary_key) => Ok(Ok((primary_key, has_changed))), - Err(err) => Ok(Err(err)), - } -} - -fn request_threads() -> &'static ThreadPoolNoAbort { - static REQUEST_THREADS: OnceLock = OnceLock::new(); - - REQUEST_THREADS.get_or_init(|| { - ThreadPoolNoAbortBuilder::new() - .num_threads(crate::vector::REQUEST_PARALLELISM) - .thread_name(|index| format!("embedding-request-{index}")) - .build() - .unwrap() - }) -} diff --git a/crates/milli/src/update/new/indexer/post_processing.rs b/crates/milli/src/update/new/indexer/post_processing.rs new file mode 100644 index 000000000..6bd139068 --- /dev/null +++ b/crates/milli/src/update/new/indexer/post_processing.rs @@ -0,0 +1,187 @@ +use std::cmp::Ordering; + +use heed::types::{Bytes, DecodeIgnore, Str}; +use heed::RwTxn; +use itertools::{merge_join_by, EitherOrBoth}; + +use super::document_changes::IndexingContext; +use crate::facet::FacetType; +use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY}; +use crate::update::del_add::DelAdd; +use crate::update::new::facet_search_builder::FacetSearchBuilder; +use crate::update::new::steps::IndexingStep; +use crate::update::new::word_fst_builder::{PrefixData, PrefixDelta, WordFstBuilder}; +use crate::update::new::words_prefix_docids::{ + compute_exact_word_prefix_docids, compute_word_prefix_docids, compute_word_prefix_fid_docids, + compute_word_prefix_position_docids, +}; +use crate::update::new::FacetFieldIdsDelta; +use crate::update::{FacetsUpdateBulk, GrenadParameters}; +use crate::{GlobalFieldsIdsMap, Index, Result}; + +pub(super) fn post_process( + indexing_context: IndexingContext, + wtxn: &mut RwTxn<'_>, + global_fields_ids_map: GlobalFieldsIdsMap<'_>, + facet_field_ids_delta: FacetFieldIdsDelta, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, +{ + let index = indexing_context.index; + indexing_context.progress.update_progress(IndexingStep::PostProcessingFacets); + if index.facet_search(wtxn)? { + compute_facet_search_database(index, wtxn, global_fields_ids_map)?; + } + compute_facet_level_database(index, wtxn, facet_field_ids_delta)?; + indexing_context.progress.update_progress(IndexingStep::PostProcessingWords); + if let Some(prefix_delta) = compute_word_fst(index, wtxn)? { + compute_prefix_database(index, wtxn, prefix_delta, indexing_context.grenad_parameters)?; + }; + Ok(()) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::prefix")] +fn compute_prefix_database( + index: &Index, + wtxn: &mut RwTxn, + prefix_delta: PrefixDelta, + grenad_parameters: &GrenadParameters, +) -> Result<()> { + let PrefixDelta { modified, deleted } = prefix_delta; + // Compute word prefix docids + compute_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; + // Compute exact word prefix docids + compute_exact_word_prefix_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; + // Compute word prefix fid docids + compute_word_prefix_fid_docids(wtxn, index, &modified, &deleted, grenad_parameters)?; + // Compute word prefix position docids + compute_word_prefix_position_docids(wtxn, index, &modified, &deleted, grenad_parameters) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing")] +fn compute_word_fst(index: &Index, wtxn: &mut RwTxn) -> Result> { + let rtxn = index.read_txn()?; + let words_fst = index.words_fst(&rtxn)?; + let mut word_fst_builder = WordFstBuilder::new(&words_fst)?; + let prefix_settings = index.prefix_settings(&rtxn)?; + word_fst_builder.with_prefix_settings(prefix_settings); + + let previous_words = index.word_docids.iter(&rtxn)?.remap_data_type::(); + let current_words = index.word_docids.iter(wtxn)?.remap_data_type::(); + for eob in merge_join_by(previous_words, current_words, |lhs, rhs| match (lhs, rhs) { + (Ok((l, _)), Ok((r, _))) => l.cmp(r), + (Err(_), _) | (_, Err(_)) => Ordering::Equal, + }) { + match eob { + EitherOrBoth::Both(lhs, rhs) => { + let (word, lhs_bytes) = lhs?; + let (_, rhs_bytes) = rhs?; + if lhs_bytes != rhs_bytes { + word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; + } + } + EitherOrBoth::Left(result) => { + let (word, _) = result?; + word_fst_builder.register_word(DelAdd::Deletion, word.as_ref())?; + } + EitherOrBoth::Right(result) => { + let (word, _) = result?; + word_fst_builder.register_word(DelAdd::Addition, word.as_ref())?; + } + } + } + + let (word_fst_mmap, prefix_data) = word_fst_builder.build(index, &rtxn)?; + index.main.remap_types::().put(wtxn, WORDS_FST_KEY, &word_fst_mmap)?; + if let Some(PrefixData { prefixes_fst_mmap, prefix_delta }) = prefix_data { + index.main.remap_types::().put( + wtxn, + WORDS_PREFIXES_FST_KEY, + &prefixes_fst_mmap, + )?; + Ok(Some(prefix_delta)) + } else { + Ok(None) + } +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_search")] +fn compute_facet_search_database( + index: &Index, + wtxn: &mut RwTxn, + global_fields_ids_map: GlobalFieldsIdsMap, +) -> Result<()> { + let rtxn = index.read_txn()?; + let localized_attributes_rules = index.localized_attributes_rules(&rtxn)?; + let mut facet_search_builder = FacetSearchBuilder::new( + global_fields_ids_map, + localized_attributes_rules.unwrap_or_default(), + ); + + let previous_facet_id_string_docids = index + .facet_id_string_docids + .iter(&rtxn)? + .remap_data_type::() + .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); + let current_facet_id_string_docids = index + .facet_id_string_docids + .iter(wtxn)? + .remap_data_type::() + .filter(|r| r.as_ref().map_or(true, |(k, _)| k.level == 0)); + for eob in merge_join_by( + previous_facet_id_string_docids, + current_facet_id_string_docids, + |lhs, rhs| match (lhs, rhs) { + (Ok((l, _)), Ok((r, _))) => l.cmp(r), + (Err(_), _) | (_, Err(_)) => Ordering::Equal, + }, + ) { + match eob { + EitherOrBoth::Both(lhs, rhs) => { + let (_, _) = lhs?; + let (_, _) = rhs?; + } + EitherOrBoth::Left(result) => { + let (key, _) = result?; + facet_search_builder.register_from_key(DelAdd::Deletion, key)?; + } + EitherOrBoth::Right(result) => { + let (key, _) = result?; + facet_search_builder.register_from_key(DelAdd::Addition, key)?; + } + } + } + + facet_search_builder.merge_and_write(index, wtxn, &rtxn) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::facet_field_ids")] +fn compute_facet_level_database( + index: &Index, + wtxn: &mut RwTxn, + facet_field_ids_delta: FacetFieldIdsDelta, +) -> Result<()> { + if let Some(modified_facet_string_ids) = facet_field_ids_delta.modified_facet_string_ids() { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "string"); + let _entered = span.enter(); + FacetsUpdateBulk::new_not_updating_level_0( + index, + modified_facet_string_ids, + FacetType::String, + ) + .execute(wtxn)?; + } + if let Some(modified_facet_number_ids) = facet_field_ids_delta.modified_facet_number_ids() { + let span = tracing::trace_span!(target: "indexing::facet_field_ids", "number"); + let _entered = span.enter(); + FacetsUpdateBulk::new_not_updating_level_0( + index, + modified_facet_number_ids, + FacetType::Number, + ) + .execute(wtxn)?; + } + + Ok(()) +} diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs new file mode 100644 index 000000000..fc647cfa5 --- /dev/null +++ b/crates/milli/src/update/new/indexer/write.rs @@ -0,0 +1,189 @@ +use std::sync::atomic::AtomicBool; + +use hashbrown::HashMap; +use heed::RwTxn; +use rand::SeedableRng as _; +use time::OffsetDateTime; + +use super::super::channel::*; +use crate::documents::PrimaryKey; +use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; +use crate::index::IndexEmbeddingConfig; +use crate::update::settings::InnerIndexSettings; +use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings}; +use crate::{Error, Index, InternalError, Result}; + +pub(super) fn write_to_db( + mut writer_receiver: WriterBbqueueReceiver<'_>, + finished_extraction: &AtomicBool, + index: &Index, + wtxn: &mut RwTxn<'_>, + arroy_writers: &HashMap, +) -> Result<()> { + // Used by by the ArroySetVector to copy the embedding into an + // aligned memory area, required by arroy to accept a new vector. + let mut aligned_embedding = Vec::new(); + let span = tracing::trace_span!(target: "indexing::write_db", "all"); + let _entered = span.enter(); + let span = tracing::trace_span!(target: "indexing::write_db", "post_merge"); + let mut _entered_post_merge = None; + while let Some(action) = writer_receiver.recv_action() { + if _entered_post_merge.is_none() + && finished_extraction.load(std::sync::atomic::Ordering::Relaxed) + { + _entered_post_merge = Some(span.enter()); + } + + match action { + ReceiverAction::WakeUp => (), + ReceiverAction::LargeEntry(LargeEntry { database, key, value }) => { + let database_name = database.database_name(); + let database = database.database(index); + if let Err(error) = database.put(wtxn, &key, &value) { + return Err(Error::InternalError(InternalError::StorePut { + database_name, + key: bstr::BString::from(&key[..]), + value_length: value.len(), + error, + })); + } + } + ReceiverAction::LargeVectors(large_vectors) => { + let LargeVectors { docid, embedder_id, .. } = large_vectors; + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + let mut embeddings = Embeddings::new(*dimensions); + for embedding in large_vectors.read_embeddings(*dimensions) { + embeddings.push(embedding.to_vec()).unwrap(); + } + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_items(wtxn, docid, &embeddings)?; + } + } + + // Every time the is a message in the channel we search + // for new entries in the BBQueue buffers. + write_from_bbqueue( + &mut writer_receiver, + index, + wtxn, + arroy_writers, + &mut aligned_embedding, + )?; + } + write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?; + Ok(()) +} + +#[tracing::instrument(level = "trace", skip_all, target = "indexing::vectors")] +pub(super) fn build_vectors( + index: &Index, + wtxn: &mut RwTxn<'_>, + index_embeddings: Vec, + arroy_writers: &mut HashMap, + must_stop_processing: &MSP, +) -> Result<()> +where + MSP: Fn() -> bool + Sync + Send, +{ + if index_embeddings.is_empty() { + return Ok(()); + } + + let mut rng = rand::rngs::StdRng::seed_from_u64(42); + for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers { + let dimensions = *dimensions; + writer.build_and_quantize(wtxn, &mut rng, dimensions, false, must_stop_processing)?; + } + + index.put_embedding_configs(wtxn, index_embeddings)?; + Ok(()) +} + +pub(super) fn update_index( + index: &Index, + wtxn: &mut RwTxn<'_>, + new_fields_ids_map: FieldIdMapWithMetadata, + new_primary_key: Option>, + embedders: EmbeddingConfigs, + field_distribution: std::collections::BTreeMap, + document_ids: roaring::RoaringBitmap, +) -> Result<()> { + index.put_fields_ids_map(wtxn, new_fields_ids_map.as_fields_ids_map())?; + if let Some(new_primary_key) = new_primary_key { + index.put_primary_key(wtxn, new_primary_key.name())?; + } + let mut inner_index_settings = InnerIndexSettings::from_index(index, wtxn, Some(embedders))?; + inner_index_settings.recompute_facets(wtxn, index)?; + inner_index_settings.recompute_searchables(wtxn, index)?; + index.put_field_distribution(wtxn, &field_distribution)?; + index.put_documents_ids(wtxn, &document_ids)?; + index.set_updated_at(wtxn, &OffsetDateTime::now_utc())?; + Ok(()) +} + +/// A function dedicated to manage all the available BBQueue frames. +/// +/// It reads all the available frames, do the corresponding database operations +/// and stops when no frame are available. +pub fn write_from_bbqueue( + writer_receiver: &mut WriterBbqueueReceiver<'_>, + index: &Index, + wtxn: &mut RwTxn<'_>, + arroy_writers: &HashMap, + aligned_embedding: &mut Vec, +) -> crate::Result<()> { + while let Some(frame_with_header) = writer_receiver.recv_frame() { + match frame_with_header.header() { + EntryHeader::DbOperation(operation) => { + let database_name = operation.database.database_name(); + let database = operation.database.database(index); + let frame = frame_with_header.frame(); + match operation.key_value(frame) { + (key, Some(value)) => { + if let Err(error) = database.put(wtxn, key, value) { + return Err(Error::InternalError(InternalError::StorePut { + database_name, + key: key.into(), + value_length: value.len(), + error, + })); + } + } + (key, None) => match database.delete(wtxn, key) { + Ok(false) => { + unreachable!("We tried to delete an unknown key: {key:?}") + } + Ok(_) => (), + Err(error) => { + return Err(Error::InternalError(InternalError::StoreDeletion { + database_name, + key: key.into(), + error, + })); + } + }, + } + } + EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => { + for (_index, (_name, _embedder, writer, dimensions)) in arroy_writers { + let dimensions = *dimensions; + writer.del_items(wtxn, dimensions, docid)?; + } + } + EntryHeader::ArroySetVectors(asvs) => { + let ArroySetVectors { docid, embedder_id, .. } = asvs; + let frame = frame_with_header.frame(); + let (_, _, writer, dimensions) = + arroy_writers.get(&embedder_id).expect("requested a missing embedder"); + let mut embeddings = Embeddings::new(*dimensions); + let all_embeddings = asvs.read_all_embeddings_into_vec(frame, aligned_embedding); + embeddings.append(all_embeddings.to_vec()).unwrap(); + writer.del_items(wtxn, *dimensions, docid)?; + writer.add_items(wtxn, docid, &embeddings)?; + } + } + } + + Ok(()) +} diff --git a/crates/milli/src/update/new/words_prefix_docids.rs b/crates/milli/src/update/new/words_prefix_docids.rs index bf64049c3..7ba2b9b71 100644 --- a/crates/milli/src/update/new/words_prefix_docids.rs +++ b/crates/milli/src/update/new/words_prefix_docids.rs @@ -25,7 +25,7 @@ impl WordPrefixDocids { fn new( database: Database, prefix_database: Database, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> WordPrefixDocids { WordPrefixDocids { database, @@ -161,7 +161,7 @@ impl WordPrefixIntegerDocids { fn new( database: Database, prefix_database: Database, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> WordPrefixIntegerDocids { WordPrefixIntegerDocids { database, @@ -311,7 +311,7 @@ pub fn compute_word_prefix_docids( index: &Index, prefix_to_compute: &BTreeSet, prefix_to_delete: &BTreeSet, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( index.word_docids.remap_key_type(), @@ -327,7 +327,7 @@ pub fn compute_exact_word_prefix_docids( index: &Index, prefix_to_compute: &BTreeSet, prefix_to_delete: &BTreeSet, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> Result<()> { WordPrefixDocids::new( index.exact_word_docids.remap_key_type(), @@ -343,7 +343,7 @@ pub fn compute_word_prefix_fid_docids( index: &Index, prefix_to_compute: &BTreeSet, prefix_to_delete: &BTreeSet, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( index.word_fid_docids.remap_key_type(), @@ -359,7 +359,7 @@ pub fn compute_word_prefix_position_docids( index: &Index, prefix_to_compute: &BTreeSet, prefix_to_delete: &BTreeSet, - grenad_parameters: GrenadParameters, + grenad_parameters: &GrenadParameters, ) -> Result<()> { WordPrefixIntegerDocids::new( index.word_position_docids.remap_key_type(),