diff --git a/milli/src/index.rs b/milli/src/index.rs index c47896df7..5b7a9c58c 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1610,24 +1610,6 @@ impl Index { .unwrap_or_default()) } - pub fn arroy_readers<'a>( - &'a self, - rtxn: &'a RoTxn<'a>, - embedder_id: u8, - quantized: bool, - ) -> impl Iterator> + 'a { - crate::vector::arroy_db_range_for_embedder(embedder_id).map_while(move |k| { - let reader = ArroyWrapper::new(self.vector_arroy, k, quantized); - // Here we don't care about the dimensions, but we want to know if we can read - // in the database or if its metadata are missing because there is no document with that many vectors. - match reader.dimensions(rtxn) { - Ok(_) => Some(Ok(reader)), - Err(arroy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e.into())), - } - }) - } - pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> { self.main.remap_types::().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff) } @@ -1649,14 +1631,9 @@ impl Index { let embedding_configs = self.embedding_configs(rtxn)?; for config in embedding_configs { let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap(); - let embeddings = self - .arroy_readers(rtxn, embedder_id, config.config.quantized()) - .map_while(|reader| { - reader - .and_then(|r| r.item_vector(rtxn, docid).map_err(|e| e.into())) - .transpose() - }) - .collect::>>()?; + let reader = + ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized()); + let embeddings = reader.item_vectors(rtxn, docid)?; res.insert(config.name.to_owned(), embeddings); } Ok(res) diff --git a/milli/src/search/new/vector_sort.rs b/milli/src/search/new/vector_sort.rs index de1dacbe7..90377c09c 100644 --- a/milli/src/search/new/vector_sort.rs +++ b/milli/src/search/new/vector_sort.rs @@ -1,11 +1,10 @@ use std::iter::FromIterator; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use super::ranking_rules::{RankingRule, RankingRuleOutput, 
RankingRuleQueryTrait}; use crate::score_details::{self, ScoreDetails}; -use crate::vector::{DistributionShift, Embedder}; +use crate::vector::{ArroyWrapper, DistributionShift, Embedder}; use crate::{DocumentId, Result, SearchContext, SearchLogger}; pub struct VectorSort { @@ -53,14 +52,9 @@ impl VectorSort { vector_candidates: &RoaringBitmap, ) -> Result<()> { let target = &self.target; - let mut results = Vec::new(); - for reader in ctx.index.arroy_readers(ctx.txn, self.embedder_index, self.quantized) { - let nns_by_vector = - reader?.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; - results.extend(nns_by_vector.into_iter()); - } - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized); + let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; self.cached_sorted_docids = results.into_iter(); Ok(()) diff --git a/milli/src/search/similar.rs b/milli/src/search/similar.rs index 0cb8d723d..5547d800e 100644 --- a/milli/src/search/similar.rs +++ b/milli/src/search/similar.rs @@ -1,10 +1,9 @@ use std::sync::Arc; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use crate::score_details::{self, ScoreDetails}; -use crate::vector::Embedder; +use crate::vector::{ArroyWrapper, Embedder}; use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult}; pub struct Similar<'a> { @@ -71,23 +70,13 @@ impl<'a> Similar<'a> { .get(self.rtxn, &self.embedder_name)? 
.ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?; - let mut results = Vec::new(); - - for reader in self.index.arroy_readers(self.rtxn, embedder_index, self.quantized) { - let nns_by_item = reader?.nns_by_item( - self.rtxn, - self.id, - self.limit + self.offset + 1, - Some(&universe), - )?; - if let Some(mut nns_by_item) = nns_by_item { - results.append(&mut nns_by_item); - } else { - break; - } - } - - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized); + let results = reader.nns_by_item( + self.rtxn, + self.id, + self.limit + self.offset + 1, + Some(&universe), + )?; let mut documents_ids = Vec::with_capacity(self.limit); let mut document_scores = Vec::with_capacity(self.limit); diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 326dd842d..e164a0817 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -689,9 +689,8 @@ where key: None, }, )?; - let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap(); let reader = - ArroyWrapper::new(self.index.vector_arroy, first_id, action.was_quantized); + ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); let dim = reader.dimensions(self.wtxn)?; dimension.insert(name.to_string(), dim); } @@ -713,17 +712,8 @@ where let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); pool.install(|| { - for k in crate::vector::arroy_db_range_for_embedder(embedder_index) { - let mut writer = ArroyWrapper::new(vector_arroy, k, was_quantized); - if is_quantizing { - writer.quantize(wtxn, k, dimension)?; - } - if writer.need_build(wtxn, dimension)? { - writer.build(wtxn, &mut rng, dimension)?; - } else if writer.is_empty(wtxn, dimension)? 
{ - break; - } - } + let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); + writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing)?; Result::Ok(()) }) .map_err(InternalError::from)??; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index bb2cfe56c..763f30d0f 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -990,27 +990,24 @@ impl<'a, 'i> Transform<'a, 'i> { None }; - let readers: Result, &RoaringBitmap)>> = settings_diff + let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff .embedding_config_updates .iter() .filter_map(|(name, action)| { if let Some(WriteBackToDocuments { embedder_id, user_provided }) = action.write_back() { - let readers: Result> = self - .index - .arroy_readers(wtxn, *embedder_id, action.was_quantized) - .collect(); - match readers { - Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))), - Err(error) => Some(Err(error)), - } + let reader = ArroyWrapper::new( + self.index.vector_arroy, + *embedder_id, + action.was_quantized, + ); + Some((name.as_str(), (reader, user_provided))) } else { None } }) .collect(); - let readers = readers?; let old_vectors_fid = settings_diff .old @@ -1048,34 +1045,24 @@ impl<'a, 'i> Transform<'a, 'i> { arroy::Error, > = readers .iter() - .filter_map(|(name, (readers, user_provided))| { + .filter_map(|(name, (reader, user_provided))| { if !user_provided.contains(docid) { return None; } - let mut vectors = Vec::new(); - for reader in readers { - let Some(vector) = reader.item_vector(wtxn, docid).transpose() else { - break; - }; - - match vector { - Ok(vector) => vectors.push(vector), - Err(error) => return Some(Err(error)), - } + match reader.item_vectors(wtxn, docid) { + Ok(vectors) if vectors.is_empty() => None, + Ok(vectors) => Some(Ok(( + name.to_string(), + serde_json::to_value(ExplicitVectors { + embeddings: Some( + 
VectorOrArrayOfVectors::from_array_of_vectors(vectors), + ), + regenerate: false, + }) + .unwrap(), + ))), + Err(e) => Some(Err(e)), } - if vectors.is_empty() { - return None; - } - Some(Ok(( - name.to_string(), - serde_json::to_value(ExplicitVectors { - embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors( - vectors, - )), - regenerate: false, - }) - .unwrap(), - ))) }) .collect(); @@ -1104,11 +1091,9 @@ impl<'a, 'i> Transform<'a, 'i> { } // delete all vectors from the embedders that need removal - for (_, (readers, _)) in readers { - for reader in readers { - let dimensions = reader.dimensions(wtxn)?; - reader.clear(wtxn, dimensions)?; - } + for (_, (reader, _)) in readers { + let dimensions = reader.dimensions(wtxn)?; + reader.clear(wtxn, dimensions)?; } let grenad_params = GrenadParameters { diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 97a4bf712..20e70b2a6 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -673,22 +673,14 @@ pub(crate) fn write_typed_chunk_into_index( .get(&embedder_name) .map_or(false, |conf| conf.2); // FIXME: allow customizing distance - let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index) - .map(|k| ArroyWrapper::new(index.vector_arroy, k, binary_quantized)) - .collect(); + let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized); // remove vectors for docids we want them removed let merger = remove_vectors_builder.build(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - - for writer in &writers { - // Uses invariant: vectors are packed in the first writers. - if !writer.del_item(wtxn, expected_dimension, docid)? 
{ - break; - } - } + writer.del_items(wtxn, expected_dimension, docid)?; } // add generated embeddings @@ -716,9 +708,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings.embedding_count(), ))); } - for (embedding, writer) in embeddings.iter().zip(&writers) { - writer.add_item(wtxn, expected_dimension, docid, embedding)?; - } + writer.add_items(wtxn, docid, &embeddings)?; } // perform the manual diff @@ -733,51 +723,14 @@ pub(crate) fn write_typed_chunk_into_index( if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) { let vector: Vec = pod_collect_to_vec(value); - let mut deleted_index = None; - for (index, writer) in writers.iter().enumerate() { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, expected_dimension, docid)?; - deleted_index = Some(index); - } - } - - // 🥲 enforce invariant: vectors are packed in the first writers. - if let Some(deleted_index) = deleted_index { - let mut last_index_with_a_vector = None; - for (index, writer) in writers.iter().enumerate().skip(deleted_index) { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } - if let Some((last_index, vector)) = last_index_with_a_vector { - // unwrap: computed the index from the list of writers - let writer = writers.get(last_index).unwrap(); - writer.del_item(wtxn, expected_dimension, docid)?; - writers.get(deleted_index).unwrap().add_item( - wtxn, - expected_dimension, - docid, - &vector, - )?; - } - } + writer.del_item(wtxn, docid, &vector)?; } if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) { let vector = pod_collect_to_vec(value); // overflow was detected during vector extraction. - for writer in &writers { - if !writer.contains_item(wtxn, expected_dimension, docid)? 
{ - writer.add_item(wtxn, expected_dimension, docid, &vector)?; - break; - } - } + writer.add_item(wtxn, docid, &vector)?; } } diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index d52e68bbe..b6d6510af 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -32,105 +32,243 @@ pub const REQUEST_PARALLELISM: usize = 40; pub struct ArroyWrapper { quantized: bool, - index: u16, + embedder_index: u8, database: arroy::Database, } impl ArroyWrapper { - pub fn new(database: arroy::Database, index: u16, quantized: bool) -> Self { - Self { database, index, quantized } + pub fn new( + database: arroy::Database, + embedder_index: u8, + quantized: bool, + ) -> Self { + Self { database, embedder_index, quantized } } - pub fn index(&self) -> u16 { - self.index + pub fn embedder_index(&self) -> u8 { + self.embedder_index + } + + fn readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + arroy_db_range_for_embedder(self.embedder_index).map_while(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) } pub fn dimensions(&self, rtxn: &RoTxn) -> Result { + let first_id = arroy_db_range_for_embedder(self.embedder_index).next().unwrap(); if self.quantized { - Ok(arroy::Reader::open(rtxn, self.index, self.quantized_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions()) } else { - Ok(arroy::Reader::open(rtxn, self.index, self.angular_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions()) } } - pub fn quantize( + pub fn build_and_quantize( &mut self, wtxn: &mut RwTxn, - index: u16, + rng: &mut R, dimension: usize, + quantizing: bool, ) -> Result<(), arroy::Error> { - if 
!self.quantized { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - writer.prepare_changing_distance::(wtxn)?; - self.quantized = true; + for index in arroy_db_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.need_build(wtxn)? { + writer.build(wtxn, rng, None)? + } else if writer.is_empty(wtxn)? { + break; + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + // If we are quantizing the databases, we can't know from meilisearch + // if the db was empty but still contained the wrong metadata, thus we need + // to quantize everything and can't stop early. Since this operation can + // only happen once in the life of an embedder, it's not very performance + // sensitive. + if quantizing && !self.quantized { + let writer = + writer.prepare_changing_distance::(wtxn)?; + writer.build(wtxn, rng, None)? + } else if writer.need_build(wtxn)? { + writer.build(wtxn, rng, None)? + } else if writer.is_empty(wtxn)? { + break; + } + } } Ok(()) } - pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).need_build(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).need_build(rtxn) - } - } - - pub fn build( + /// Overwrite all the embeddings associated with the index and item ID. + /// /!\ It won't remove embeddings after the last passed embedding, which can leave stale embeddings. + /// You should call `del_items` on the `item_id` before calling this method. + /// /!\ Cannot insert more than u8::MAX embeddings; after inserting u8::MAX embeddings, all the remaining ones will be silently ignored. 
+ pub fn add_items( &self, wtxn: &mut RwTxn, - rng: &mut R, - dimension: usize, + item_id: arroy::ItemId, + embeddings: &Embeddings, ) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).build(wtxn, rng, None) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).build(wtxn, rng, None) + let dimension = embeddings.dimension(); + for (index, vector) in + arroy_db_range_for_embedder(self.embedder_index).zip(embeddings.iter()) + { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } } + Ok(()) } + /// Add one vector for the item, in the first index of this embedder that does not already contain it. pub fn add_item( &self, wtxn: &mut RwTxn, - dimension: usize, item_id: arroy::ItemId, vector: &[f32], ) -> Result<(), arroy::Error> { if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) + self._add_item(wtxn, self.quantized_db(), item_id, vector) } else { - arroy::Writer::new(self.angular_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) + self._add_item(wtxn, self.angular_db(), item_id, vector) } } - pub fn del_item( + fn _add_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result<(), arroy::Error> { + let dimension = vector.len(); + + for index in arroy_db_range_for_embedder(self.embedder_index) { + let writer = arroy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? 
{ + writer.add_item(wtxn, item_id, vector)?; + break; + } + } + Ok(()) + } + + /// Delete all embeddings from a specific `item_id` + pub fn del_items( &self, wtxn: &mut RwTxn, dimension: usize, item_id: arroy::ItemId, + ) -> Result<(), arroy::Error> { + for index in arroy_db_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if !writer.del_item(wtxn, item_id)? { + break; + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if !writer.del_item(wtxn, item_id)? { + break; + } + } + } + + Ok(()) + } + + /// Delete one item. + pub fn del_item( + &self, + wtxn: &mut RwTxn, + item_id: arroy::ItemId, + vector: &[f32], ) -> Result { if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).del_item(wtxn, item_id) + self._del_item(wtxn, self.quantized_db(), item_id, vector) } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).del_item(wtxn, item_id) + self._del_item(wtxn, self.angular_db(), item_id, vector) } } + fn _del_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + let mut deleted_index = None; + + for index in arroy_db_range_for_embedder(self.embedder_index) { + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, item_id)?; + deleted_index = Some(index); + } + } + + // 🥲 enforce invariant: vectors are packed in the first writers. 
+ if let Some(deleted_index) = deleted_index { + let mut last_index_with_a_vector = None; + for index in + arroy_db_range_for_embedder(self.embedder_index).skip(deleted_index as usize) + { + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); + } + if let Some((last_index, vector)) = last_index_with_a_vector { + let writer = arroy::Writer::new(db, last_index, dimension); + writer.del_item(wtxn, item_id)?; + let writer = arroy::Writer::new(db, deleted_index, dimension); + writer.add_item(wtxn, item_id, &vector)?; + } + } + Ok(deleted_index.is_some()) + } + pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).clear(wtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).clear(wtxn) - } - } - - pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).is_empty(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).is_empty(rtxn) + for index in arroy_db_range_for_embedder(self.embedder_index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(wtxn)? { + break; + } + writer.clear(wtxn)?; + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(wtxn)? 
{ + break; + } + writer.clear(wtxn)?; + } } + Ok(()) } pub fn contains_item( @@ -139,11 +277,25 @@ impl ArroyWrapper { dimension: usize, item: arroy::ItemId, ) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).contains_item(rtxn, item) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).contains_item(rtxn, item) + for index in arroy_db_range_for_embedder(self.embedder_index) { + let contains = if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + break; + } + writer.contains_item(rtxn, item)? + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + break; + } + writer.contains_item(rtxn, item)? + }; + if contains { + return Ok(contains); + } } + Ok(false) } pub fn nns_by_item( @@ -152,38 +304,91 @@ impl ArroyWrapper { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, - ) -> Result>, arroy::Error> { + ) -> Result, arroy::Error> { if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())? - .nns_by_item(rtxn, item, limit, None, None, filter) + self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())? 
- .nns_by_item(rtxn, item, limit, None, None, filter) + self._nns_by_item(rtxn, self.angular_db(), item, limit, filter) } } + fn _nns_by_item( + &self, + rtxn: &RoTxn, + db: arroy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.readers(rtxn, db) { + let ret = reader?.nns_by_item(rtxn, item, limit, None, None, filter)?; + if let Some(mut ret) = ret { + results.append(&mut ret); + } else { + break; + } + } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + Ok(results) + } + pub fn nns_by_vector( &self, - txn: &RoTxn, - item: &[f32], + rtxn: &RoTxn, + vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { if self.quantized { - arroy::Reader::open(txn, self.index, self.quantized_db())? - .nns_by_vector(txn, item, limit, None, None, filter) + self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) } else { - arroy::Reader::open(txn, self.index, self.angular_db())? 
- .nns_by_vector(txn, item, limit, None, None, filter) + self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) } } - pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result>, arroy::Error> { - if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())?.item_vector(rtxn, docid) - } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())?.item_vector(rtxn, docid) + fn _nns_by_vector( + &self, + rtxn: &RoTxn, + db: arroy::Database, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for reader in self.readers(rtxn, db) { + let mut ret = reader?.nns_by_vector(rtxn, vector, limit, None, None, filter)?; + results.append(&mut ret); } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) + } + + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result>, arroy::Error> { + let mut vectors = Vec::new(); + + if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } + } + } + Ok(vectors) } fn angular_db(&self) -> arroy::Database {