diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index 1ef44fc8d..c7156c120 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -9,6 +9,7 @@ use super::vector_document::VectorDocument; use super::{KvReaderFieldId, KvWriterFieldId}; use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME}; use crate::documents::FieldIdMapper; +use crate::vector::settings::EmbedderAction; use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError}; /// A view into a document that can represent either the current version from the DB, @@ -309,6 +310,7 @@ where pub fn write_to_obkv<'s, 'a, 'map, 'buffer>( document: &'s impl Document<'s>, vector_document: Option<&'s impl VectorDocument<'s>>, + embedder_actions: &'a BTreeMap, fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>, mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>, ) -> Result<&'a KvReaderFieldId> @@ -338,20 +340,39 @@ where for res in vector_document.iter_vectors() { let (name, entry) = res?; if entry.has_configured_embedder { - continue; // we don't write vectors with configured embedder in documents + if let Some(action) = embedder_actions.get(name) { + if action.write_back().is_some() && !entry.regenerate { + vectors.insert( + name, + serde_json::json!({ + "regenerate": entry.regenerate, + // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object + "embeddings": entry.embeddings, + }), + ); + } + } + } else { + match embedder_actions.get(name) { + Some(action) if action.write_back().is_none() => { + continue; + } + _ => { + vectors.insert( + name, + if entry.implicit { + serde_json::json!(entry.embeddings) + } else { + serde_json::json!({ + "regenerate": entry.regenerate, + // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object + "embeddings": entry.embeddings, + }) + }, + ); + } + } } - vectors.insert( - name, - if entry.implicit { - serde_json::json!(entry.embeddings) - } else { - serde_json::json!({ - "regenerate": entry.regenerate, - // TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object - "embeddings": entry.embeddings, - }) - }, - ); } if vectors.is_empty() { diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index d1c92919b..0ef782c5c 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -1,16 +1,25 @@ use std::cell::RefCell; +use std::collections::BTreeMap; use bumpalo::Bump; use hashbrown::HashMap; use super::DelAddRoaringBitmap; use crate::constants::RESERVED_GEO_FIELD_NAME; -use crate::update::new::channel::DocumentsSender; +use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender}; use crate::update::new::document::{write_to_obkv, Document as _}; -use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor}; +use crate::update::new::document_change::DatabaseDocument; +use crate::update::new::indexer::document_changes::{ + DocumentChangeContext, Extractor, IndexingContext, +}; +use crate::update::new::indexer::settings_changes::{ + settings_change_extract, DatabaseDocuments, SettingsChangeExtractor, +}; use crate::update::new::ref_cell_ext::RefCellExt as _; -use crate::update::new::thread_local::FullySend; +use crate::update::new::thread_local::{FullySend, ThreadLocal}; use crate::update::new::DocumentChange; +use crate::update::settings::SettingsDelta; +use crate::vector::settings::EmbedderAction; use crate::vector::EmbeddingConfigs; use crate::Result; @@ -45,6 +54,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> { ) -> Result<()> { let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); let mut document_extractor_data = context.data.0.borrow_mut_or_yield(); + let embedder_actions = &Default::default(); for change in changes { let change = change?; @@ -121,9 +131,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> { let content = write_to_obkv( &content, vector_content.as_ref(), + embedder_actions, &mut new_fields_ids_map, &mut document_buffer, )?; + self.document_sender.uncompressed(docid, external_docid, content).unwrap(); } DocumentChange::Insertion(insertion) => { @@ -146,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> { let content = write_to_obkv( &content, inserted_vectors.as_ref(), + embedder_actions, &mut new_fields_ids_map, &mut document_buffer, )?; @@ -158,3 +171,101 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> { Ok(()) } } + +pub struct SettingsChangeDocumentExtractor<'a, 'b> { + document_sender: DocumentsSender<'a, 'b>, + embedder_actions: &'a BTreeMap, +} + +impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> { + pub fn new( + document_sender: DocumentsSender<'a, 'b>, + embedder_actions: &'a BTreeMap, + ) -> Self { + Self { document_sender, embedder_actions } + } +} + +impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> { + type Data = FullySend>; + + fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result { + Ok(FullySend(Default::default())) + } + + fn process<'doc>( + &self, + documents: impl Iterator>>, + context: &DocumentChangeContext, + ) -> Result<()> { + let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); + + for document in documents { + let document = document?; + // **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop + // Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction) + let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield(); + + let external_docid = document.external_document_id().to_owned(); + let content = + document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?; + let vector_content = document.current_vectors( + &context.rtxn, + context.index, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; + + let content = write_to_obkv( + &content, + Some(&vector_content), + &self.embedder_actions, + &mut new_fields_ids_map, + &mut document_buffer, + )?; + + self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap(); + } + + Ok(()) + } +} + +/// Modify the database documents based on the settings changes. +/// +/// This function extracts the documents from the database, +/// modifies them by adding or removing vector fields based on embedder actions, +/// and then updates the database. +#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")] +pub fn update_database_documents<'indexer, 'extractor, MSP, SD>( + documents: &'indexer DatabaseDocuments<'indexer>, + indexing_context: IndexingContext, + extractor_sender: &ExtractorBbqueueSender, + settings_delta: &SD, + extractor_allocs: &'extractor mut ThreadLocal>, +) -> Result<()> +where + MSP: Fn() -> bool + Sync, + SD: SettingsDelta, +{ + // skip if no embedder_actions + if settings_delta.embedder_actions().is_empty() { + return Ok(()); + } + + let document_sender = extractor_sender.documents(); + let document_extractor = + SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions()); + let datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); + + settings_change_extract( + documents, + &document_extractor, + indexing_context, + extractor_allocs, + &datastore, + crate::update::new::steps::IndexingStep::ExtractingDocuments, + )?; + + Ok(()) +} diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 328a1e25f..246416503 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -341,6 +341,18 @@ where primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?; let documents = DatabaseDocuments::new(&all_document_ids, primary_key); + let span = + tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); + let _entered = span.enter(); + + update_database_documents( + &documents, + indexing_context, + &extractor_sender, + settings_delta, + extractor_allocs, + )?; + indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);