mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 12:27:13 +02:00
ArroyWrapper changes
This commit is contained in:
parent
0b5bc41b79
commit
836ae19bec
1 changed files with 180 additions and 61 deletions
|
@ -15,6 +15,8 @@ use utoipa::ToSchema;
|
||||||
use self::error::{EmbedError, NewEmbedderError};
|
use self::error::{EmbedError, NewEmbedderError};
|
||||||
use crate::progress::{EmbedderStats, Progress};
|
use crate::progress::{EmbedderStats, Progress};
|
||||||
use crate::prompt::{Prompt, PromptData};
|
use crate::prompt::{Prompt, PromptData};
|
||||||
|
use crate::vector::composite::SubEmbedderOptions;
|
||||||
|
use crate::vector::json_template::JsonTemplate;
|
||||||
use crate::ThreadPoolNoAbort;
|
use crate::ThreadPoolNoAbort;
|
||||||
|
|
||||||
pub mod composite;
|
pub mod composite;
|
||||||
|
@ -63,7 +65,7 @@ impl ArroyWrapper {
|
||||||
rtxn: &'a RoTxn<'a>,
|
rtxn: &'a RoTxn<'a>,
|
||||||
db: arroy::Database<D>,
|
db: arroy::Database<D>,
|
||||||
) -> impl Iterator<Item = Result<arroy::Reader<'a, D>, arroy::Error>> + 'a {
|
) -> impl Iterator<Item = Result<arroy::Reader<'a, D>, arroy::Error>> + 'a {
|
||||||
arroy_db_range_for_embedder(self.embedder_index).map_while(move |index| {
|
arroy_store_range_for_embedder(self.embedder_index).filter_map(move |index| {
|
||||||
match arroy::Reader::open(rtxn, index, db) {
|
match arroy::Reader::open(rtxn, index, db) {
|
||||||
Ok(reader) => match reader.is_empty(rtxn) {
|
Ok(reader) => match reader.is_empty(rtxn) {
|
||||||
Ok(false) => Some(Ok(reader)),
|
Ok(false) => Some(Ok(reader)),
|
||||||
|
@ -76,12 +78,57 @@ impl ArroyWrapper {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<usize, arroy::Error> {
|
/// The item ids that are present in the store specified by its id.
|
||||||
let first_id = arroy_db_range_for_embedder(self.embedder_index).next().unwrap();
|
///
|
||||||
|
/// The ids are accessed via a lambda to avoid lifetime shenanigans.
|
||||||
|
pub fn items_in_store<F, O>(
|
||||||
|
&self,
|
||||||
|
rtxn: &RoTxn,
|
||||||
|
store_id: u8,
|
||||||
|
with_items: F,
|
||||||
|
) -> Result<O, arroy::Error>
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoaringBitmap) -> O,
|
||||||
|
{
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions())
|
self._items_in_store(rtxn, self.quantized_db(), store_id, with_items)
|
||||||
} else {
|
} else {
|
||||||
Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions())
|
self._items_in_store(rtxn, self.angular_db(), store_id, with_items)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _items_in_store<D: arroy::Distance, F, O>(
|
||||||
|
&self,
|
||||||
|
rtxn: &RoTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
store_id: u8,
|
||||||
|
with_items: F,
|
||||||
|
) -> Result<O, arroy::Error>
|
||||||
|
where
|
||||||
|
F: FnOnce(&RoaringBitmap) -> O,
|
||||||
|
{
|
||||||
|
let index = arroy_store_for_embedder(self.embedder_index, store_id);
|
||||||
|
let reader = arroy::Reader::open(rtxn, index, db);
|
||||||
|
match reader {
|
||||||
|
Ok(reader) => Ok(with_items(reader.item_ids())),
|
||||||
|
Err(arroy::Error::MissingMetadata(_)) => Ok(with_items(&RoaringBitmap::new())),
|
||||||
|
Err(err) => Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn dimensions(&self, rtxn: &RoTxn) -> Result<Option<usize>, arroy::Error> {
|
||||||
|
if self.quantized {
|
||||||
|
Ok(self
|
||||||
|
.readers(rtxn, self.quantized_db())
|
||||||
|
.next()
|
||||||
|
.transpose()?
|
||||||
|
.map(|reader| reader.dimensions()))
|
||||||
|
} else {
|
||||||
|
Ok(self
|
||||||
|
.readers(rtxn, self.angular_db())
|
||||||
|
.next()
|
||||||
|
.transpose()?
|
||||||
|
.map(|reader| reader.dimensions()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,13 +143,13 @@ impl ArroyWrapper {
|
||||||
arroy_memory: Option<usize>,
|
arroy_memory: Option<usize>,
|
||||||
cancel: &(impl Fn() -> bool + Sync + Send),
|
cancel: &(impl Fn() -> bool + Sync + Send),
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
if writer.need_build(wtxn)? {
|
if writer.need_build(wtxn)? {
|
||||||
writer.builder(rng).build(wtxn)?
|
writer.builder(rng).build(wtxn)?
|
||||||
} else if writer.is_empty(wtxn)? {
|
} else if writer.is_empty(wtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
|
@ -127,7 +174,7 @@ impl ArroyWrapper {
|
||||||
.cancel(cancel)
|
.cancel(cancel)
|
||||||
.build(wtxn)?;
|
.build(wtxn)?;
|
||||||
} else if writer.is_empty(wtxn)? {
|
} else if writer.is_empty(wtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -146,7 +193,7 @@ impl ArroyWrapper {
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
let dimension = embeddings.dimension();
|
let dimension = embeddings.dimension();
|
||||||
for (index, vector) in
|
for (index, vector) in
|
||||||
arroy_db_range_for_embedder(self.embedder_index).zip(embeddings.iter())
|
arroy_store_range_for_embedder(self.embedder_index).zip(embeddings.iter())
|
||||||
{
|
{
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
arroy::Writer::new(self.quantized_db(), index, dimension)
|
arroy::Writer::new(self.quantized_db(), index, dimension)
|
||||||
|
@ -182,7 +229,7 @@ impl ArroyWrapper {
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
let dimension = vector.len();
|
let dimension = vector.len();
|
||||||
|
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
let writer = arroy::Writer::new(db, index, dimension);
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
if !writer.contains_item(wtxn, item_id)? {
|
if !writer.contains_item(wtxn, item_id)? {
|
||||||
writer.add_item(wtxn, item_id, vector)?;
|
writer.add_item(wtxn, item_id, vector)?;
|
||||||
|
@ -192,6 +239,38 @@ impl ArroyWrapper {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a vector associated with a document in store specified by its id.
|
||||||
|
///
|
||||||
|
/// Any existing vector associated with the document in the store will be replaced by the new vector.
|
||||||
|
pub fn add_item_in_store(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
store_id: u8,
|
||||||
|
vector: &[f32],
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
if self.quantized {
|
||||||
|
self._add_item_in_store(wtxn, self.quantized_db(), item_id, store_id, vector)
|
||||||
|
} else {
|
||||||
|
self._add_item_in_store(wtxn, self.angular_db(), item_id, store_id, vector)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _add_item_in_store<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
store_id: u8,
|
||||||
|
vector: &[f32],
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
let dimension = vector.len();
|
||||||
|
|
||||||
|
let index = arroy_store_for_embedder(self.embedder_index, store_id);
|
||||||
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
|
writer.add_item(wtxn, item_id, vector)
|
||||||
|
}
|
||||||
|
|
||||||
/// Delete all embeddings from a specific `item_id`
|
/// Delete all embeddings from a specific `item_id`
|
||||||
pub fn del_items(
|
pub fn del_items(
|
||||||
&self,
|
&self,
|
||||||
|
@ -199,24 +278,84 @@ impl ArroyWrapper {
|
||||||
dimension: usize,
|
dimension: usize,
|
||||||
item_id: arroy::ItemId,
|
item_id: arroy::ItemId,
|
||||||
) -> Result<(), arroy::Error> {
|
) -> Result<(), arroy::Error> {
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
if !writer.del_item(wtxn, item_id)? {
|
writer.del_item(wtxn, item_id)?;
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
if !writer.del_item(wtxn, item_id)? {
|
writer.del_item(wtxn, item_id)?;
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete one item.
|
/// Removes the item specified by its id from the store specified by its id.
|
||||||
|
///
|
||||||
|
/// Returns whether the item was removed.
|
||||||
|
///
|
||||||
|
/// # Warning
|
||||||
|
///
|
||||||
|
/// - This function will silently fail to remove the item if used against an arroy database that was never built.
|
||||||
|
pub fn del_item_in_store(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
store_id: u8,
|
||||||
|
dimensions: usize,
|
||||||
|
) -> Result<bool, arroy::Error> {
|
||||||
|
if self.quantized {
|
||||||
|
self._del_item_in_store(wtxn, self.quantized_db(), item_id, store_id, dimensions)
|
||||||
|
} else {
|
||||||
|
self._del_item_in_store(wtxn, self.angular_db(), item_id, store_id, dimensions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _del_item_in_store<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
item_id: arroy::ItemId,
|
||||||
|
store_id: u8,
|
||||||
|
dimensions: usize,
|
||||||
|
) -> Result<bool, arroy::Error> {
|
||||||
|
let index = arroy_store_for_embedder(self.embedder_index, store_id);
|
||||||
|
let writer = arroy::Writer::new(db, index, dimensions);
|
||||||
|
writer.del_item(wtxn, item_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Removes all items from the store specified by its id.
|
||||||
|
///
|
||||||
|
/// # Warning
|
||||||
|
///
|
||||||
|
/// - This function will silently fail to remove the items if used against an arroy database that was never built.
|
||||||
|
pub fn clear_store(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
store_id: u8,
|
||||||
|
dimensions: usize,
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
if self.quantized {
|
||||||
|
self._clear_store(wtxn, self.quantized_db(), store_id, dimensions)
|
||||||
|
} else {
|
||||||
|
self._clear_store(wtxn, self.angular_db(), store_id, dimensions)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn _clear_store<D: arroy::Distance>(
|
||||||
|
&self,
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
db: arroy::Database<D>,
|
||||||
|
store_id: u8,
|
||||||
|
dimensions: usize,
|
||||||
|
) -> Result<(), arroy::Error> {
|
||||||
|
let index = arroy_store_for_embedder(self.embedder_index, store_id);
|
||||||
|
let writer = arroy::Writer::new(db, index, dimensions);
|
||||||
|
writer.clear(wtxn)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Delete one item from its value.
|
||||||
pub fn del_item(
|
pub fn del_item(
|
||||||
&self,
|
&self,
|
||||||
wtxn: &mut RwTxn,
|
wtxn: &mut RwTxn,
|
||||||
|
@ -238,54 +377,31 @@ impl ArroyWrapper {
|
||||||
vector: &[f32],
|
vector: &[f32],
|
||||||
) -> Result<bool, arroy::Error> {
|
) -> Result<bool, arroy::Error> {
|
||||||
let dimension = vector.len();
|
let dimension = vector.len();
|
||||||
let mut deleted_index = None;
|
|
||||||
|
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
let writer = arroy::Writer::new(db, index, dimension);
|
let writer = arroy::Writer::new(db, index, dimension);
|
||||||
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
|
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
|
||||||
// uses invariant: vectors are packed in the first writers.
|
continue;
|
||||||
break;
|
|
||||||
};
|
};
|
||||||
if candidate == vector {
|
if candidate == vector {
|
||||||
writer.del_item(wtxn, item_id)?;
|
return writer.del_item(wtxn, item_id);
|
||||||
deleted_index = Some(index);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(false)
|
||||||
// 🥲 enforce invariant: vectors are packed in the first writers.
|
|
||||||
if let Some(deleted_index) = deleted_index {
|
|
||||||
let mut last_index_with_a_vector = None;
|
|
||||||
for index in
|
|
||||||
arroy_db_range_for_embedder(self.embedder_index).skip(deleted_index as usize)
|
|
||||||
{
|
|
||||||
let writer = arroy::Writer::new(db, index, dimension);
|
|
||||||
let Some(candidate) = writer.item_vector(wtxn, item_id)? else {
|
|
||||||
break;
|
|
||||||
};
|
|
||||||
last_index_with_a_vector = Some((index, candidate));
|
|
||||||
}
|
|
||||||
if let Some((last_index, vector)) = last_index_with_a_vector {
|
|
||||||
let writer = arroy::Writer::new(db, last_index, dimension);
|
|
||||||
writer.del_item(wtxn, item_id)?;
|
|
||||||
let writer = arroy::Writer::new(db, deleted_index, dimension);
|
|
||||||
writer.add_item(wtxn, item_id, &vector)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(deleted_index.is_some())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
|
pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> {
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
if self.quantized {
|
if self.quantized {
|
||||||
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
if writer.is_empty(wtxn)? {
|
if writer.is_empty(wtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
writer.clear(wtxn)?;
|
writer.clear(wtxn)?;
|
||||||
} else {
|
} else {
|
||||||
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
if writer.is_empty(wtxn)? {
|
if writer.is_empty(wtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
writer.clear(wtxn)?;
|
writer.clear(wtxn)?;
|
||||||
}
|
}
|
||||||
|
@ -299,17 +415,17 @@ impl ArroyWrapper {
|
||||||
dimension: usize,
|
dimension: usize,
|
||||||
item: arroy::ItemId,
|
item: arroy::ItemId,
|
||||||
) -> Result<bool, arroy::Error> {
|
) -> Result<bool, arroy::Error> {
|
||||||
for index in arroy_db_range_for_embedder(self.embedder_index) {
|
for index in arroy_store_range_for_embedder(self.embedder_index) {
|
||||||
let contains = if self.quantized {
|
let contains = if self.quantized {
|
||||||
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
let writer = arroy::Writer::new(self.quantized_db(), index, dimension);
|
||||||
if writer.is_empty(rtxn)? {
|
if writer.is_empty(rtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
writer.contains_item(rtxn, item)?
|
writer.contains_item(rtxn, item)?
|
||||||
} else {
|
} else {
|
||||||
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
let writer = arroy::Writer::new(self.angular_db(), index, dimension);
|
||||||
if writer.is_empty(rtxn)? {
|
if writer.is_empty(rtxn)? {
|
||||||
break;
|
continue;
|
||||||
}
|
}
|
||||||
writer.contains_item(rtxn, item)?
|
writer.contains_item(rtxn, item)?
|
||||||
};
|
};
|
||||||
|
@ -348,13 +464,14 @@ impl ArroyWrapper {
|
||||||
let reader = reader?;
|
let reader = reader?;
|
||||||
let mut searcher = reader.nns(limit);
|
let mut searcher = reader.nns(limit);
|
||||||
if let Some(filter) = filter {
|
if let Some(filter) = filter {
|
||||||
|
if reader.item_ids().is_disjoint(filter) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
searcher.candidates(filter);
|
searcher.candidates(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(mut ret) = searcher.by_item(rtxn, item)? {
|
if let Some(mut ret) = searcher.by_item(rtxn, item)? {
|
||||||
results.append(&mut ret);
|
results.append(&mut ret);
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance));
|
||||||
|
@ -389,6 +506,9 @@ impl ArroyWrapper {
|
||||||
let reader = reader?;
|
let reader = reader?;
|
||||||
let mut searcher = reader.nns(limit);
|
let mut searcher = reader.nns(limit);
|
||||||
if let Some(filter) = filter {
|
if let Some(filter) = filter {
|
||||||
|
if reader.item_ids().is_disjoint(filter) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
searcher.candidates(filter);
|
searcher.candidates(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,16 +527,12 @@ impl ArroyWrapper {
|
||||||
for reader in self.readers(rtxn, self.quantized_db()) {
|
for reader in self.readers(rtxn, self.quantized_db()) {
|
||||||
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
||||||
vectors.push(vec);
|
vectors.push(vec);
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for reader in self.readers(rtxn, self.angular_db()) {
|
for reader in self.readers(rtxn, self.angular_db()) {
|
||||||
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
if let Some(vec) = reader?.item_vector(rtxn, item_id)? {
|
||||||
vectors.push(vec);
|
vectors.push(vec);
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -989,8 +1105,11 @@ pub const fn is_cuda_enabled() -> bool {
|
||||||
cfg!(feature = "cuda")
|
cfg!(feature = "cuda")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn arroy_db_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
|
fn arroy_store_range_for_embedder(embedder_id: u8) -> impl Iterator<Item = u16> {
|
||||||
let embedder_id = (embedder_id as u16) << 8;
|
(0..=u8::MAX).map(move |store_id| arroy_store_for_embedder(embedder_id, store_id))
|
||||||
|
}
|
||||||
(0..=u8::MAX).map(move |k| embedder_id | (k as u16))
|
|
||||||
|
fn arroy_store_for_embedder(embedder_id: u8, store_id: u8) -> u16 {
|
||||||
|
let embedder_id = (embedder_id as u16) << 8;
|
||||||
|
embedder_id | (store_id as u16)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue