From b086c51a232dd76406525c7caa128daa9bc5b10d Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 1 Jul 2025 23:57:14 +0200 Subject: [PATCH] new settings indexer --- .../src/update/new/extract/vectors/mod.rs | 294 ++++++++++++------ .../milli/src/update/new/indexer/extract.rs | 25 +- crates/milli/src/update/new/indexer/mod.rs | 67 +++- 3 files changed, 262 insertions(+), 124 deletions(-) diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 3b8f5fa58..c08fadb14 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -1,5 +1,4 @@ use std::cell::RefCell; -use std::collections::BTreeMap; use std::fmt::Debug; use bumpalo::collections::Vec as BVec; @@ -16,15 +15,17 @@ use crate::update::new::indexer::settings_changes::SettingsChangeExtractor; use crate::update::new::thread_local::MostlySend; use crate::update::new::vector_document::VectorDocument; use crate::update::new::DocumentChange; +use crate::update::settings::SettingsDelta; use crate::vector::db::{EmbedderInfo, EmbeddingStatus, EmbeddingStatusDelta}; use crate::vector::error::{ EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump, }; use crate::vector::extractor::{ - DocumentTemplateExtractor, Extractor as VectorExtractor, RequestFragmentExtractor, + DocumentTemplateExtractor, Extractor as VectorExtractor, ExtractorDiff, + RequestFragmentExtractor, }; use crate::vector::session::{EmbedSession, Input, Metadata, OnEmbed}; -use crate::vector::settings::{EmbedderAction, ReindexAction}; +use crate::vector::settings::ReindexAction; use crate::vector::{Embedding, RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment}; use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError}; @@ -260,44 +261,31 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> { } } -pub struct SettingsChangeEmbeddingExtractor<'a, 'b> { - embedders: &'a EmbeddingConfigs, - old_embedders: &'a EmbeddingConfigs, - embedder_actions: &'a BTreeMap, - embedder_category_id: &'a std::collections::HashMap, +pub struct SettingsChangeEmbeddingExtractor<'a, 'b, SD> { + settings_delta: &'a SD, embedder_stats: &'a EmbedderStats, sender: EmbeddingSender<'a, 'b>, possible_embedding_mistakes: PossibleEmbeddingMistakes, threads: &'a ThreadPoolNoAbort, } -impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> { +impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> { #[allow(clippy::too_many_arguments)] pub fn new( - embedders: &'a EmbeddingConfigs, - old_embedders: &'a EmbeddingConfigs, - embedder_actions: &'a BTreeMap, - embedder_category_id: &'a std::collections::HashMap, + settings_delta: &'a SD, embedder_stats: &'a EmbedderStats, sender: EmbeddingSender<'a, 'b>, field_distribution: &'a FieldDistribution, threads: &'a ThreadPoolNoAbort, ) -> Self { let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); - Self { - embedders, - old_embedders, - embedder_actions, - embedder_category_id, - embedder_stats, - sender, - threads, - possible_embedding_mistakes, - } + Self { settings_delta, embedder_stats, sender, threads, possible_embedding_mistakes } } } -impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> { +impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor> + for SettingsChangeEmbeddingExtractor<'_, '_, SD> +{ type Data = RefCell>; fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result { @@ -309,44 +297,49 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding documents: impl Iterator>>, context: &'doc DocumentContext, ) -> crate::Result<()> { - let embedders = self.embedders.inner_as_ref(); - let old_embedders = self.old_embedders.inner_as_ref(); + let embedders = self.settings_delta.new_embedders(); + let old_embedders = self.settings_delta.old_embedders(); let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc); let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc); - for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { - // if the embedder is not in the embedder_actions, we don't need to reindex. - if let Some((embedder_id, reindex_action)) = - self.embedder_actions - .get(embedder_name) - // keep only the reindex actions - .and_then(EmbedderAction::reindex) - // map the reindex action to the embedder_id - .map(|reindex| { - let embedder_id = self.embedder_category_id.get(embedder_name).expect( - "An embedder_category_id must exist for all reindexed embedders", - ); - (*embedder_id, reindex) - }) - { - all_chunks.push(( - Chunks::new( - embedder, - embedder_id, - embedder_name, - prompt, - context.data, - &self.possible_embedding_mistakes, - self.embedder_stats, - self.threads, - self.sender, - &context.doc_alloc, - ), - reindex_action, - )) - } + let embedder_configs = context.index.embedding_configs(); + for (embedder_name, action) in self.settings_delta.embedder_actions().iter() { + let Some(reindex_action) = action.reindex() else { + continue; + }; + let runtime = embedders + .get(embedder_name) + .expect("A runtime must exist for all reindexed embedder"); + let embedder_info = embedder_configs + .embedder_info(&context.rtxn, embedder_name)? + .unwrap_or_else(|| { + // new embedder + EmbedderInfo { + embedder_id: *self + .settings_delta + .new_embedder_category_id() + .get(embedder_name) + .expect( + "An embedder_category_id must exist for all reindexed embedders", + ), + embedding_status: EmbeddingStatus::new(), + } + }); + all_chunks.push(( + Chunks::new( + runtime, + embedder_info, + embedder_name.as_str(), + context.data, + &self.possible_embedding_mistakes, + self.embedder_stats, + self.threads, + self.sender, + &context.doc_alloc, + ), + reindex_action, + )); } - for document in documents { let document = document?; @@ -360,6 +353,16 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding for (chunks, reindex_action) in &mut all_chunks { let embedder_name = chunks.embedder_name(); let current_vectors = current_vectors.vectors_for_key(embedder_name)?; + let (old_is_user_provided, _) = + chunks.is_user_provided_must_regenerate(document.docid()); + let old_has_fragments = old_embedders + .get(embedder_name) + .map(|embedder| embedder.fragments().is_empty()) + .unwrap_or_default(); + + let new_has_fragments = chunks.has_fragments(); + + let fragments_changed = old_has_fragments ^ new_has_fragments; // if the vectors for this document have been already provided, we don't need to reindex. let (is_new_embedder, must_regenerate) = @@ -368,60 +371,33 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding }); match reindex_action { - ReindexAction::RegeneratePrompts => { + ReindexAction::RegeneratePrompts | ReindexAction::RegenerateFragments(_) => { if !must_regenerate { continue; } // we need to regenerate the prompts for the document - - // Get the old prompt and render the document with it - let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else { - unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders") - }; - let old_rendered = old_prompt.render_document( + chunks.settings_change_autogenerated( + document.docid(), document.external_document_id(), document.current( &context.rtxn, context.index, context.db_fields_ids_map, )?, + self.settings_delta, context.new_fields_ids_map, - &context.doc_alloc, + &unused_vectors_distribution, + old_is_user_provided, + fragments_changed, )?; - - // Get the new prompt and render the document with it - let new_prompt = chunks.prompt(); - let new_rendered = new_prompt.render_document( - document.external_document_id(), - document.current( - &context.rtxn, - context.index, - context.db_fields_ids_map, - )?, - context.new_fields_ids_map, - &context.doc_alloc, - )?; - - // Compare the rendered documents - // if they are different, regenerate the vectors - if new_rendered != old_rendered { - chunks.set_autogenerated( - document.docid(), - document.external_document_id(), - new_rendered, - &unused_vectors_distribution, - )?; - } } ReindexAction::FullReindex => { - let prompt = chunks.prompt(); // if no inserted vectors, then regenerate: true + no embeddings => autogenerate if let Some(embeddings) = current_vectors .and_then(|vectors| vectors.embeddings) // insert the embeddings only for new embedders .filter(|_| is_new_embedder) { - chunks.set_regenerate(document.docid(), must_regenerate); chunks.set_vectors( document.external_document_id(), document.docid(), @@ -431,24 +407,27 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding error: error.to_string(), }, )?, + old_is_user_provided, + true, + must_regenerate, )?; } else if must_regenerate { - let rendered = prompt.render_document( + chunks.settings_change_autogenerated( + document.docid(), document.external_document_id(), document.current( &context.rtxn, context.index, context.db_fields_ids_map, )?, + self.settings_delta, context.new_fields_ids_map, - &context.doc_alloc, - )?; - chunks.set_autogenerated( - document.docid(), - document.external_document_id(), - rendered, &unused_vectors_distribution, + old_is_user_provided, + true, )?; + } else if is_new_embedder { + chunks.set_status(document.docid(), false, true, false, false); } } } @@ -585,7 +564,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { let embedder = &runtime.embedder; let dimensions = embedder.dimensions(); - let fragments = runtime.fragments.as_slice(); + let fragments = runtime.fragments(); let kind = if fragments.is_empty() { ChunkType::DocumentTemplate { document_template: &runtime.document_template, @@ -627,6 +606,117 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { self.status.is_user_provided_must_regenerate(docid) } + #[allow(clippy::too_many_arguments)] + pub fn settings_change_autogenerated<'doc, D: Document<'doc> + Debug, SD: SettingsDelta>( + &mut self, + docid: DocumentId, + external_docid: &'a str, + document: D, + settings_delta: &SD, + fields_ids_map: &'a RefCell, + unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>, + old_is_user_provided: bool, + full_reindex: bool, + ) -> Result<()> + where + 'a: 'doc, + { + match &mut self.kind { + ChunkType::Fragments { fragments: _, session } => { + let doc_alloc = session.doc_alloc(); + + if old_is_user_provided | full_reindex { + session.on_embed_mut().clear_vectors(docid); + } + + let mut extracted = false; + let extracted = &mut extracted; + + settings_delta.try_for_each_fragment_diff( + session.embedder_name(), + |fragment_diff| { + let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc) + .ignore_errors(); + let old = if full_reindex { + None + } else { + fragment_diff.old.map(|old| { + RequestFragmentExtractor::new(old, doc_alloc).ignore_errors() + }) + }; + let metadata = Metadata { + docid, + external_docid, + extractor_id: extractor.extractor_id(), + }; + + match extractor.diff_settings(&document, &(), old.as_ref())? { + ExtractorDiff::Removed => { + OnEmbed::process_embedding_response( + session.on_embed_mut(), + crate::vector::session::EmbeddingResponse { + metadata, + embedding: None, + }, + ); + } + ExtractorDiff::Added(input) | ExtractorDiff::Updated(input) => { + *extracted = true; + session.request_embedding( + metadata, + input, + unused_vectors_distribution, + )?; + } + ExtractorDiff::Unchanged => { /* nothing to do */ } + } + + Result::Ok(()) + }, + )?; + self.set_status( + docid, + old_is_user_provided, + true, + old_is_user_provided & !*extracted, + true, + ); + } + ChunkType::DocumentTemplate { document_template, session } => { + let doc_alloc = session.doc_alloc(); + + let old_embedder = settings_delta.old_embedders().get(session.embedder_name()); + let old_document_template = if full_reindex { + None + } else { + old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template) + }; + let extractor = + DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map); + let old_extractor = old_document_template.map(|old_document_template| { + DocumentTemplateExtractor::new(old_document_template, doc_alloc, fields_ids_map) + }); + let metadata = + Metadata { docid, external_docid, extractor_id: extractor.extractor_id() }; + + match extractor.diff_settings(document, &external_docid, old_extractor.as_ref())? { + ExtractorDiff::Removed => { + OnEmbed::process_embedding_response( + session.on_embed_mut(), + crate::vector::session::EmbeddingResponse { metadata, embedding: None }, + ); + } + ExtractorDiff::Added(input) | ExtractorDiff::Updated(input) => { + session.request_embedding(metadata, input, unused_vectors_distribution)?; + } + ExtractorDiff::Unchanged => { /* do nothing */ } + } + self.set_status(docid, old_is_user_provided, true, false, true); + } + } + Ok(()) + } + #[allow(clippy::too_many_arguments)] pub fn update_autogenerated<'doc, OD: Document<'doc> + Debug, ND: Document<'doc> + Debug>( &mut self, @@ -862,6 +952,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { Ok(()) } + + fn has_fragments(&self) -> bool { + matches!(self.kind, ChunkType::Fragments { .. }) + } } #[allow(clippy::too_many_arguments)] diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index a3e7842c2..abfb4d6da 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -21,7 +21,7 @@ use crate::update::new::indexer::settings_changes::DocumentsIndentifiers; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::SettingsDelta; -use crate::vector::db::IndexEmbeddingConfig; +use crate::vector::db::{EmbedderInfo, IndexEmbeddingConfig}; use crate::vector::RuntimeEmbedders; use crate::{Index, InternalError, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; @@ -333,12 +333,11 @@ pub(super) fn extract_all_settings_changes( finished_extraction: &AtomicBool, field_distribution: &mut BTreeMap, mut index_embeddings: Vec, - modified_docids: &mut RoaringBitmap, embedder_stats: &EmbedderStats, ) -> Result> where MSP: Fn() -> bool + Sync, - SD: SettingsDelta, + SD: SettingsDelta + Sync, { // Create the list of document ids to extract let rtxn = indexing_context.index.read_txn()?; @@ -369,10 +368,7 @@ where // extract the remaining embeddings let extractor = SettingsChangeEmbeddingExtractor::new( - settings_delta.new_embedders(), - settings_delta.old_embedders(), - settings_delta.embedder_actions(), - settings_delta.new_embedder_category_id(), + settings_delta, embedder_stats, embedding_sender, field_distribution, @@ -396,14 +392,25 @@ where let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors"); let _entered = span.enter(); + let embedder_configs = indexing_context.index.embedding_configs(); for config in &mut index_embeddings { + // retrieve infos for existing embedder or create a fresh one + let mut infos = + embedder_configs.embedder_info(&rtxn, &config.name)?.unwrap_or_else(|| { + let embedder_id = + *settings_delta.new_embedder_category_id().get(&config.name).unwrap(); + EmbedderInfo { embedder_id, embedding_status: Default::default() } + }); + 'data: for data in datastore.iter_mut() { let data = &mut data.get_mut().0; - let Some(deladd) = data.remove(&config.name) else { + let Some(delta) = data.remove(&config.name) else { continue 'data; }; - deladd.apply_to(&mut config.user_provided, modified_docids); + delta.apply_to(&mut infos.embedding_status); } + + extractor_sender.embeddings().embedding_status(&config.name, infos).unwrap(); } } } diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 507d1a650..a6ba3a919 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -23,7 +23,7 @@ use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::progress::{EmbedderStats, Progress}; use crate::update::settings::SettingsDelta; use crate::update::GrenadParameters; -use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; +use crate::vector::settings::{EmbedderAction, RemoveFragments, WriteBackToDocuments}; use crate::vector::{ArroyWrapper, Embedder, RuntimeEmbedders}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; @@ -221,7 +221,7 @@ where MSP: Fn() -> bool + Sync, SD: SettingsDelta + Sync, { - delete_old_embedders(wtxn, index, settings_delta)?; + delete_old_embedders_and_fragments(wtxn, index, settings_delta)?; let mut bbbuffers = Vec::new(); let finished_extraction = AtomicBool::new(false); @@ -254,16 +254,14 @@ where grenad_parameters: &grenad_parameters, }; - let index_embeddings = index.embedding_configs(wtxn)?; + let index_embeddings = index.embedding_configs().embedding_configs(wtxn)?; let mut field_distribution = index.field_distribution(wtxn)?; - let mut modified_docids = roaring::RoaringBitmap::new(); let congestion = thread::scope(|s| -> Result { let indexer_span = tracing::Span::current(); let finished_extraction = &finished_extraction; // prevent moving the field_distribution and document_ids in the inner closure... let field_distribution = &mut field_distribution; - let modified_docids = &mut modified_docids; let extractor_handle = Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || { pool.install(move || { @@ -276,7 +274,6 @@ where finished_extraction, field_distribution, index_embeddings, - modified_docids, &embedder_stats, ) }) @@ -342,7 +339,7 @@ where fn arroy_writers_from_embedder_actions<'indexer>( index: &Index, embedder_actions: &'indexer BTreeMap, - embedders: &'indexer EmbeddingConfigs, + embedders: &'indexer RuntimeEmbedders, index_embedder_category_ids: &'indexer std::collections::HashMap, ) -> Result> { let vector_arroy = index.vector_arroy; @@ -350,7 +347,7 @@ fn arroy_writers_from_embedder_actions<'indexer>( embedders .inner_as_ref() .iter() - .filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) { + .filter_map(|(embedder_name, runtime)| match embedder_actions.get(embedder_name) { None => None, Some(action) if action.write_back().is_some() => None, Some(action) => { @@ -365,25 +362,65 @@ fn arroy_writers_from_embedder_actions<'indexer>( }; let writer = ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized); - let dimensions = embedder.dimensions(); + let dimensions = runtime.embedder.dimensions(); Some(Ok(( embedder_category_id, - (embedder_name.as_str(), embedder.as_ref(), writer, dimensions), + (embedder_name.as_str(), runtime.embedder.as_ref(), writer, dimensions), ))) } }) .collect() } -fn delete_old_embedders(wtxn: &mut RwTxn<'_>, index: &Index, settings_delta: &SD) -> Result<()> +fn delete_old_embedders_and_fragments( + wtxn: &mut RwTxn<'_>, + index: &Index, + settings_delta: &SD, +) -> Result<()> where SD: SettingsDelta, { for action in settings_delta.embedder_actions().values() { - if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() { - let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized); - let dimensions = reader.dimensions(wtxn)?; - reader.clear(wtxn, dimensions)?; + let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() else { + continue; + }; + let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized); + let Some(dimensions) = reader.dimensions(wtxn)? else { + continue; + }; + reader.clear(wtxn, dimensions)?; + } + + // remove all vectors for the specified fragments + for (embedder_name, RemoveFragments { fragment_ids }, was_quantized) in + settings_delta.embedder_actions().iter().filter_map(|(name, action)| { + action.remove_fragments().map(|fragments| (name, fragments, action.was_quantized)) + }) + { + let Some(infos) = index.embedding_configs().embedder_info(wtxn, embedder_name)? else { + continue; + }; + let arroy = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, was_quantized); + let Some(dimensions) = arroy.dimensions(wtxn)? else { + continue; + }; + for fragment_id in fragment_ids { + // we must keep the user provided embeddings that ended up in this store + + if infos.embedding_status.user_provided_docids().is_empty() { + // no user provided: clear store + arroy.clear_store(wtxn, *fragment_id, dimensions)?; + continue; + } + + // some user provided, remove only the ids that are not user provided + let to_delete = arroy.items_in_store(wtxn, *fragment_id, |items| { + items - infos.embedding_status.user_provided_docids() + })?; + + for to_delete in to_delete { + arroy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?; + } } }