mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 04:17:10 +02:00
always push the user defined vectors in arroy
This commit is contained in:
parent
a73ccc78a6
commit
5d50850e12
15 changed files with 189 additions and 450 deletions
|
@ -8,7 +8,6 @@ use std::sync::Arc;
|
|||
|
||||
use bytemuck::cast_slice;
|
||||
use grenad::Writer;
|
||||
use itertools::EitherOrBoth;
|
||||
use ordered_float::OrderedFloat;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde_json::Value;
|
||||
|
@ -50,7 +49,7 @@ enum VectorStateDelta {
|
|||
// Note: changing the value of the manually specified vector **should not record** this delta
|
||||
WasGeneratedNowManual(Vec<Vec<f32>>),
|
||||
|
||||
ManualDelta(Vec<Vec<f32>>, Vec<Vec<f32>>),
|
||||
ManualDelta(Vec<Vec<f32>>),
|
||||
|
||||
// Add the vector computed from the specified prompt
|
||||
// Remove any previous vector
|
||||
|
@ -59,14 +58,12 @@ enum VectorStateDelta {
|
|||
}
|
||||
|
||||
impl VectorStateDelta {
|
||||
fn into_values(self) -> (bool, String, (Vec<Vec<f32>>, Vec<Vec<f32>>)) {
|
||||
fn into_values(self) -> (bool, String, Vec<Vec<f32>>) {
|
||||
match self {
|
||||
VectorStateDelta::NoChange => Default::default(),
|
||||
VectorStateDelta::NowRemoved => (true, Default::default(), Default::default()),
|
||||
VectorStateDelta::WasGeneratedNowManual(add) => {
|
||||
(true, Default::default(), (Default::default(), add))
|
||||
}
|
||||
VectorStateDelta::ManualDelta(del, add) => (false, Default::default(), (del, add)),
|
||||
VectorStateDelta::WasGeneratedNowManual(add) => (true, Default::default(), add),
|
||||
VectorStateDelta::ManualDelta(add) => (false, Default::default(), add),
|
||||
VectorStateDelta::NowGenerated(prompt) => (true, prompt, Default::default()),
|
||||
}
|
||||
}
|
||||
|
@ -166,8 +163,14 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
// lazily get it when needed
|
||||
let document_id = || -> Value { from_utf8(external_id_bytes).unwrap().into() };
|
||||
|
||||
let mut parsed_vectors = ParsedVectorsDiff::new(obkv, old_vectors_fid, new_vectors_fid)
|
||||
.map_err(|error| error.to_crate_error(document_id().to_string()))?;
|
||||
let mut parsed_vectors = ParsedVectorsDiff::new(
|
||||
docid,
|
||||
embedders_configs,
|
||||
obkv,
|
||||
old_vectors_fid,
|
||||
new_vectors_fid,
|
||||
)
|
||||
.map_err(|error| error.to_crate_error(document_id().to_string()))?;
|
||||
|
||||
for EmbedderVectorExtractor {
|
||||
embedder_name,
|
||||
|
@ -182,7 +185,7 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
{
|
||||
let delta = match parsed_vectors.remove(embedder_name) {
|
||||
(Some(old), Some(new)) => {
|
||||
match (old.is_user_provided(), new.is_user_provided()) {
|
||||
match (old.map_or(true, |old| old.is_user_provided()), new.is_user_provided()) {
|
||||
(true, true) | (false, false) => (),
|
||||
(true, false) => {
|
||||
remove_from_user_defined.insert(docid);
|
||||
|
@ -193,7 +196,6 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
}
|
||||
|
||||
// no autogeneration
|
||||
let del_vectors = old.into_array_of_vectors();
|
||||
let add_vectors = new.into_array_of_vectors();
|
||||
|
||||
if add_vectors.len() > usize::from(u8::MAX) {
|
||||
|
@ -203,15 +205,15 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
)));
|
||||
}
|
||||
|
||||
VectorStateDelta::ManualDelta(del_vectors, add_vectors)
|
||||
VectorStateDelta::ManualDelta(add_vectors)
|
||||
}
|
||||
(Some(_old), None) => {
|
||||
(Some(old), None) => {
|
||||
// Do we keep this document?
|
||||
let document_is_kept = obkv
|
||||
.iter()
|
||||
.map(|(_, deladd)| KvReaderDelAdd::new(deladd))
|
||||
.any(|deladd| deladd.get(DelAdd::Addition).is_some());
|
||||
if document_is_kept {
|
||||
if document_is_kept && old.is_some() {
|
||||
remove_from_user_defined.insert(docid);
|
||||
// becomes autogenerated
|
||||
VectorStateDelta::NowGenerated(prompt.render(
|
||||
|
@ -219,6 +221,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
DelAdd::Addition,
|
||||
new_fields_ids_map,
|
||||
)?)
|
||||
} else if document_is_kept && old.is_none() {
|
||||
VectorStateDelta::NoChange
|
||||
} else {
|
||||
VectorStateDelta::NowRemoved
|
||||
}
|
||||
|
@ -315,8 +319,8 @@ pub fn extract_vector_points<R: io::Read + io::Seek>(
|
|||
Ok(results)
|
||||
}
|
||||
|
||||
/// Computes the diff between both Del and Add numbers and
|
||||
/// only inserts the parts that differ in the sorter.
|
||||
/// We cannot compute the diff between both Del and Add vectors.
|
||||
/// We'll push every vector and compute the difference later in TypedChunk.
|
||||
fn push_vectors_diff(
|
||||
remove_vectors_writer: &mut Writer<BufWriter<File>>,
|
||||
prompts_writer: &mut Writer<BufWriter<File>>,
|
||||
|
@ -325,7 +329,7 @@ fn push_vectors_diff(
|
|||
delta: VectorStateDelta,
|
||||
reindex_vectors: bool,
|
||||
) -> Result<()> {
|
||||
let (must_remove, prompt, (mut del_vectors, mut add_vectors)) = delta.into_values();
|
||||
let (must_remove, prompt, mut add_vectors) = delta.into_values();
|
||||
if must_remove
|
||||
// TODO: the below condition works because we erase the vec database when an embedding setting changes.
|
||||
// When vector pipeline will be optimized, this should be removed.
|
||||
|
@ -340,44 +344,25 @@ fn push_vectors_diff(
|
|||
}
|
||||
|
||||
// We sort and dedup the vectors
|
||||
del_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
add_vectors.sort_unstable_by(|a, b| compare_vectors(a, b));
|
||||
del_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq());
|
||||
add_vectors.dedup_by(|a, b| compare_vectors(a, b).is_eq());
|
||||
|
||||
let merged_vectors_iter =
|
||||
itertools::merge_join_by(del_vectors, add_vectors, |del, add| compare_vectors(del, add));
|
||||
// let merged_vectors_iter =
|
||||
// itertools::merge_join_by(del_vectors, add_vectors, |del, add| compare_vectors(del, add));
|
||||
|
||||
// insert vectors into the writer
|
||||
for (i, eob) in merged_vectors_iter.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
for (i, vector) in add_vectors.into_iter().enumerate().take(u16::MAX as usize) {
|
||||
// Generate the key by extending the unique index to it.
|
||||
key_buffer.truncate(TRUNCATE_SIZE);
|
||||
let index = u16::try_from(i).unwrap();
|
||||
key_buffer.extend_from_slice(&index.to_be_bytes());
|
||||
|
||||
match eob {
|
||||
EitherOrBoth::Both(_, _) => (), // no need to touch anything
|
||||
EitherOrBoth::Left(vector) => {
|
||||
// TODO: the below condition works because we erase the vec database when an embedding setting changes.
|
||||
// When vector pipeline will be optimized, this should be removed.
|
||||
if !reindex_vectors {
|
||||
// We insert only the Del part of the Obkv to inform
|
||||
// that we only want to remove all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Deletion, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
EitherOrBoth::Right(vector) => {
|
||||
// We insert only the Add part of the Obkv to inform
|
||||
// that we only want to add all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
}
|
||||
// We insert only the Add part of the Obkv to inform
|
||||
// that we only want to add all those vectors.
|
||||
let mut obkv = KvWriterDelAdd::memory();
|
||||
obkv.insert(DelAdd::Addition, cast_slice(&vector))?;
|
||||
let bytes = obkv.into_inner()?;
|
||||
manual_vectors_writer.insert(&key_buffer, bytes)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue