mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-25 22:34:28 +01:00
Merge #4953
4953: Move the multi arroy index logic to the arroy wrapper r=irevoire a=irevoire # Pull Request ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/4948 ## What does this PR do? - Make the `ArroyWrapper` we introduced in the last PR handle all the embeddings for a specific docid itself. Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
efdc5739d7
@ -1610,24 +1610,6 @@ impl Index {
|
|||||||
.unwrap_or_default())
|
.unwrap_or_default())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn arroy_readers<'a>(
|
|
||||||
&'a self,
|
|
||||||
rtxn: &'a RoTxn<'a>,
|
|
||||||
embedder_id: u8,
|
|
||||||
quantized: bool,
|
|
||||||
) -> impl Iterator<Item = Result<ArroyWrapper>> + 'a {
|
|
||||||
crate::vector::arroy_db_range_for_embedder(embedder_id).map_while(move |k| {
|
|
||||||
let reader = ArroyWrapper::new(self.vector_arroy, k, quantized);
|
|
||||||
// Here we don't care about the dimensions, but we want to know if we can read
|
|
||||||
// in the database or if its metadata are missing because there is no document with that many vectors.
|
|
||||||
match reader.dimensions(rtxn) {
|
|
||||||
Ok(_) => Some(Ok(reader)),
|
|
||||||
Err(arroy::Error::MissingMetadata(_)) => None,
|
|
||||||
Err(e) => Some(Err(e.into())),
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> {
|
pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> {
|
||||||
self.main.remap_types::<Str, BEU64>().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff)
|
self.main.remap_types::<Str, BEU64>().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff)
|
||||||
}
|
}
|
||||||
@ -1649,14 +1631,9 @@ impl Index {
|
|||||||
let embedding_configs = self.embedding_configs(rtxn)?;
|
let embedding_configs = self.embedding_configs(rtxn)?;
|
||||||
for config in embedding_configs {
|
for config in embedding_configs {
|
||||||
let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap();
|
let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap();
|
||||||
let embeddings = self
|
let reader =
|
||||||
.arroy_readers(rtxn, embedder_id, config.config.quantized())
|
ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized());
|
||||||
.map_while(|reader| {
|
let embeddings = reader.item_vectors(rtxn, docid)?;
|
||||||
reader
|
|
||||||
.and_then(|r| r.item_vector(rtxn, docid).map_err(|e| e.into()))
|
|
||||||
.transpose()
|
|
||||||
})
|
|
||||||
.collect::<Result<Vec<_>>>()?;
|
|
||||||
res.insert(config.name.to_owned(), embeddings);
|
res.insert(config.name.to_owned(), embeddings);
|
||||||
}
|
}
|
||||||
Ok(res)
|
Ok(res)
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
use std::iter::FromIterator;
|
use std::iter::FromIterator;
|
||||||
|
|
||||||
use ordered_float::OrderedFloat;
|
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
|
use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait};
|
||||||
use crate::score_details::{self, ScoreDetails};
|
use crate::score_details::{self, ScoreDetails};
|
||||||
use crate::vector::{DistributionShift, Embedder};
|
use crate::vector::{ArroyWrapper, DistributionShift, Embedder};
|
||||||
use crate::{DocumentId, Result, SearchContext, SearchLogger};
|
use crate::{DocumentId, Result, SearchContext, SearchLogger};
|
||||||
|
|
||||||
pub struct VectorSort<Q: RankingRuleQueryTrait> {
|
pub struct VectorSort<Q: RankingRuleQueryTrait> {
|
||||||
@ -53,14 +52,9 @@ impl<Q: RankingRuleQueryTrait> VectorSort<Q> {
|
|||||||
vector_candidates: &RoaringBitmap,
|
vector_candidates: &RoaringBitmap,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let target = &self.target;
|
let target = &self.target;
|
||||||
let mut results = Vec::new();
|
|
||||||
|
|
||||||
for reader in ctx.index.arroy_readers(ctx.txn, self.embedder_index, self.quantized) {
|
let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized);
|
||||||
let nns_by_vector =
|
let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
|
||||||
reader?.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?;
|
|
||||||
results.extend(nns_by_vector.into_iter());
|
|
||||||
}
|
|
||||||
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
|
||||||
self.cached_sorted_docids = results.into_iter();
|
self.cached_sorted_docids = results.into_iter();
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -1,10 +1,9 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use ordered_float::OrderedFloat;
|
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
|
|
||||||
use crate::score_details::{self, ScoreDetails};
|
use crate::score_details::{self, ScoreDetails};
|
||||||
use crate::vector::Embedder;
|
use crate::vector::{ArroyWrapper, Embedder};
|
||||||
use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult};
|
use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult};
|
||||||
|
|
||||||
pub struct Similar<'a> {
|
pub struct Similar<'a> {
|
||||||
@ -71,23 +70,13 @@ impl<'a> Similar<'a> {
|
|||||||
.get(self.rtxn, &self.embedder_name)?
|
.get(self.rtxn, &self.embedder_name)?
|
||||||
.ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?;
|
.ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?;
|
||||||
|
|
||||||
let mut results = Vec::new();
|
let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized);
|
||||||
|
let results = reader.nns_by_item(
|
||||||
for reader in self.index.arroy_readers(self.rtxn, embedder_index, self.quantized) {
|
|
||||||
let nns_by_item = reader?.nns_by_item(
|
|
||||||
self.rtxn,
|
self.rtxn,
|
||||||
self.id,
|
self.id,
|
||||||
self.limit + self.offset + 1,
|
self.limit + self.offset + 1,
|
||||||
Some(&universe),
|
Some(&universe),
|
||||||
)?;
|
)?;
|
||||||
if let Some(mut nns_by_item) = nns_by_item {
|
|
||||||
results.append(&mut nns_by_item);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
|
||||||
|
|
||||||
let mut documents_ids = Vec::with_capacity(self.limit);
|
let mut documents_ids = Vec::with_capacity(self.limit);
|
||||||
let mut document_scores = Vec::with_capacity(self.limit);
|
let mut document_scores = Vec::with_capacity(self.limit);
|
||||||
|
@ -689,9 +689,8 @@ where
|
|||||||
key: None,
|
key: None,
|
||||||
},
|
},
|
||||||
)?;
|
)?;
|
||||||
let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap();
|
|
||||||
let reader =
|
let reader =
|
||||||
ArroyWrapper::new(self.index.vector_arroy, first_id, action.was_quantized);
|
ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized);
|
||||||
let dim = reader.dimensions(self.wtxn)?;
|
let dim = reader.dimensions(self.wtxn)?;
|
||||||
dimension.insert(name.to_string(), dim);
|
dimension.insert(name.to_string(), dim);
|
||||||
}
|
}
|
||||||
@ -713,17 +712,8 @@ where
|
|||||||
let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized);
|
let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized);
|
||||||
|
|
||||||
pool.install(|| {
|
pool.install(|| {
|
||||||
for k in crate::vector::arroy_db_range_for_embedder(embedder_index) {
|
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
|
||||||
let mut writer = ArroyWrapper::new(vector_arroy, k, was_quantized);
|
writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing)?;
|
||||||
if is_quantizing {
|
|
||||||
writer.quantize(wtxn, k, dimension)?;
|
|
||||||
}
|
|
||||||
if writer.need_build(wtxn, dimension)? {
|
|
||||||
writer.build(wtxn, &mut rng, dimension)?;
|
|
||||||
} else if writer.is_empty(wtxn, dimension)? {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Result::Ok(())
|
Result::Ok(())
|
||||||
})
|
})
|
||||||
.map_err(InternalError::from)??;
|
.map_err(InternalError::from)??;
|
||||||
|
@ -990,27 +990,24 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
let readers: Result<BTreeMap<&str, (Vec<ArroyWrapper>, &RoaringBitmap)>> = settings_diff
|
let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff
|
||||||
.embedding_config_updates
|
.embedding_config_updates
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(name, action)| {
|
.filter_map(|(name, action)| {
|
||||||
if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
|
if let Some(WriteBackToDocuments { embedder_id, user_provided }) =
|
||||||
action.write_back()
|
action.write_back()
|
||||||
{
|
{
|
||||||
let readers: Result<Vec<_>> = self
|
let reader = ArroyWrapper::new(
|
||||||
.index
|
self.index.vector_arroy,
|
||||||
.arroy_readers(wtxn, *embedder_id, action.was_quantized)
|
*embedder_id,
|
||||||
.collect();
|
action.was_quantized,
|
||||||
match readers {
|
);
|
||||||
Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))),
|
Some((name.as_str(), (reader, user_provided)))
|
||||||
Err(error) => Some(Err(error)),
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
let readers = readers?;
|
|
||||||
|
|
||||||
let old_vectors_fid = settings_diff
|
let old_vectors_fid = settings_diff
|
||||||
.old
|
.old
|
||||||
@ -1048,34 +1045,24 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
arroy::Error,
|
arroy::Error,
|
||||||
> = readers
|
> = readers
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|(name, (readers, user_provided))| {
|
.filter_map(|(name, (reader, user_provided))| {
|
||||||
if !user_provided.contains(docid) {
|
if !user_provided.contains(docid) {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut vectors = Vec::new();
|
match reader.item_vectors(wtxn, docid) {
|
||||||
for reader in readers {
|
Ok(vectors) if vectors.is_empty() => None,
|
||||||
let Some(vector) = reader.item_vector(wtxn, docid).transpose() else {
|
Ok(vectors) => Some(Ok((
|
||||||
break;
|
|
||||||
};
|
|
||||||
|
|
||||||
match vector {
|
|
||||||
Ok(vector) => vectors.push(vector),
|
|
||||||
Err(error) => return Some(Err(error)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if vectors.is_empty() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
Some(Ok((
|
|
||||||
name.to_string(),
|
name.to_string(),
|
||||||
serde_json::to_value(ExplicitVectors {
|
serde_json::to_value(ExplicitVectors {
|
||||||
embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors(
|
embeddings: Some(
|
||||||
vectors,
|
VectorOrArrayOfVectors::from_array_of_vectors(vectors),
|
||||||
)),
|
),
|
||||||
regenerate: false,
|
regenerate: false,
|
||||||
})
|
})
|
||||||
.unwrap(),
|
.unwrap(),
|
||||||
)))
|
))),
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
@ -1104,12 +1091,10 @@ impl<'a, 'i> Transform<'a, 'i> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// delete all vectors from the embedders that need removal
|
// delete all vectors from the embedders that need removal
|
||||||
for (_, (readers, _)) in readers {
|
for (_, (reader, _)) in readers {
|
||||||
for reader in readers {
|
|
||||||
let dimensions = reader.dimensions(wtxn)?;
|
let dimensions = reader.dimensions(wtxn)?;
|
||||||
reader.clear(wtxn, dimensions)?;
|
reader.clear(wtxn, dimensions)?;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
let grenad_params = GrenadParameters {
|
let grenad_params = GrenadParameters {
|
||||||
chunk_compression_type: self.indexer_settings.chunk_compression_type,
|
chunk_compression_type: self.indexer_settings.chunk_compression_type,
|
||||||
|
@ -673,22 +673,14 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
.get(&embedder_name)
|
.get(&embedder_name)
|
||||||
.map_or(false, |conf| conf.2);
|
.map_or(false, |conf| conf.2);
|
||||||
// FIXME: allow customizing distance
|
// FIXME: allow customizing distance
|
||||||
let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index)
|
let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized);
|
||||||
.map(|k| ArroyWrapper::new(index.vector_arroy, k, binary_quantized))
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// remove vectors for docids we want them removed
|
// remove vectors for docids we want them removed
|
||||||
let merger = remove_vectors_builder.build();
|
let merger = remove_vectors_builder.build();
|
||||||
let mut iter = merger.into_stream_merger_iter()?;
|
let mut iter = merger.into_stream_merger_iter()?;
|
||||||
while let Some((key, _)) = iter.next()? {
|
while let Some((key, _)) = iter.next()? {
|
||||||
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap();
|
||||||
|
writer.del_items(wtxn, expected_dimension, docid)?;
|
||||||
for writer in &writers {
|
|
||||||
// Uses invariant: vectors are packed in the first writers.
|
|
||||||
if !writer.del_item(wtxn, expected_dimension, docid)? {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// add generated embeddings
|
// add generated embeddings
|
||||||
@ -716,9 +708,7 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
embeddings.embedding_count(),
|
embeddings.embedding_count(),
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
for (embedding, writer) in embeddings.iter().zip(&writers) {
|
writer.add_items(wtxn, docid, &embeddings)?;
|
||||||
writer.add_item(wtxn, expected_dimension, docid, embedding)?;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform the manual diff
|
// perform the manual diff
|
||||||
@ -733,51 +723,14 @@ pub(crate) fn write_typed_chunk_into_index(
|
|||||||
if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
|
if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) {
|
||||||
let vector: Vec<f32> = pod_collect_to_vec(value);
|
let vector: Vec<f32> = pod_collect_to_vec(value);
|
||||||
|
|
||||||
let mut deleted_index = None;
|
writer.del_item(wtxn, docid, &vector)?;
|
||||||
for (index, writer) in writers.iter().enumerate() {
|
|
||||||
let Some(candidate) = writer.item_vector(wtxn, docid)? else {
|
|
||||||
// uses invariant: vectors are packed in the first writers.
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
if candidate == vector {
|
|
||||||
writer.del_item(wtxn, expected_dimension, docid)?;
|
|
||||||
deleted_index = Some(index);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// 🥲 enforce invariant: vectors are packed in the first writers.
|
|
||||||
if let Some(deleted_index) = deleted_index {
|
|
||||||
let mut last_index_with_a_vector = None;
|
|
||||||
for (index, writer) in writers.iter().enumerate().skip(deleted_index) {
|
|
||||||
let Some(candidate) = writer.item_vector(wtxn, docid)? else {
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
last_index_with_a_vector = Some((index, candidate));
|
|
||||||
}
|
|
||||||
if let Some((last_index, vector)) = last_index_with_a_vector {
|
|
||||||
// unwrap: computed the index from the list of writers
|
|
||||||
let writer = writers.get(last_index).unwrap();
|
|
||||||
writer.del_item(wtxn, expected_dimension, docid)?;
|
|
||||||
writers.get(deleted_index).unwrap().add_item(
|
|
||||||
wtxn,
|
|
||||||
expected_dimension,
|
|
||||||
docid,
|
|
||||||
&vector,
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
|
if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) {
|
||||||
let vector = pod_collect_to_vec(value);
|
let vector = pod_collect_to_vec(value);
|
||||||
|
|
||||||
// overflow was detected during vector extraction.
|
// overflow was detected during vector extraction.
|
||||||
for writer in &writers {
|
writer.add_item(wtxn, docid, &vector)?;
|
||||||
if !writer.contains_item(wtxn, expected_dimension, docid)? {
|
|
||||||
writer.add_item(wtxn, expected_dimension, docid, &vector)?;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -32,105 +32,243 @@ pub const REQUEST_PARALLELISM: usize = 40;
|
|||||||
|
|
||||||
pub struct ArroyWrapper {
|
pub struct ArroyWrapper {
|
||||||
quantized: bool,
|
quantized: bool,
|
||||||
index: u16,
|
embedder_index: u8,
|
||||||
database: arroy::Database<Unspecified>,
|
database: arroy::Database<Unspecified>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ArroyWrapper {
|
impl ArroyWrapper {
|
||||||
pub fn new(database: arroy::Database<Unspecified>, index: u16, quantized: bool) -> Self {
|
pub fn new(
|
||||||
Self { database, index, quantized }
|
database: arroy::Database<Unspecified>,
|
||||||
|
embedder_index: u8,
|
||||||
|
quantized: bool,
|
||||||
|
) -> Self {
|
||||||
|
Self { database, embedder_index, quantized }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn index(&self) -> u16 {
|
pub fn embedder_index(&self) -> u8 {
|
||||||
self.index
|
self.embedder_index
|
||||||
|
}
|
||||||
|
|
||||||
|
fn readers<'a, D: arroy::Distance>(
|
||||||
|
&'a self,
|
||||||
|
rtxn: &'a RoTxn<'a>,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
) -> impl Iterator<Item = Result<arroy::Reader<D>, arroy::Error>> + 'a {
|
||||||
|
arroy_db_range_for_embedder(self.embedder_index).map_while(move |index| {
|
||||||
|
match arroy::Reader::open(rtxn, index, db) {
|
||||||
|
Ok(reader) => match reader.is_empty(rtxn) {
|
||||||
|
Ok(false) => Some(Ok(reader)),
|
||||||
|
Ok(true) => None,
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
},
|
||||||
|
Err(arroy::Error::MissingMetadata(_)) => None,
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> {
|
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> {
|
||||||
|
let first_id = arroy_db_range_for_embedder(self.embedder_index).next().unwrap();
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
Ok(arroy::Reader::open(rtxn, self.index, self.quantized_db())?.dimensions())
|
Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions())
|
||||||
} else {
|
} else {
|
||||||
Ok(arroy::Reader::open(rtxn, self.index, self.angular_db())?.dimensions())
|
Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn quantize(
|
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
|
||||||
&mut self,
|
&mut self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
index: u16,
|
rng: &mut R,
|
||||||
dimension: usize,
|
dimension: usize,
|
||||||
|
quantizing: bool,
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
if !self.quantized {
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
|
if self.quantized {
|
||||||
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
|
if writer.need_build(wtxn)? {
|
||||||
|
writer.build(wtxn, rng, None)?
|
||||||
|
} else if writer.is_empty(wtxn)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
|
// If we are quantizing the databases, we can't know from meilisearch
|
||||||
|
// if the db was empty but still contained the wrong metadata, thus we need
|
||||||
|
// to quantize everything and can't stop early. Since this operation can
|
||||||
|
// only happen once in the life of an embedder, it's not very performance
|
||||||
|
// sensitive.
|
||||||
|
if quantizing && !self.quantized {
|
||||||
|
let writer =
|
||||||
writer.prepare_changing_distance::<BinaryQuantizedAngular>(wtxn)?;
|
writer.prepare_changing_distance::<BinaryQuantizedAngular>(wtxn)?;
|
||||||
self.quantized = true;
|
writer.build(wtxn, rng, None)?
|
||||||
|
} else if writer.need_build(wtxn)? {
|
||||||
|
writer.build(wtxn, rng, None)?
|
||||||
|
} else if writer.is_empty(wtxn)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> {
|
/// Overwrite all the embeddings associated with the index and item ID.
|
||||||
if self.quantized {
|
/// /!\ It won't remove embeddings after the last passed embedding, which can leave stale embeddings.
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).need_build(rtxn)
|
/// You should call `del_items` on the `item_id` before calling this method.
|
||||||
} else {
|
/// /!\ Cannot insert more than u8::MAX embeddings; after inserting u8::MAX embeddings, all the remaining ones will be silently ignored.
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).need_build(rtxn)
|
pub fn add_items(
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn build<R: rand::Rng + rand::SeedableRng>(
|
|
||||||
&self,
|
&self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
rng: &mut R,
|
item_id: arroy::ItemId,
|
||||||
dimension: usize,
|
embeddings: &Embeddings<f32>,
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
|
let dimension = embeddings.dimension();
|
||||||
|
for (index, vector) in
|
||||||
|
arroy_db_range_for_embedder(self.embedder_index).zip(embeddings.iter())
|
||||||
|
{
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).build(wtxn, rng, None)
|
arroy::Writer::new(self.quantized_db(), index, dimension)
|
||||||
|
.add_item(wtxn, item_id, vector)?
|
||||||
} else {
|
} else {
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).build(wtxn, rng, None)
|
arroy::Writer::new(self.angular_db(), index, dimension)
|
||||||
|
.add_item(wtxn, item_id, vector)?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add one vector for this document into the first index where we can find an empty spot.
|
||||||
pub fn add_item(
|
pub fn add_item(
|
||||||
&self,
|
&self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
dimension: usize,
|
|
||||||
item_id: arroy::ItemId,
|
item_id: arroy::ItemId,
|
||||||
vector: &[f32],
|
vector: &[f32],
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension)
|
self._add_item(wtxn, self.quantized_db(), item_id, vector)
|
||||||
.add_item(wtxn, item_id, vector)
|
|
||||||
} else {
|
} else {
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension)
|
self._add_item(wtxn, self.angular_db(), item_id, vector)
|
||||||
.add_item(wtxn, item_id, vector)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn del_item(
|
fn _add_item<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
vector: &[f32],
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
let dimension = vector.len();
|
||||||
|
|
||||||
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
|
if !writer.contains_item(wtxn, item_id)? {
|
||||||
|
writer.add_item(wtxn, item_id, vector)?;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete all embeddings from a specific `item_id`
|
||||||
|
pub fn del_items(
|
||||||
&self,
|
&self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
dimension: usize,
|
dimension: usize,
|
||||||
item_id: arroy::ItemId,
|
item_id: arroy::ItemId,
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
|
if self.quantized {
|
||||||
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
|
if !writer.del_item(wtxn, item_id)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
|
if !writer.del_item(wtxn, item_id)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete one item.
|
||||||
|
pub fn del_item(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
vector: &[f32],
|
||||||
) -> Result<bool, arroy::Error> {
|
) -> Result<bool, arroy::Error> {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).del_item(wtxn, item_id)
|
self._del_item(wtxn, self.quantized_db(), item_id, vector)
|
||||||
} else {
|
} else {
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).del_item(wtxn, item_id)
|
self._del_item(wtxn, self.angular_db(), item_id, vector)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn _del_item<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
vector: &[f32],
|
||||||
|
) -> Result<bool, arroy::Error> {
|
||||||
|
let dimension = vector.len();
|
||||||
|
let mut deleted_index = None;
|
||||||
|
|
||||||
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
|
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
|
||||||
|
// uses invariant: vectors are packed in the first writers.
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
if candidate == vector {
|
||||||
|
writer.del_item(wtxn, item_id)?;
|
||||||
|
deleted_index = Some(index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 🥲 enforce invariant: vectors are packed in the first writers.
|
||||||
|
if let Some(deleted_index) = deleted_index {
|
||||||
|
let mut last_index_with_a_vector = None;
|
||||||
|
for index in
|
||||||
|
arroy_db_range_for_embedder(self.embedder_index).skip(deleted_index as usize)
|
||||||
|
{
|
||||||
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
|
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
last_index_with_a_vector = Some((index, candidate));
|
||||||
|
}
|
||||||
|
if let Some((last_index, vector)) = last_index_with_a_vector {
|
||||||
|
let writer = arroy::Writer::new(db, last_index, dimension);
|
||||||
|
writer.del_item(wtxn, item_id)?;
|
||||||
|
let writer = arroy::Writer::new(db, deleted_index, dimension);
|
||||||
|
writer.add_item(wtxn, item_id, &vector)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(deleted_index.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
|
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
|
||||||
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).clear(wtxn)
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
|
if writer.is_empty(wtxn)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
writer.clear(wtxn)?;
|
||||||
} else {
|
} else {
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).clear(wtxn)
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
|
if writer.is_empty(wtxn)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
writer.clear(wtxn)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result<bool, arroy::Error> {
|
|
||||||
if self.quantized {
|
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).is_empty(rtxn)
|
|
||||||
} else {
|
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).is_empty(rtxn)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn contains_item(
|
pub fn contains_item(
|
||||||
@ -139,11 +277,25 @@ impl ArroyWrapper {
|
|||||||
dimension: usize,
|
dimension: usize,
|
||||||
item: arroy::ItemId,
|
item: arroy::ItemId,
|
||||||
) -> Result<bool, arroy::Error> {
|
) -> Result<bool, arroy::Error> {
|
||||||
if self.quantized {
|
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
||||||
arroy::Writer::new(self.quantized_db(), self.index, dimension).contains_item(rtxn, item)
|
let contains = if self.quantized {
|
||||||
} else {
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
arroy::Writer::new(self.angular_db(), self.index, dimension).contains_item(rtxn, item)
|
if writer.is_empty(rtxn)? {
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
writer.contains_item(rtxn, item)?
|
||||||
|
} else {
|
||||||
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
|
if writer.is_empty(rtxn)? {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
writer.contains_item(rtxn, item)?
|
||||||
|
};
|
||||||
|
if contains {
|
||||||
|
return Ok(contains);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn nns_by_item(
|
pub fn nns_by_item(
|
||||||
@ -152,38 +304,91 @@ impl ArroyWrapper {
|
|||||||
item: ItemId,
|
item: ItemId,
|
||||||
limit: usize,
|
limit: usize,
|
||||||
filter: Option<&RoaringBitmap>,
|
filter: Option<&RoaringBitmap>,
|
||||||
) -> Result<Option<Vec<(ItemId, f32)>>, arroy::Error> {
|
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Reader::open(rtxn, self.index, self.quantized_db())?
|
self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter)
|
||||||
.nns_by_item(rtxn, item, limit, None, None, filter)
|
|
||||||
} else {
|
} else {
|
||||||
arroy::Reader::open(rtxn, self.index, self.angular_db())?
|
self._nns_by_item(rtxn, self.angular_db(), item, limit, filter)
|
||||||
.nns_by_item(rtxn, item, limit, None, None, filter)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn _nns_by_item<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
rtxn: &RoTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
item: ItemId,
|
||||||
|
limit: usize,
|
||||||
|
filter: Option<&RoaringBitmap>,
|
||||||
|
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||||
|
let mut results = Vec::new();
|
||||||
|
|
||||||
|
for reader in self.readers(rtxn, db) {
|
||||||
|
let ret = reader?.nns_by_item(rtxn, item, limit, None, None, filter)?;
|
||||||
|
if let Some(mut ret) = ret {
|
||||||
|
results.append(&mut ret);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
pub fn nns_by_vector(
|
pub fn nns_by_vector(
|
||||||
&self,
|
&self,
|
||||||
txn: &RoTxn,
|
rtxn: &RoTxn,
|
||||||
item: &[f32],
|
vector: &[f32],
|
||||||
limit: usize,
|
limit: usize,
|
||||||
filter: Option<&RoaringBitmap>,
|
filter: Option<&RoaringBitmap>,
|
||||||
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Reader::open(txn, self.index, self.quantized_db())?
|
self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter)
|
||||||
.nns_by_vector(txn, item, limit, None, None, filter)
|
|
||||||
} else {
|
} else {
|
||||||
arroy::Reader::open(txn, self.index, self.angular_db())?
|
self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter)
|
||||||
.nns_by_vector(txn, item, limit, None, None, filter)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result<Option<Vec<f32>>, arroy::Error> {
|
fn _nns_by_vector<D: arroy::Distance>(
|
||||||
if self.quantized {
|
&self,
|
||||||
arroy::Reader::open(rtxn, self.index, self.quantized_db())?.item_vector(rtxn, docid)
|
rtxn: &RoTxn,
|
||||||
} else {
|
db: arroy::Database<D>,
|
||||||
arroy::Reader::open(rtxn, self.index, self.angular_db())?.item_vector(rtxn, docid)
|
vector: &[f32],
|
||||||
|
limit: usize,
|
||||||
|
filter: Option<&RoaringBitmap>,
|
||||||
|
) -> Result<Vec<(ItemId, f32)>, arroy::Error> {
|
||||||
|
let mut results = Vec::new();
|
||||||
|
|
||||||
|
for reader in self.readers(rtxn, db) {
|
||||||
|
let mut ret = reader?.nns_by_vector(rtxn, vector, limit, None, None, filter)?;
|
||||||
|
results.append(&mut ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
||||||
|
|
||||||
|
Ok(results)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result<Vec<Vec<f32>>, arroy::Error> {
|
||||||
|
let mut vectors = Vec::new();
|
||||||
|
|
||||||
|
if self.quantized {
|
||||||
|
for reader in self.readers(rtxn, self.quantized_db()) {
|
||||||
|
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
||||||
|
vectors.push(vec);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for reader in self.readers(rtxn, self.angular_db()) {
|
||||||
|
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
||||||
|
vectors.push(vec);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(vectors)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn angular_db(&self) -> arroy::Database<Angular> {
|
fn angular_db(&self) -> arroy::Database<Angular> {
|
||||||
|
Loading…
Reference in New Issue
Block a user