Avoid rewritting documents that don't change

Ensure being on a reindex action before getting embedder_category_id

Fix document skip function
This commit is contained in:
ManyTheFish 2025-06-30 09:47:30 +02:00
parent 77802dabf6
commit 0687cf058a
2 changed files with 58 additions and 10 deletions

View file

@ -7,7 +7,7 @@ use hashbrown::HashMap;
use super::DelAddRoaringBitmap; use super::DelAddRoaringBitmap;
use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::constants::RESERVED_GEO_FIELD_NAME;
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender}; use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
use crate::update::new::document::{write_to_obkv, Document as _}; use crate::update::new::document::{write_to_obkv, Document};
use crate::update::new::document_change::DatabaseDocument; use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::{DocumentContext, Extractor, IndexingContext}; use crate::update::new::indexer::document_changes::{DocumentContext, Extractor, IndexingContext};
use crate::update::new::indexer::settings_changes::{ use crate::update::new::indexer::settings_changes::{
@ -15,6 +15,7 @@ use crate::update::new::indexer::settings_changes::{
}; };
use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::thread_local::{FullySend, ThreadLocal};
use crate::update::new::vector_document::VectorDocument;
use crate::update::new::DocumentChange; use crate::update::new::DocumentChange;
use crate::update::settings::SettingsDelta; use crate::update::settings::SettingsDelta;
use crate::vector::settings::EmbedderAction; use crate::vector::settings::EmbedderAction;
@ -214,6 +215,11 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
&context.doc_alloc, &context.doc_alloc,
)?; )?;
// if the document doesn't need to be updated, we skip it
if !must_update_document(&vector_content, self.embedder_actions)? {
continue;
}
let content = write_to_obkv( let content = write_to_obkv(
&content, &content,
Some(&vector_content), Some(&vector_content),
@ -246,8 +252,7 @@ where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
SD: SettingsDelta, SD: SettingsDelta,
{ {
// skip if no embedder_actions if !must_update_database(settings_delta) {
if settings_delta.embedder_actions().is_empty() {
return Ok(()); return Ok(());
} }
@ -267,3 +272,42 @@ where
Ok(()) Ok(())
} }
fn must_update_database<SD: SettingsDelta>(settings_delta: &SD) -> bool {
settings_delta.embedder_actions().iter().any(|(name, action)| {
if action.reindex().is_some() {
// if action has a reindex, we need to update the documents database if the embedder is a new one
settings_delta.old_embedders().get(name).is_none()
} else {
// if action has a write_back, we need to update the documents database
action.write_back().is_some()
}
})
}
fn must_update_document<'s, 'a>(
vector_document: &'s impl VectorDocument<'s>,
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
) -> Result<bool>
where
's: 'a,
{
// Check if any vector needs to be written back for the document
for (name, action) in embedder_actions {
// if the vector entry is not found, we don't need to update the document
let Some(vector_entry) = vector_document.vectors_for_key(name)? else {
continue;
};
// if the vector entry is user provided, we need to update the document by writing back vectors.
let write_back = action.write_back().is_some() && !vector_entry.regenerate;
// if the vector entry is a new embedder, we need to update the document removing the vectors from the document.
let new_embedder = action.reindex().is_some() && !vector_entry.has_configured_embedder;
if write_back || new_embedder {
return Ok(true);
}
}
Ok(false)
}

View file

@ -351,12 +351,16 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
for (embedder_name, (embedder, prompt, _is_quantized)) in embedders { for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
// if the embedder is not in the embedder_actions, we don't need to reindex. // if the embedder is not in the embedder_actions, we don't need to reindex.
if let Some((embedder_id, reindex_action)) = if let Some((embedder_id, reindex_action)) =
self.embedder_actions.get(embedder_name).and_then(|action| { self.embedder_actions
let embedder_id = self
.embedder_category_id
.get(embedder_name) .get(embedder_name)
.expect("embedder_category_id should be present"); // keep only the reindex actions
action.reindex().map(|reindex| (*embedder_id, reindex)) .and_then(EmbedderAction::reindex)
// map the reindex action to the embedder_id
.map(|reindex| {
let embedder_id = self.embedder_category_id.get(embedder_name).expect(
"An embedder_category_id must exist for all reindexed embedders",
);
(*embedder_id, reindex)
}) })
{ {
all_chunks.push(( all_chunks.push((