mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-06-26 16:38:30 +02:00
Write back user provided vectors from deleted embedders
This commit is contained in:
parent
8cacc021fb
commit
729e02493f
@ -9,6 +9,7 @@ use super::vector_document::VectorDocument;
|
|||||||
use super::{KvReaderFieldId, KvWriterFieldId};
|
use super::{KvReaderFieldId, KvWriterFieldId};
|
||||||
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
|
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
|
||||||
use crate::documents::FieldIdMapper;
|
use crate::documents::FieldIdMapper;
|
||||||
|
use crate::vector::settings::EmbedderAction;
|
||||||
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
||||||
|
|
||||||
/// A view into a document that can represent either the current version from the DB,
|
/// A view into a document that can represent either the current version from the DB,
|
||||||
@ -309,6 +310,7 @@ where
|
|||||||
pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
|
pub fn write_to_obkv<'s, 'a, 'map, 'buffer>(
|
||||||
document: &'s impl Document<'s>,
|
document: &'s impl Document<'s>,
|
||||||
vector_document: Option<&'s impl VectorDocument<'s>>,
|
vector_document: Option<&'s impl VectorDocument<'s>>,
|
||||||
|
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
|
||||||
fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
|
fields_ids_map: &'a mut GlobalFieldsIdsMap<'map>,
|
||||||
mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
|
mut document_buffer: &'a mut bumpalo::collections::Vec<'buffer, u8>,
|
||||||
) -> Result<&'a KvReaderFieldId>
|
) -> Result<&'a KvReaderFieldId>
|
||||||
@ -338,20 +340,39 @@ where
|
|||||||
for res in vector_document.iter_vectors() {
|
for res in vector_document.iter_vectors() {
|
||||||
let (name, entry) = res?;
|
let (name, entry) = res?;
|
||||||
if entry.has_configured_embedder {
|
if entry.has_configured_embedder {
|
||||||
continue; // we don't write vectors with configured embedder in documents
|
if let Some(action) = embedder_actions.get(name) {
|
||||||
|
if action.write_back().is_some() && !entry.regenerate {
|
||||||
|
vectors.insert(
|
||||||
|
name,
|
||||||
|
serde_json::json!({
|
||||||
|
"regenerate": entry.regenerate,
|
||||||
|
// TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
|
||||||
|
"embeddings": entry.embeddings,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
match embedder_actions.get(name) {
|
||||||
|
Some(action) if action.write_back().is_none() => {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
vectors.insert(
|
||||||
|
name,
|
||||||
|
if entry.implicit {
|
||||||
|
serde_json::json!(entry.embeddings)
|
||||||
|
} else {
|
||||||
|
serde_json::json!({
|
||||||
|
"regenerate": entry.regenerate,
|
||||||
|
// TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
|
||||||
|
"embeddings": entry.embeddings,
|
||||||
|
})
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
vectors.insert(
|
|
||||||
name,
|
|
||||||
if entry.implicit {
|
|
||||||
serde_json::json!(entry.embeddings)
|
|
||||||
} else {
|
|
||||||
serde_json::json!({
|
|
||||||
"regenerate": entry.regenerate,
|
|
||||||
// TODO: consider optimizing the shape of embedders here to store an array of f32 rather than a JSON object
|
|
||||||
"embeddings": entry.embeddings,
|
|
||||||
})
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if vectors.is_empty() {
|
if vectors.is_empty() {
|
||||||
|
@ -1,16 +1,25 @@
|
|||||||
use std::cell::RefCell;
|
use std::cell::RefCell;
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
|
||||||
use bumpalo::Bump;
|
use bumpalo::Bump;
|
||||||
use hashbrown::HashMap;
|
use hashbrown::HashMap;
|
||||||
|
|
||||||
use super::DelAddRoaringBitmap;
|
use super::DelAddRoaringBitmap;
|
||||||
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
||||||
use crate::update::new::channel::DocumentsSender;
|
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
|
||||||
use crate::update::new::document::{write_to_obkv, Document as _};
|
use crate::update::new::document::{write_to_obkv, Document as _};
|
||||||
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
|
use crate::update::new::document_change::DatabaseDocument;
|
||||||
|
use crate::update::new::indexer::document_changes::{
|
||||||
|
DocumentChangeContext, Extractor, IndexingContext,
|
||||||
|
};
|
||||||
|
use crate::update::new::indexer::settings_changes::{
|
||||||
|
settings_change_extract, DatabaseDocuments, SettingsChangeExtractor,
|
||||||
|
};
|
||||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||||
use crate::update::new::thread_local::FullySend;
|
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||||
use crate::update::new::DocumentChange;
|
use crate::update::new::DocumentChange;
|
||||||
|
use crate::update::settings::SettingsDelta;
|
||||||
|
use crate::vector::settings::EmbedderAction;
|
||||||
use crate::vector::EmbeddingConfigs;
|
use crate::vector::EmbeddingConfigs;
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
@ -45,6 +54,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
||||||
let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
|
let mut document_extractor_data = context.data.0.borrow_mut_or_yield();
|
||||||
|
let embedder_actions = &Default::default();
|
||||||
|
|
||||||
for change in changes {
|
for change in changes {
|
||||||
let change = change?;
|
let change = change?;
|
||||||
@ -121,9 +131,11 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
|
|||||||
let content = write_to_obkv(
|
let content = write_to_obkv(
|
||||||
&content,
|
&content,
|
||||||
vector_content.as_ref(),
|
vector_content.as_ref(),
|
||||||
|
embedder_actions,
|
||||||
&mut new_fields_ids_map,
|
&mut new_fields_ids_map,
|
||||||
&mut document_buffer,
|
&mut document_buffer,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
self.document_sender.uncompressed(docid, external_docid, content).unwrap();
|
||||||
}
|
}
|
||||||
DocumentChange::Insertion(insertion) => {
|
DocumentChange::Insertion(insertion) => {
|
||||||
@ -146,6 +158,7 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
|
|||||||
let content = write_to_obkv(
|
let content = write_to_obkv(
|
||||||
&content,
|
&content,
|
||||||
inserted_vectors.as_ref(),
|
inserted_vectors.as_ref(),
|
||||||
|
embedder_actions,
|
||||||
&mut new_fields_ids_map,
|
&mut new_fields_ids_map,
|
||||||
&mut document_buffer,
|
&mut document_buffer,
|
||||||
)?;
|
)?;
|
||||||
@ -158,3 +171,101 @@ impl<'extractor> Extractor<'extractor> for DocumentsExtractor<'_, '_> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub struct SettingsChangeDocumentExtractor<'a, 'b> {
|
||||||
|
document_sender: DocumentsSender<'a, 'b>,
|
||||||
|
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> SettingsChangeDocumentExtractor<'a, 'b> {
|
||||||
|
pub fn new(
|
||||||
|
document_sender: DocumentsSender<'a, 'b>,
|
||||||
|
embedder_actions: &'a BTreeMap<String, EmbedderAction>,
|
||||||
|
) -> Self {
|
||||||
|
Self { document_sender, embedder_actions }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentExtractor<'_, '_> {
|
||||||
|
type Data = FullySend<RefCell<DocumentExtractorData>>;
|
||||||
|
|
||||||
|
fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
|
||||||
|
Ok(FullySend(Default::default()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process<'doc>(
|
||||||
|
&self,
|
||||||
|
documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
|
||||||
|
context: &DocumentChangeContext<Self::Data>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
||||||
|
|
||||||
|
for document in documents {
|
||||||
|
let document = document?;
|
||||||
|
// **WARNING**: the exclusive borrow on `new_fields_ids_map` needs to be taken **inside** of the `for change in changes` loop
|
||||||
|
// Otherwise, `BorrowMutError` will occur for document changes that also need the new_fields_ids_map (e.g.: UpdateByFunction)
|
||||||
|
let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
|
||||||
|
|
||||||
|
let external_docid = document.external_document_id().to_owned();
|
||||||
|
let content =
|
||||||
|
document.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
|
||||||
|
let vector_content = document.current_vectors(
|
||||||
|
&context.rtxn,
|
||||||
|
context.index,
|
||||||
|
&context.db_fields_ids_map,
|
||||||
|
&context.doc_alloc,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let content = write_to_obkv(
|
||||||
|
&content,
|
||||||
|
Some(&vector_content),
|
||||||
|
&self.embedder_actions,
|
||||||
|
&mut new_fields_ids_map,
|
||||||
|
&mut document_buffer,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
self.document_sender.uncompressed(document.docid(), external_docid, content).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Modify the database documents based on the settings changes.
|
||||||
|
///
|
||||||
|
/// This function extracts the documents from the database,
|
||||||
|
/// modifies them by adding or removing vector fields based on embedder actions,
|
||||||
|
/// and then updates the database.
|
||||||
|
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
|
||||||
|
pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
|
||||||
|
documents: &'indexer DatabaseDocuments<'indexer>,
|
||||||
|
indexing_context: IndexingContext<MSP>,
|
||||||
|
extractor_sender: &ExtractorBbqueueSender,
|
||||||
|
settings_delta: &SD,
|
||||||
|
extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
|
||||||
|
) -> Result<()>
|
||||||
|
where
|
||||||
|
MSP: Fn() -> bool + Sync,
|
||||||
|
SD: SettingsDelta,
|
||||||
|
{
|
||||||
|
// skip if no embedder_actions
|
||||||
|
if settings_delta.embedder_actions().is_empty() {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let document_sender = extractor_sender.documents();
|
||||||
|
let document_extractor =
|
||||||
|
SettingsChangeDocumentExtractor::new(document_sender, settings_delta.embedder_actions());
|
||||||
|
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
|
||||||
|
|
||||||
|
settings_change_extract(
|
||||||
|
documents,
|
||||||
|
&document_extractor,
|
||||||
|
indexing_context,
|
||||||
|
extractor_allocs,
|
||||||
|
&datastore,
|
||||||
|
crate::update::new::steps::IndexingStep::ExtractingDocuments,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
@ -341,6 +341,18 @@ where
|
|||||||
primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
|
primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
|
||||||
let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
|
let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
|
||||||
|
|
||||||
|
let span =
|
||||||
|
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
||||||
|
let _entered = span.enter();
|
||||||
|
|
||||||
|
update_database_documents(
|
||||||
|
&documents,
|
||||||
|
indexing_context,
|
||||||
|
&extractor_sender,
|
||||||
|
settings_delta,
|
||||||
|
extractor_allocs,
|
||||||
|
)?;
|
||||||
|
|
||||||
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
|
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
|
||||||
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
|
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user