new settings indexer

This commit is contained in:
Louis Dureuil 2025-07-01 23:57:14 +02:00
parent 9ce5598fef
commit b086c51a23
No known key found for this signature in database
3 changed files with 262 additions and 124 deletions

View file

@ -1,5 +1,4 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::fmt::Debug;
use bumpalo::collections::Vec as BVec;
@ -16,15 +15,17 @@ use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange;
use crate::update::settings::SettingsDelta;
use crate::vector::db::{EmbedderInfo, EmbeddingStatus, EmbeddingStatusDelta};
use crate::vector::error::{
EmbedErrorKind, PossibleEmbeddingMistakes, UnusedVectorsDistributionBump,
};
use crate::vector::extractor::{
DocumentTemplateExtractor, Extractor as VectorExtractor, RequestFragmentExtractor,
DocumentTemplateExtractor, Extractor as VectorExtractor, ExtractorDiff,
RequestFragmentExtractor,
};
use crate::vector::session::{EmbedSession, Input, Metadata, OnEmbed};
use crate::vector::settings::{EmbedderAction, ReindexAction};
use crate::vector::settings::ReindexAction;
use crate::vector::{Embedding, RuntimeEmbedder, RuntimeEmbedders, RuntimeFragment};
use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort, UserError};
@ -260,44 +261,31 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
}
}
pub struct SettingsChangeEmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs,
old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
embedder_category_id: &'a std::collections::HashMap<String, u8>,
pub struct SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
settings_delta: &'a SD,
embedder_stats: &'a EmbedderStats,
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort,
}
impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
impl<'a, 'b, SD: SettingsDelta> SettingsChangeEmbeddingExtractor<'a, 'b, SD> {
#[allow(clippy::too_many_arguments)]
pub fn new(
embedders: &'a EmbeddingConfigs,
old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
embedder_category_id: &'a std::collections::HashMap<String, u8>,
settings_delta: &'a SD,
embedder_stats: &'a EmbedderStats,
sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution,
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self {
embedders,
old_embedders,
embedder_actions,
embedder_category_id,
embedder_stats,
sender,
threads,
possible_embedding_mistakes,
}
Self { settings_delta, embedder_stats, sender, threads, possible_embedding_mistakes }
}
}
impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbeddingExtractor<'_, '_> {
impl<'extractor, SD: SettingsDelta + Sync> SettingsChangeExtractor<'extractor>
for SettingsChangeEmbeddingExtractor<'_, '_, SD>
{
type Data = RefCell<EmbeddingExtractorData<'extractor>>;
fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> crate::Result<Self::Data> {
@ -309,32 +297,39 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
documents: impl Iterator<Item = crate::Result<DocumentIdentifiers<'doc>>>,
context: &'doc DocumentContext<Self::Data>,
) -> crate::Result<()> {
let embedders = self.embedders.inner_as_ref();
let old_embedders = self.old_embedders.inner_as_ref();
let embedders = self.settings_delta.new_embedders();
let old_embedders = self.settings_delta.old_embedders();
let unused_vectors_distribution = UnusedVectorsDistributionBump::new_in(&context.doc_alloc);
let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
// if the embedder is not in the embedder_actions, we don't need to reindex.
if let Some((embedder_id, reindex_action)) =
self.embedder_actions
let embedder_configs = context.index.embedding_configs();
for (embedder_name, action) in self.settings_delta.embedder_actions().iter() {
let Some(reindex_action) = action.reindex() else {
continue;
};
let runtime = embedders
.get(embedder_name)
// keep only the reindex actions
.and_then(EmbedderAction::reindex)
// map the reindex action to the embedder_id
.map(|reindex| {
let embedder_id = self.embedder_category_id.get(embedder_name).expect(
.expect("A runtime must exist for all reindexed embedder");
let embedder_info = embedder_configs
.embedder_info(&context.rtxn, embedder_name)?
.unwrap_or_else(|| {
// new embedder
EmbedderInfo {
embedder_id: *self
.settings_delta
.new_embedder_category_id()
.get(embedder_name)
.expect(
"An embedder_category_id must exist for all reindexed embedders",
);
(*embedder_id, reindex)
})
{
),
embedding_status: EmbeddingStatus::new(),
}
});
all_chunks.push((
Chunks::new(
embedder,
embedder_id,
embedder_name,
prompt,
runtime,
embedder_info,
embedder_name.as_str(),
context.data,
&self.possible_embedding_mistakes,
self.embedder_stats,
@ -343,10 +338,8 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
&context.doc_alloc,
),
reindex_action,
))
));
}
}
for document in documents {
let document = document?;
@ -360,6 +353,16 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
for (chunks, reindex_action) in &mut all_chunks {
let embedder_name = chunks.embedder_name();
let current_vectors = current_vectors.vectors_for_key(embedder_name)?;
let (old_is_user_provided, _) =
chunks.is_user_provided_must_regenerate(document.docid());
// The old embedder "has fragments" only when its fragment list is NOT empty.
// An embedder that did not exist before the settings change has no old
// fragments, hence the `false` default. This must use the same convention as
// `chunks.has_fragments()` so that the XOR below really detects a switch
// between document-template and fragment-based extraction.
let old_has_fragments = old_embedders
    .get(embedder_name)
    .map(|embedder| !embedder.fragments().is_empty())
    .unwrap_or_default();
let new_has_fragments = chunks.has_fragments();
let fragments_changed = old_has_fragments ^ new_has_fragments;
// if the vectors for this document have been already provided, we don't need to reindex.
let (is_new_embedder, must_regenerate) =
@ -368,60 +371,33 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
});
match reindex_action {
ReindexAction::RegeneratePrompts => {
ReindexAction::RegeneratePrompts | ReindexAction::RegenerateFragments(_) => {
if !must_regenerate {
continue;
}
// we need to regenerate the prompts for the document
// Get the old prompt and render the document with it
let Some((_, old_prompt, _)) = old_embedders.get(embedder_name) else {
unreachable!("ReindexAction::RegeneratePrompts implies that the embedder {embedder_name} is in the old_embedders")
};
let old_rendered = old_prompt.render_document(
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
// Get the new prompt and render the document with it
let new_prompt = chunks.prompt();
let new_rendered = new_prompt.render_document(
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
// Compare the rendered documents
// if they are different, regenerate the vectors
if new_rendered != old_rendered {
chunks.set_autogenerated(
chunks.settings_change_autogenerated(
document.docid(),
document.external_document_id(),
new_rendered,
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
self.settings_delta,
context.new_fields_ids_map,
&unused_vectors_distribution,
old_is_user_provided,
fragments_changed,
)?;
}
}
ReindexAction::FullReindex => {
let prompt = chunks.prompt();
// if no inserted vectors, then regenerate: true + no embeddings => autogenerate
if let Some(embeddings) = current_vectors
.and_then(|vectors| vectors.embeddings)
// insert the embeddings only for new embedders
.filter(|_| is_new_embedder)
{
chunks.set_regenerate(document.docid(), must_regenerate);
chunks.set_vectors(
document.external_document_id(),
document.docid(),
@ -431,24 +407,27 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
error: error.to_string(),
},
)?,
old_is_user_provided,
true,
must_regenerate,
)?;
} else if must_regenerate {
let rendered = prompt.render_document(
chunks.settings_change_autogenerated(
document.docid(),
document.external_document_id(),
document.current(
&context.rtxn,
context.index,
context.db_fields_ids_map,
)?,
self.settings_delta,
context.new_fields_ids_map,
&context.doc_alloc,
)?;
chunks.set_autogenerated(
document.docid(),
document.external_document_id(),
rendered,
&unused_vectors_distribution,
old_is_user_provided,
true,
)?;
} else if is_new_embedder {
chunks.set_status(document.docid(), false, true, false, false);
}
}
}
@ -585,7 +564,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
let embedder = &runtime.embedder;
let dimensions = embedder.dimensions();
let fragments = runtime.fragments.as_slice();
let fragments = runtime.fragments();
let kind = if fragments.is_empty() {
ChunkType::DocumentTemplate {
document_template: &runtime.document_template,
@ -627,6 +606,117 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
self.status.is_user_provided_must_regenerate(docid)
}
/// Decides, for one document, which embeddings must be requested or dropped
/// after a settings change, then records the new status of the document.
///
/// The behaviour depends on the kind of chunk:
/// - `ChunkType::Fragments`: every fragment diff reported by
///   `settings_delta.try_for_each_fragment_diff` for this embedder is compared
///   (new fragment vs. old fragment) against the document.
/// - `ChunkType::DocumentTemplate`: the new document template is compared
///   against the old embedder's document template.
///
/// In both cases the outcome of `diff_settings` drives the action:
/// `Removed` forwards a response with `embedding: None` to the `OnEmbed`
/// handler, `Added`/`Updated` request a new embedding, `Unchanged` does nothing.
///
/// Parameters:
/// - `docid` / `external_docid`: internal and external identifiers of the document.
/// - `document`: the document handed to the extractors.
/// - `settings_delta`: source of the old embedders and of the fragment diffs.
/// - `fields_ids_map`: passed to the document template extractors.
/// - `unused_vectors_distribution`: forwarded to `request_embedding`.
/// - `old_is_user_provided`: whether the document's vectors were user provided
///   before the change (as computed by the caller).
/// - `full_reindex`: when `true`, the old extractor is ignored (`None` is used
///   as the old side of the diff) and, for fragments, existing vectors of the
///   document are cleared first.
///
/// Errors from `try_for_each_fragment_diff`, `diff_settings` and
/// `request_embedding` are propagated with `?`.
#[allow(clippy::too_many_arguments)]
pub fn settings_change_autogenerated<'doc, D: Document<'doc> + Debug, SD: SettingsDelta>(
&mut self,
docid: DocumentId,
external_docid: &'a str,
document: D,
settings_delta: &SD,
fields_ids_map: &'a RefCell<crate::GlobalFieldsIdsMap>,
unused_vectors_distribution: &UnusedVectorsDistributionBump<'a>,
old_is_user_provided: bool,
full_reindex: bool,
) -> Result<()>
where
'a: 'doc,
{
match &mut self.kind {
ChunkType::Fragments { fragments: _, session } => {
let doc_alloc = session.doc_alloc();
// Start from a clean slate when the previous vectors were user provided
// or when everything must be regenerated.
if old_is_user_provided | full_reindex {
session.on_embed_mut().clear_vectors(docid);
}
// Tracks whether at least one fragment produced an embedding request.
// A `&mut` alias is taken so the closure below can flip the flag.
let mut extracted = false;
let extracted = &mut extracted;
settings_delta.try_for_each_fragment_diff(
session.embedder_name(),
|fragment_diff| {
let extractor = RequestFragmentExtractor::new(fragment_diff.new, doc_alloc)
.ignore_errors();
// On a full reindex there is no old side: the new fragment is
// diffed against nothing.
let old = if full_reindex {
None
} else {
fragment_diff.old.map(|old| {
RequestFragmentExtractor::new(old, doc_alloc).ignore_errors()
})
};
let metadata = Metadata {
docid,
external_docid,
extractor_id: extractor.extractor_id(),
};
match extractor.diff_settings(&document, &(), old.as_ref())? {
ExtractorDiff::Removed => {
// Report an empty embedding for this extractor.
// NOTE(review): presumably this deletes the stored embedding —
// confirm against the `OnEmbed` implementation.
OnEmbed::process_embedding_response(
session.on_embed_mut(),
crate::vector::session::EmbeddingResponse {
metadata,
embedding: None,
},
);
}
ExtractorDiff::Added(input) | ExtractorDiff::Updated(input) => {
*extracted = true;
session.request_embedding(
metadata,
input,
unused_vectors_distribution,
)?;
}
ExtractorDiff::Unchanged => { /* nothing to do */ }
}
Result::Ok(())
},
)?;
// The fourth argument is true only when the document was user provided
// and no fragment requested an embedding.
// NOTE(review): the meaning of each positional flag is defined by
// `set_status`, which is not visible here — confirm.
self.set_status(
docid,
old_is_user_provided,
true,
old_is_user_provided & !*extracted,
true,
);
}
ChunkType::DocumentTemplate { document_template, session } => {
let doc_alloc = session.doc_alloc();
let old_embedder = settings_delta.old_embedders().get(session.embedder_name());
// On a full reindex the old template is ignored; otherwise use the
// template of the old embedder, if that embedder existed.
let old_document_template = if full_reindex {
None
} else {
old_embedder.as_ref().map(|old_embedder| &old_embedder.document_template)
};
let extractor =
DocumentTemplateExtractor::new(document_template, doc_alloc, fields_ids_map);
let old_extractor = old_document_template.map(|old_document_template| {
DocumentTemplateExtractor::new(old_document_template, doc_alloc, fields_ids_map)
});
let metadata =
Metadata { docid, external_docid, extractor_id: extractor.extractor_id() };
match extractor.diff_settings(document, &external_docid, old_extractor.as_ref())? {
ExtractorDiff::Removed => {
// Same handling as in the fragment branch: forward an empty embedding.
OnEmbed::process_embedding_response(
session.on_embed_mut(),
crate::vector::session::EmbeddingResponse { metadata, embedding: None },
);
}
ExtractorDiff::Added(input) | ExtractorDiff::Updated(input) => {
session.request_embedding(metadata, input, unused_vectors_distribution)?;
}
ExtractorDiff::Unchanged => { /* do nothing */ }
}
// NOTE(review): positional flags are defined by `set_status` (not visible here).
self.set_status(docid, old_is_user_provided, true, false, true);
}
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub fn update_autogenerated<'doc, OD: Document<'doc> + Debug, ND: Document<'doc> + Debug>(
&mut self,
@ -862,6 +952,10 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
Ok(())
}
/// Returns `true` when these chunks embed documents through request
/// fragments, and `false` when they use a document template.
fn has_fragments(&self) -> bool {
    // Exhaustive on purpose: adding a `ChunkType` variant must force a decision here.
    match self.kind {
        ChunkType::Fragments { .. } => true,
        ChunkType::DocumentTemplate { .. } => false,
    }
}
}
#[allow(clippy::too_many_arguments)]

View file

@ -21,7 +21,7 @@ use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
use crate::vector::db::IndexEmbeddingConfig;
use crate::vector::db::{EmbedderInfo, IndexEmbeddingConfig};
use crate::vector::RuntimeEmbedders;
use crate::{Index, InternalError, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
@ -333,12 +333,11 @@ pub(super) fn extract_all_settings_changes<MSP, SD>(
finished_extraction: &AtomicBool,
field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>,
modified_docids: &mut RoaringBitmap,
embedder_stats: &EmbedderStats,
) -> Result<Vec<IndexEmbeddingConfig>>
where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
SD: SettingsDelta + Sync,
{
// Create the list of document ids to extract
let rtxn = indexing_context.index.read_txn()?;
@ -369,10 +368,7 @@ where
// extract the remaining embeddings
let extractor = SettingsChangeEmbeddingExtractor::new(
settings_delta.new_embedders(),
settings_delta.old_embedders(),
settings_delta.embedder_actions(),
settings_delta.new_embedder_category_id(),
settings_delta,
embedder_stats,
embedding_sender,
field_distribution,
@ -396,14 +392,25 @@ where
let span = tracing::debug_span!(target: "indexing::documents::merge", "vectors");
let _entered = span.enter();
let embedder_configs = indexing_context.index.embedding_configs();
for config in &mut index_embeddings {
// retrieve infos for existing embedder or create a fresh one
let mut infos =
embedder_configs.embedder_info(&rtxn, &config.name)?.unwrap_or_else(|| {
let embedder_id =
*settings_delta.new_embedder_category_id().get(&config.name).unwrap();
EmbedderInfo { embedder_id, embedding_status: Default::default() }
});
'data: for data in datastore.iter_mut() {
let data = &mut data.get_mut().0;
let Some(deladd) = data.remove(&config.name) else {
let Some(delta) = data.remove(&config.name) else {
continue 'data;
};
deladd.apply_to(&mut config.user_provided, modified_docids);
delta.apply_to(&mut infos.embedding_status);
}
extractor_sender.embeddings().embedding_status(&config.name, infos).unwrap();
}
}
}

View file

@ -23,7 +23,7 @@ use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::progress::{EmbedderStats, Progress};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters;
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::settings::{EmbedderAction, RemoveFragments, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, RuntimeEmbedders};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
@ -221,7 +221,7 @@ where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta + Sync,
{
delete_old_embedders(wtxn, index, settings_delta)?;
delete_old_embedders_and_fragments(wtxn, index, settings_delta)?;
let mut bbbuffers = Vec::new();
let finished_extraction = AtomicBool::new(false);
@ -254,16 +254,14 @@ where
grenad_parameters: &grenad_parameters,
};
let index_embeddings = index.embedding_configs(wtxn)?;
let index_embeddings = index.embedding_configs().embedding_configs(wtxn)?;
let mut field_distribution = index.field_distribution(wtxn)?;
let mut modified_docids = roaring::RoaringBitmap::new();
let congestion = thread::scope(|s| -> Result<ChannelCongestion> {
let indexer_span = tracing::Span::current();
let finished_extraction = &finished_extraction;
// prevent moving the field_distribution and document_ids in the inner closure...
let field_distribution = &mut field_distribution;
let modified_docids = &mut modified_docids;
let extractor_handle =
Builder::new().name(S("indexer-extractors")).spawn_scoped(s, move || {
pool.install(move || {
@ -276,7 +274,6 @@ where
finished_extraction,
field_distribution,
index_embeddings,
modified_docids,
&embedder_stats,
)
})
@ -342,7 +339,7 @@ where
fn arroy_writers_from_embedder_actions<'indexer>(
index: &Index,
embedder_actions: &'indexer BTreeMap<String, EmbedderAction>,
embedders: &'indexer EmbeddingConfigs,
embedders: &'indexer RuntimeEmbedders,
index_embedder_category_ids: &'indexer std::collections::HashMap<String, u8>,
) -> Result<HashMap<u8, (&'indexer str, &'indexer Embedder, ArroyWrapper, usize)>> {
let vector_arroy = index.vector_arroy;
@ -350,7 +347,7 @@ fn arroy_writers_from_embedder_actions<'indexer>(
embedders
.inner_as_ref()
.iter()
.filter_map(|(embedder_name, (embedder, _, _))| match embedder_actions.get(embedder_name) {
.filter_map(|(embedder_name, runtime)| match embedder_actions.get(embedder_name) {
None => None,
Some(action) if action.write_back().is_some() => None,
Some(action) => {
@ -365,26 +362,66 @@ fn arroy_writers_from_embedder_actions<'indexer>(
};
let writer =
ArroyWrapper::new(vector_arroy, embedder_category_id, action.was_quantized);
let dimensions = embedder.dimensions();
let dimensions = runtime.embedder.dimensions();
Some(Ok((
embedder_category_id,
(embedder_name.as_str(), embedder.as_ref(), writer, dimensions),
(embedder_name.as_str(), runtime.embedder.as_ref(), writer, dimensions),
)))
}
})
.collect()
}
fn delete_old_embedders<SD>(wtxn: &mut RwTxn<'_>, index: &Index, settings_delta: &SD) -> Result<()>
fn delete_old_embedders_and_fragments<SD>(
wtxn: &mut RwTxn<'_>,
index: &Index,
settings_delta: &SD,
) -> Result<()>
where
SD: SettingsDelta,
{
for action in settings_delta.embedder_actions().values() {
if let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() {
let Some(WriteBackToDocuments { embedder_id, .. }) = action.write_back() else {
continue;
};
let reader = ArroyWrapper::new(index.vector_arroy, *embedder_id, action.was_quantized);
let dimensions = reader.dimensions(wtxn)?;
let Some(dimensions) = reader.dimensions(wtxn)? else {
continue;
};
reader.clear(wtxn, dimensions)?;
}
// remove all vectors for the specified fragments
for (embedder_name, RemoveFragments { fragment_ids }, was_quantized) in
settings_delta.embedder_actions().iter().filter_map(|(name, action)| {
action.remove_fragments().map(|fragments| (name, fragments, action.was_quantized))
})
{
let Some(infos) = index.embedding_configs().embedder_info(wtxn, embedder_name)? else {
continue;
};
let arroy = ArroyWrapper::new(index.vector_arroy, infos.embedder_id, was_quantized);
let Some(dimensions) = arroy.dimensions(wtxn)? else {
continue;
};
for fragment_id in fragment_ids {
// we must keep the user provided embeddings that ended up in this store
if infos.embedding_status.user_provided_docids().is_empty() {
// no user provided: clear store
arroy.clear_store(wtxn, *fragment_id, dimensions)?;
continue;
}
// some user provided, remove only the ids that are not user provided
let to_delete = arroy.items_in_store(wtxn, *fragment_id, |items| {
items - infos.embedding_status.user_provided_docids()
})?;
for to_delete in to_delete {
arroy.del_item_in_store(wtxn, to_delete, *fragment_id, dimensions)?;
}
}
}
Ok(())