mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 21:04:27 +01:00
In transform for removed embedders, write back their user provided vectors in documents, and clear the writers
This commit is contained in:
parent
d18c1f77d7
commit
d1dd7e5d09
@ -1,7 +1,7 @@
|
||||
use std::borrow::Cow;
|
||||
use std::collections::btree_map::Entry as BEntry;
|
||||
use std::collections::hash_map::Entry as HEntry;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Seek};
|
||||
|
||||
@ -27,6 +27,7 @@ use crate::update::del_add::{
|
||||
use crate::update::index_documents::GrenadParameters;
|
||||
use crate::update::settings::{InnerIndexSettings, InnerIndexSettingsDiff};
|
||||
use crate::update::{AvailableDocumentsIds, UpdateIndexingStep};
|
||||
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
|
||||
use crate::{
|
||||
is_faceted_by, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldsIdsMap, Index, Result,
|
||||
};
|
||||
@ -808,13 +809,13 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
let mut new_inner_settings = old_inner_settings.clone();
|
||||
new_inner_settings.fields_ids_map = fields_ids_map;
|
||||
|
||||
let embedding_configs_updated = false;
|
||||
let embedding_config_updates = Default::default();
|
||||
let settings_update_only = false;
|
||||
let settings_diff = InnerIndexSettingsDiff::new(
|
||||
old_inner_settings,
|
||||
new_inner_settings,
|
||||
primary_key_id,
|
||||
embedding_configs_updated,
|
||||
embedding_config_updates,
|
||||
settings_update_only,
|
||||
);
|
||||
|
||||
@ -835,10 +836,13 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
/// Rebind the field_ids of the provided document to their values
|
||||
/// based on the field_ids_maps difference between the old and the new settings,
|
||||
/// then fill the provided buffers with delta documents using KvWritterDelAdd.
|
||||
#[allow(clippy::too_many_arguments)] // need the vectors + fid, feel free to create a struct xo xo
|
||||
fn rebind_existing_document(
|
||||
old_obkv: KvReader<FieldId>,
|
||||
settings_diff: &InnerIndexSettingsDiff,
|
||||
modified_faceted_fields: &HashSet<String>,
|
||||
mut injected_vectors: serde_json::Map<String, serde_json::Value>,
|
||||
old_vectors_fid: Option<FieldId>,
|
||||
original_obkv_buffer: Option<&mut Vec<u8>>,
|
||||
flattened_obkv_buffer: Option<&mut Vec<u8>>,
|
||||
) -> Result<()> {
|
||||
@ -863,7 +867,36 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
let mut operations = HashMap::new();
|
||||
|
||||
let mut obkv_writer = KvWriter::<_, FieldId>::memory();
|
||||
for (id, val) in old_obkv.iter() {
|
||||
'write_fid: for (id, val) in old_obkv.iter() {
|
||||
if !injected_vectors.is_empty() {
|
||||
'inject_vectors: {
|
||||
let Some(vectors_fid) = old_vectors_fid else { break 'inject_vectors };
|
||||
|
||||
if id != vectors_fid {
|
||||
break 'inject_vectors;
|
||||
}
|
||||
|
||||
let existing_vectors: std::result::Result<
|
||||
serde_json::Map<String, serde_json::Value>,
|
||||
serde_json::Error,
|
||||
> = serde_json::from_slice(val);
|
||||
|
||||
let mut existing_vectors = match existing_vectors {
|
||||
Ok(existing_vectors) => existing_vectors,
|
||||
Err(error) => {
|
||||
tracing::error!(%error, "Unexpected `_vectors` field that is not a map. Treating as an empty map");
|
||||
Default::default()
|
||||
}
|
||||
};
|
||||
|
||||
existing_vectors.append(&mut injected_vectors);
|
||||
|
||||
operations.insert(id, DelAddOperation::DeletionAndAddition);
|
||||
obkv_writer.insert(id, serde_json::to_vec(&existing_vectors).unwrap())?;
|
||||
continue 'write_fid;
|
||||
}
|
||||
}
|
||||
|
||||
if is_primary_key(id) || necessary_faceted_field(id) || reindex_vectors {
|
||||
operations.insert(id, DelAddOperation::DeletionAndAddition);
|
||||
obkv_writer.insert(id, val)?;
|
||||
@ -937,6 +970,35 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
None
|
||||
};
|
||||
|
||||
let readers: Result<
|
||||
BTreeMap<&str, (Vec<arroy::Reader<arroy::distances::Angular>>, &RoaringBitmap)>,
|
||||
> = settings_diff
|
||||
.embedding_config_updates
|
||||
.iter()
|
||||
.filter_map(|(name, action)| {
|
||||
if let EmbedderAction::WriteBackToDocuments(WriteBackToDocuments {
|
||||
embedder_id,
|
||||
user_provided,
|
||||
}) = action
|
||||
{
|
||||
let readers: Result<Vec<_>> =
|
||||
self.index.arroy_readers(wtxn, *embedder_id).collect();
|
||||
match readers {
|
||||
Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))),
|
||||
Err(error) => Some(Err(error)),
|
||||
}
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
let readers = readers?;
|
||||
|
||||
let old_vectors_fid = settings_diff
|
||||
.old
|
||||
.fields_ids_map
|
||||
.id(crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME);
|
||||
|
||||
// We initialize the sorter with the user indexing settings.
|
||||
let mut flattened_sorter =
|
||||
if settings_diff.reindex_searchable() || settings_diff.reindex_facets() {
|
||||
@ -963,10 +1025,41 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
|
||||
)?;
|
||||
|
||||
let injected_vectors: std::result::Result<
|
||||
serde_json::Map<String, serde_json::Value>,
|
||||
arroy::Error,
|
||||
> = readers
|
||||
.iter()
|
||||
.filter_map(|(name, (readers, user_provided))| {
|
||||
if !user_provided.contains(docid) {
|
||||
return None;
|
||||
}
|
||||
let mut vectors = Vec::new();
|
||||
for reader in readers {
|
||||
let Some(vector) = reader.item_vector(wtxn, docid).transpose() else {
|
||||
break;
|
||||
};
|
||||
|
||||
match vector {
|
||||
Ok(vector) => vectors.push(vector),
|
||||
Err(error) => return Some(Err(error)),
|
||||
}
|
||||
}
|
||||
if vectors.is_empty() {
|
||||
return None;
|
||||
}
|
||||
Some(Ok((name.to_string(), serde_json::to_value(vectors).unwrap())))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let injected_vectors = injected_vectors?;
|
||||
|
||||
Self::rebind_existing_document(
|
||||
old_obkv,
|
||||
&settings_diff,
|
||||
&modified_faceted_fields,
|
||||
injected_vectors,
|
||||
old_vectors_fid,
|
||||
Some(&mut original_obkv_buffer).filter(|_| original_sorter.is_some()),
|
||||
Some(&mut flattened_obkv_buffer).filter(|_| flattened_sorter.is_some()),
|
||||
)?;
|
||||
@ -983,6 +1076,23 @@ impl<'a, 'i> Transform<'a, 'i> {
|
||||
}
|
||||
}
|
||||
|
||||
let mut writers = Vec::new();
|
||||
|
||||
// delete all vectors from the embedders that need removal
|
||||
for (_, (readers, _)) in readers {
|
||||
for reader in readers {
|
||||
let dimensions = reader.dimensions();
|
||||
let arroy_index = reader.index();
|
||||
drop(reader);
|
||||
let writer = arroy::Writer::new(self.index.vector_arroy, arroy_index, dimensions);
|
||||
writers.push(writer);
|
||||
}
|
||||
}
|
||||
|
||||
for writer in writers {
|
||||
writer.clear(wtxn)?;
|
||||
}
|
||||
|
||||
let grenad_params = GrenadParameters {
|
||||
chunk_compression_type: self.indexer_settings.chunk_compression_type,
|
||||
chunk_compression_level: self.indexer_settings.chunk_compression_level,
|
||||
|
Loading…
Reference in New Issue
Block a user