diff --git a/milli/src/index.rs b/milli/src/index.rs index c47896df7..5b7a9c58c 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1610,24 +1610,6 @@ impl Index { .unwrap_or_default()) } - pub fn arroy_readers<'a>( - &'a self, - rtxn: &'a RoTxn<'a>, - embedder_id: u8, - quantized: bool, - ) -> impl Iterator> + 'a { - crate::vector::arroy_db_range_for_embedder(embedder_id).map_while(move |k| { - let reader = ArroyWrapper::new(self.vector_arroy, k, quantized); - // Here we don't care about the dimensions, but we want to know if we can read - // in the database or if its metadata are missing because there is no document with that many vectors. - match reader.dimensions(rtxn) { - Ok(_) => Some(Ok(reader)), - Err(arroy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e.into())), - } - }) - } - pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> { self.main.remap_types::().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff) } @@ -1649,14 +1631,9 @@ impl Index { let embedding_configs = self.embedding_configs(rtxn)?; for config in embedding_configs { let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap(); - let embeddings = self - .arroy_readers(rtxn, embedder_id, config.config.quantized()) - .map_while(|reader| { - reader - .and_then(|r| r.item_vector(rtxn, docid).map_err(|e| e.into())) - .transpose() - }) - .collect::>>()?; + let reader = + ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized()); + let embeddings = reader.item_vectors(rtxn, docid)?; res.insert(config.name.to_owned(), embeddings); } Ok(res) diff --git a/milli/src/search/new/vector_sort.rs b/milli/src/search/new/vector_sort.rs index de1dacbe7..90377c09c 100644 --- a/milli/src/search/new/vector_sort.rs +++ b/milli/src/search/new/vector_sort.rs @@ -1,11 +1,10 @@ use std::iter::FromIterator; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait}; use crate::score_details::{self, ScoreDetails}; -use crate::vector::{DistributionShift, Embedder}; +use crate::vector::{ArroyWrapper, DistributionShift, Embedder}; use crate::{DocumentId, Result, SearchContext, SearchLogger}; pub struct VectorSort { @@ -53,14 +52,9 @@ impl VectorSort { vector_candidates: &RoaringBitmap, ) -> Result<()> { let target = &self.target; - let mut results = Vec::new(); - for reader in ctx.index.arroy_readers(ctx.txn, self.embedder_index, self.quantized) { - let nns_by_vector = - reader?.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; - results.extend(nns_by_vector.into_iter()); - } - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized); + let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; self.cached_sorted_docids = results.into_iter(); Ok(()) diff --git a/milli/src/search/similar.rs b/milli/src/search/similar.rs index e408c94b1..5547d800e 100644 --- a/milli/src/search/similar.rs +++ b/milli/src/search/similar.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use crate::score_details::{self, ScoreDetails}; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index bb2cfe56c..763f30d0f 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -990,27 +990,24 @@ impl<'a, 'i> Transform<'a, 'i> { None }; - let readers: Result, &RoaringBitmap)>> = settings_diff + let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff .embedding_config_updates .iter() .filter_map(|(name, action)| { if let Some(WriteBackToDocuments { embedder_id, user_provided }) = action.write_back() { - let readers: Result> = self - .index - .arroy_readers(wtxn, *embedder_id, action.was_quantized) - .collect(); - match readers { - Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))), - Err(error) => Some(Err(error)), - } + let reader = ArroyWrapper::new( + self.index.vector_arroy, + *embedder_id, + action.was_quantized, + ); + Some((name.as_str(), (reader, user_provided))) } else { None } }) .collect(); - let readers = readers?; let old_vectors_fid = settings_diff .old @@ -1048,34 +1045,24 @@ impl<'a, 'i> Transform<'a, 'i> { arroy::Error, > = readers .iter() - .filter_map(|(name, (readers, user_provided))| { + .filter_map(|(name, (reader, user_provided))| { if !user_provided.contains(docid) { return None; } - let mut vectors = Vec::new(); - for reader in readers { - let Some(vector) = reader.item_vector(wtxn, docid).transpose() else { - break; - }; - - match vector { - Ok(vector) => vectors.push(vector), - Err(error) => return Some(Err(error)), - } + match reader.item_vectors(wtxn, docid) { + Ok(vectors) if vectors.is_empty() => None, + Ok(vectors) => Some(Ok(( + name.to_string(), + serde_json::to_value(ExplicitVectors { + embeddings: Some( + VectorOrArrayOfVectors::from_array_of_vectors(vectors), + ), + regenerate: false, + }) + .unwrap(), + ))), + Err(e) => Some(Err(e)), } - if vectors.is_empty() { - return None; - } - Some(Ok(( - name.to_string(), - serde_json::to_value(ExplicitVectors { - embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors( - vectors, - )), - regenerate: false, - }) - .unwrap(), - ))) }) .collect(); @@ -1104,11 +1091,9 @@ impl<'a, 'i> Transform<'a, 'i> { } // delete all vectors from the embedders that need removal - for (_, (readers, _)) in readers { - for reader in readers { - let dimensions = reader.dimensions(wtxn)?; - reader.clear(wtxn, dimensions)?; - } + for (_, (reader, _)) in readers { + let dimensions = reader.dimensions(wtxn)?; + reader.clear(wtxn, dimensions)?; } let grenad_params = GrenadParameters { diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 54765cfef..b5b6cd953 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -45,6 +45,20 @@ impl ArroyWrapper { self.index } + fn readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + arroy_db_range_for_embedder(self.index).map_while(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => Some(Ok(reader)), + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + pub fn dimensions(&self, rtxn: &RoTxn) -> Result { let first_id = arroy_db_range_for_embedder(self.index).next().unwrap(); if self.quantized { @@ -97,6 +111,7 @@ impl ArroyWrapper { Ok(()) } + /// Overwrite all the embeddings associated to the index and item id. pub fn add_items( &self, wtxn: &mut RwTxn, @@ -116,30 +131,41 @@ impl ArroyWrapper { Ok(()) } + /// Add one document int for this index where we can find an empty spot. pub fn add_item( &self, wtxn: &mut RwTxn, item_id: arroy::ItemId, vector: &[f32], + ) -> Result<(), arroy::Error> { + if self.quantized { + self._add_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._add_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _add_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], ) -> Result<(), arroy::Error> { let dimension = vector.len(); for index in arroy_db_range_for_embedder(self.index) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - if !writer.contains_item(wtxn, item_id)? { - writer.add_item(wtxn, item_id, &vector)?; - break; - } - } else { - arroy::Writer::new(self.angular_db(), index, dimension) - .add_item(wtxn, item_id, vector)? + let writer = arroy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, vector)?; + break; } } - Ok(()) } + /// Delete an item from the index. It **does not** take care of fixing the hole + /// made after deleting the item. pub fn del_item_raw( &self, wtxn: &mut RwTxn, @@ -163,36 +189,39 @@ impl ArroyWrapper { Ok(false) } + /// Delete one item. pub fn del_item( &self, wtxn: &mut RwTxn, - itemid: arroy::ItemId, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result { + if self.quantized { + self._del_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._del_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _del_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, vector: &[f32], ) -> Result { let dimension = vector.len(); let mut deleted_index = None; for index in arroy_db_range_for_embedder(self.index) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, itemid)?; - deleted_index = Some(index); - } - } else { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, itemid)?; - deleted_index = Some(index); - } + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, item_id)?; + deleted_index = Some(index); } } @@ -200,34 +229,18 @@ impl ArroyWrapper { if let Some(deleted_index) = deleted_index { let mut last_index_with_a_vector = None; for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } else { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); } if let Some((last_index, vector)) = last_index_with_a_vector { - if self.quantized { - // unwrap: computed the index from the list of writers - let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension); - writer.del_item(wtxn, itemid)?; - let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension); - writer.add_item(wtxn, itemid, &vector)?; - } else { - // unwrap: computed the index from the list of writers - let writer = arroy::Writer::new(self.angular_db(), last_index, dimension); - writer.del_item(wtxn, itemid)?; - let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension); - writer.add_item(wtxn, itemid, &vector)?; - } + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(db, last_index, dimension); + writer.del_item(wtxn, item_id)?; + let writer = arroy::Writer::new(db, deleted_index, dimension); + writer.add_item(wtxn, item_id, &vector)?; } } Ok(deleted_index.is_some()) @@ -284,17 +297,26 @@ impl ArroyWrapper { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + if self.quantized { + self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) + } else { + self._nns_by_item(rtxn, self.angular_db(), item, limit, filter) + } + } + + fn _nns_by_item( + &self, + rtxn: &RoTxn, + db: arroy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { let mut results = Vec::new(); - for index in arroy_db_range_for_embedder(self.index) { - let ret = if self.quantized { - arroy::Reader::open(rtxn, index, self.quantized_db())? - .nns_by_item(rtxn, item, limit, None, None, filter)? - } else { - arroy::Reader::open(rtxn, index, self.angular_db())? - .nns_by_item(rtxn, item, limit, None, None, filter)? - }; + for reader in self.readers(rtxn, db) { + let ret = reader?.nns_by_item(rtxn, item, limit, None, None, filter)?; if let Some(mut ret) = ret { results.append(&mut ret); } else { @@ -302,27 +324,35 @@ impl ArroyWrapper { } } results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - Ok(results) } pub fn nns_by_vector( &self, - txn: &RoTxn, - item: &[f32], + rtxn: &RoTxn, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + if self.quantized { + self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) + } else { + self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) + } + } + + fn _nns_by_vector( + &self, + rtxn: &RoTxn, + db: arroy::Database, + vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { let mut results = Vec::new(); - for index in arroy_db_range_for_embedder(self.index) { - let mut ret = if self.quantized { - arroy::Reader::open(txn, index, self.quantized_db())? - .nns_by_vector(txn, item, limit, None, None, filter)? - } else { - arroy::Reader::open(txn, index, self.angular_db())? - .nns_by_vector(txn, item, limit, None, None, filter)? - }; + for reader in self.readers(rtxn, db) { + let mut ret = reader?.nns_by_vector(rtxn, vector, limit, None, None, filter)?; results.append(&mut ret); } @@ -331,18 +361,27 @@ impl ArroyWrapper { Ok(results) } - pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result>, arroy::Error> { - for index in arroy_db_range_for_embedder(self.index) { - let ret = if self.quantized { - arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)? - } else { - arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)? - }; - if ret.is_some() { - return Ok(ret); + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result>, arroy::Error> { + let mut vectors = Vec::new(); + + if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } } } - Ok(None) + Ok(vectors) } fn angular_db(&self) -> arroy::Database {