diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index bb36ddc37..bbba9cfe8 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -12,13 +12,18 @@ use super::super::steps::IndexingStep; use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; +use crate::documents::FieldIdMapper; +use crate::documents::PrimaryKey; use crate::index::IndexEmbeddingConfig; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; +use crate::update::settings::SettingsDelta; use crate::vector::EmbeddingConfigs; +use crate::Index; +use crate::InternalError; use crate::{Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; #[allow(clippy::too_many_arguments)] @@ -312,6 +317,28 @@ where Result::Ok((facet_field_ids_delta, index_embeddings)) } +pub(super) fn extract_all_settings_changes<'extractor, MSP, SD>( + indexing_context: IndexingContext, + indexer_span: Span, + extractor_sender: ExtractorBbqueueSender, + settings_delta: &SD, + extractor_allocs: &'extractor mut ThreadLocal>, + finished_extraction: &AtomicBool, + field_distribution: &mut BTreeMap, + mut index_embeddings: Vec, + modified_docids: &mut RoaringBitmap, +) -> Result> +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta, +{ + + indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); + finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); + + Result::Ok(index_embeddings) +} + fn request_threads() -> &'static ThreadPoolNoAbort { static REQUEST_THREADS: OnceLock = OnceLock::new(); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 2ea3c787e..913582dc3 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::sync::atomic::AtomicBool; use std::sync::{Once, RwLock}; use std::thread::{self, Builder}; @@ -20,8 +21,10 @@ use super::thread_local::ThreadLocal; use crate::documents::PrimaryKey; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::progress::Progress; +use crate::update::settings::SettingsDelta; use crate::update::GrenadParameters; -use crate::vector::{ArroyWrapper, EmbeddingConfigs}; +use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; +use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; pub(crate) mod de; @@ -32,6 +35,7 @@ mod extract; mod guess_primary_key; mod partial_dump; mod post_processing; +pub mod settings_changes; mod update_by_function; mod write; @@ -40,8 +44,6 @@ static LOG_MEMORY_METRICS_ONCE: Once = Once::new(); /// This is the main function of this crate. /// /// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`]. -/// -/// TODO return stats #[allow(clippy::too_many_arguments)] // clippy: 😝 pub fn index<'pl, 'indexer, 'index, DC, MSP>( wtxn: &mut RwTxn, @@ -65,48 +67,8 @@ where let arroy_memory = grenad_parameters.max_memory; - // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch - // is because we still use the old indexer for the settings and it is highly impacted by the - // max memory. So we keep the changes here and will remove these changes once we use the new - // indexer to also index settings. Related to #5125 and #5141. - let grenad_parameters = GrenadParameters { - max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100), - ..grenad_parameters - }; - - // 5% percent of the allocated memory for the extractors, or min 100MiB - // 5% percent of the allocated memory for the bbqueues, or min 50MiB - // - // Minimum capacity for bbqueues - let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * pool.current_num_threads(); // 50 MiB - let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; - - let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or( - ( - GrenadParameters { - max_memory: Some(minimum_total_extractors_capacity), - ..grenad_parameters - }, - minimum_total_bbbuffer_capacity, - ), // 100 MiB by thread by default - |max_memory| { - let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity); - let new_grenad_parameters = GrenadParameters { - max_memory: Some(max_memory.max(minimum_total_extractors_capacity)), - ..grenad_parameters - }; - (new_grenad_parameters, total_bbbuffer_capacity) - }, - ); - - LOG_MEMORY_METRICS_ONCE.call_once(|| { - tracing::debug!( - "Indexation allocated memory metrics - \ - Total BBQueue size: {total_bbbuffer_capacity}, \ - Total extractor memory: {:?}", - grenad_parameters.max_memory, - ); - }); + let (grenad_parameters, total_bbbuffer_capacity) = + indexer_memory_settings(pool.current_num_threads(), grenad_parameters); let (extractor_sender, writer_receiver) = pool .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) @@ -239,3 +201,219 @@ where Ok(congestion) } + +#[allow(clippy::too_many_arguments)] // clippy: 😝 +pub fn reindex<'pl, 'indexer, 'index, MSP, SD>( + wtxn: &mut RwTxn<'index>, + index: &'index Index, + pool: &ThreadPoolNoAbort, + grenad_parameters: GrenadParameters, + settings_delta: &'indexer SD, + must_stop_processing: &'indexer MSP, + progress: &'indexer Progress, +) -> Result +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta + Sync, +{ + let mut bbbuffers = Vec::new(); + let finished_extraction = AtomicBool::new(false); + + let arroy_memory = grenad_parameters.max_memory; + + let (grenad_parameters, total_bbbuffer_capacity) = + indexer_memory_settings(pool.current_num_threads(), grenad_parameters); + + let (extractor_sender, writer_receiver) = pool + .install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000)) + .unwrap(); + + let mut extractor_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); + + let db_fields_ids_map = index.fields_ids_map(wtxn)?; + let new_fields_ids_map = settings_delta.new_fields_ids_map().clone(); + let new_fields_ids_map = RwLock::new(new_fields_ids_map); + let fields_ids_map_store = ThreadLocal::with_capacity(rayon::current_num_threads()); + let doc_allocs = ThreadLocal::with_capacity(rayon::current_num_threads()); + + let indexing_context = IndexingContext { + index, + db_fields_ids_map: &db_fields_ids_map, + new_fields_ids_map: &new_fields_ids_map, + doc_allocs: &doc_allocs, + fields_ids_map_store: &fields_ids_map_store, + must_stop_processing, + progress, + grenad_parameters: &grenad_parameters, + }; + + let index_embeddings = index.embedding_configs(wtxn)?; + let mut field_distribution = index.field_distribution(wtxn)?; + let mut modified_docids = roaring::RoaringBitmap::new(); + + let congestion = thread::scope(|s| -> Result { + let indexer_span = tracing::Span::current(); + let finished_extraction = &finished_extraction; + // prevent moving the field_distribution and document_ids in the inner closure... + let field_distribution = &mut field_distribution; + let modified_docids = &mut modified_docids; + let extractor_handle = + Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { + pool.install(move || { + extract::extract_all_settings_changes( + indexing_context, + indexer_span, + extractor_sender, + settings_delta, + &mut extractor_allocs, + finished_extraction, + field_distribution, + index_embeddings, + modified_docids, + ) + }) + .unwrap() + })?; + + let new_embedders = settings_delta.new_embedders(); + let embedder_actions = settings_delta.embedder_actions(); + let index_embedder_category_ids = settings_delta.new_embedder_category_id(); + let mut arroy_writers = arroy_writers_from_embedder_actions( + index, + embedder_actions, + new_embedders, + index_embedder_category_ids, + )?; + + let congestion = + write_to_db(writer_receiver, finished_extraction, index, wtxn, &arroy_writers)?; + + indexing_context.progress.update_progress(IndexingStep::WaitingForExtractors); + + let index_embeddings = extractor_handle.join().unwrap()?; + + indexing_context.progress.update_progress(IndexingStep::WritingEmbeddingsToDatabase); + + pool.install(|| { + build_vectors( + index, + wtxn, + indexing_context.progress, + index_embeddings, + arroy_memory, + &mut arroy_writers, + Some(&embedder_actions), + &indexing_context.must_stop_processing, + ) + }) + .unwrap()?; + + indexing_context.progress.update_progress(IndexingStep::Finalizing); + + Ok(congestion) as Result<_> + })?; + + // required to into_inner the new_fields_ids_map + drop(fields_ids_map_store); + + let new_fields_ids_map = new_fields_ids_map.into_inner().unwrap(); + let document_ids = index.documents_ids(wtxn)?; + update_index( + index, + wtxn, + new_fields_ids_map, + None, + settings_delta.new_embedders().clone(), + field_distribution, + document_ids, + )?; + + Ok(congestion) +} + +fn arroy_writers_from_embedder_actions<'indexer, 'index>( + index: &'index Index, + embedder_actions: &'indexer BTreeMap, + embedders: &'indexer EmbeddingConfigs, + index_embedder_category_ids: &'indexer std::collections::HashMap, +) -> Result> { + let vector_arroy = index.vector_arroy; + + embedders + .inner_as_ref() + .iter() + .filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) { + None => None, + Some(action) if action.write_back().is_some() => None, + Some(action) => { + let Some(&embedder_category_id) = index_embedder_category_ids.get(embedder_name) + else { + return Some(Err(crate::error::Error::InternalError( + crate::InternalError::DatabaseMissingEntry { + db_name: crate::index::db_name::VECTOR_EMBEDDER_CATEGORY_ID, + key: None, + }, + ))); + }; + let writer = + ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized); + let dimensions = embedder.dimensions(); + Some(Ok(( + embedder_category_id, + (embedder_name.as_str(), embedder.as_ref(), writer, dimensions), + ))) + } + }) + .collect() +} + +fn indexer_memory_settings( + current_num_threads: usize, + grenad_parameters: GrenadParameters, +) -> (GrenadParameters, usize) { + // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch + // is because we still use the old indexer for the settings and it is highly impacted by the + // max memory. So we keep the changes here and will remove these changes once we use the new + // indexer to also index settings. Related to #5125 and #5141. + let grenad_parameters = GrenadParameters { + max_memory: grenad_parameters.max_memory.map(|mm| mm * 5 / 100), + ..grenad_parameters + }; + + // 5% percent of the allocated memory for the extractors, or min 100MiB + // 5% percent of the allocated memory for the bbqueues, or min 50MiB + // + // Minimum capacity for bbqueues + let minimum_total_bbbuffer_capacity = 50 * 1024 * 1024 * current_num_threads; + // 50 MiB + let minimum_total_extractors_capacity = minimum_total_bbbuffer_capacity * 2; + + let (grenad_parameters, total_bbbuffer_capacity) = grenad_parameters.max_memory.map_or( + ( + GrenadParameters { + max_memory: Some(minimum_total_extractors_capacity), + ..grenad_parameters + }, + minimum_total_bbbuffer_capacity, + ), // 100 MiB by thread by default + |max_memory| { + let total_bbbuffer_capacity = max_memory.max(minimum_total_bbbuffer_capacity); + let new_grenad_parameters = GrenadParameters { + max_memory: Some(max_memory.max(minimum_total_extractors_capacity)), + ..grenad_parameters + }; + (new_grenad_parameters, total_bbbuffer_capacity) + }, + ); + + LOG_MEMORY_METRICS_ONCE.call_once(|| { + tracing::debug!( + "Indexation allocated memory metrics - \ + Total BBQueue size: {total_bbbuffer_capacity}, \ + Total extractor memory: {:?}", + grenad_parameters.max_memory, + ); + }); + + (grenad_parameters, total_bbbuffer_capacity) +} diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index a7ab2dd04..ec84cfb40 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -2218,6 +2218,38 @@ fn deserialize_sub_embedder( } } +/// Implement this trait for the settings delta type. +/// This is used in the new settings update flow and will allow to easily replace the old settings delta type: `InnerIndexSettingsDiff`. +pub trait SettingsDelta { + fn new_embedders(&self) -> &EmbeddingConfigs; + fn old_embedders(&self) -> &EmbeddingConfigs; + fn new_embedder_category_id(&self) -> &HashMap; + fn embedder_actions(&self) -> &BTreeMap; + fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata; +} + +impl SettingsDelta for InnerIndexSettingsDiff { + fn new_embedders(&self) -> &EmbeddingConfigs { + &self.new.embedding_configs + } + + fn old_embedders(&self) -> &EmbeddingConfigs { + &self.old.embedding_configs + } + + fn new_embedder_category_id(&self) -> &HashMap { + &self.new.embedder_category_id + } + + fn embedder_actions(&self) -> &BTreeMap { + &self.embedding_config_updates + } + + fn new_fields_ids_map(&self) -> &FieldIdMapWithMetadata { + &self.new.fields_ids_map + } +} + #[cfg(test)] #[path = "test_settings.rs"] mod tests;