mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-04-14 02:21:41 +02:00
Remove the Vector variant and use the Vectors
This commit is contained in:
parent
096a28656e
commit
b57dd5c58e
@ -100,7 +100,6 @@ pub enum ReceiverAction {
|
|||||||
/// Wake up, you have frames to read for the BBQueue buffers.
|
/// Wake up, you have frames to read for the BBQueue buffers.
|
||||||
WakeUp,
|
WakeUp,
|
||||||
LargeEntry(LargeEntry),
|
LargeEntry(LargeEntry),
|
||||||
LargeVector(LargeVector),
|
|
||||||
LargeVectors(LargeVectors),
|
LargeVectors(LargeVectors),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -120,24 +119,6 @@ pub struct LargeEntry {
|
|||||||
pub value: Mmap,
|
pub value: Mmap,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// When an embedding is larger than the available
|
|
||||||
/// BBQueue space it arrives here.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct LargeVector {
|
|
||||||
/// The document id associated to the large embedding.
|
|
||||||
pub docid: DocumentId,
|
|
||||||
/// The embedder id in which to insert the large embedding.
|
|
||||||
pub embedder_id: u8,
|
|
||||||
/// The large embedding that must be written.
|
|
||||||
pub embedding: Mmap,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LargeVector {
|
|
||||||
pub fn read_embedding(&self) -> &[f32] {
|
|
||||||
bytemuck::cast_slice(&self.embedding)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// When embeddings are larger than the available
|
/// When embeddings are larger than the available
|
||||||
/// BBQueue space it arrives here.
|
/// BBQueue space it arrives here.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -225,35 +206,6 @@ pub struct ArroyDeleteVector {
|
|||||||
pub docid: DocumentId,
|
pub docid: DocumentId,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
|
|
||||||
#[repr(C)]
|
|
||||||
/// The embedding is the remaining space and represents a non-aligned [f32].
|
|
||||||
pub struct ArroySetVector {
|
|
||||||
pub docid: DocumentId,
|
|
||||||
pub embedder_id: u8,
|
|
||||||
_padding: [u8; 3],
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ArroySetVector {
|
|
||||||
pub fn read_embedding_into_vec<'v>(
|
|
||||||
&self,
|
|
||||||
frame: &FrameGrantR<'_>,
|
|
||||||
vec: &'v mut Vec<f32>,
|
|
||||||
) -> Option<&'v [f32]> {
|
|
||||||
vec.clear();
|
|
||||||
let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
|
|
||||||
let bytes = &frame[skip..];
|
|
||||||
if bytes.is_empty() {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| {
|
|
||||||
let f = bytes.try_into().map(f32::from_ne_bytes).unwrap();
|
|
||||||
vec.push(f);
|
|
||||||
});
|
|
||||||
Some(&vec[..])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
|
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
/// The embeddings are in the remaining space and represents
|
/// The embeddings are in the remaining space and represents
|
||||||
@ -290,7 +242,6 @@ impl ArroySetVectors {
|
|||||||
pub enum EntryHeader {
|
pub enum EntryHeader {
|
||||||
DbOperation(DbOperation),
|
DbOperation(DbOperation),
|
||||||
ArroyDeleteVector(ArroyDeleteVector),
|
ArroyDeleteVector(ArroyDeleteVector),
|
||||||
ArroySetVector(ArroySetVector),
|
|
||||||
ArroySetVectors(ArroySetVectors),
|
ArroySetVectors(ArroySetVectors),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -303,8 +254,7 @@ impl EntryHeader {
|
|||||||
match self {
|
match self {
|
||||||
EntryHeader::DbOperation(_) => 0,
|
EntryHeader::DbOperation(_) => 0,
|
||||||
EntryHeader::ArroyDeleteVector(_) => 1,
|
EntryHeader::ArroyDeleteVector(_) => 1,
|
||||||
EntryHeader::ArroySetVector(_) => 2,
|
EntryHeader::ArroySetVectors(_) => 2,
|
||||||
EntryHeader::ArroySetVectors(_) => 3,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -323,11 +273,6 @@ impl EntryHeader {
|
|||||||
Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
|
Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The `dimensions` corresponds to the number of `f32` in the embedding.
|
|
||||||
fn total_set_vector_size(dimensions: usize) -> usize {
|
|
||||||
Self::variant_size() + mem::size_of::<ArroySetVector>() + dimensions * mem::size_of::<f32>()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The `dimensions` corresponds to the number of `f32` in the embedding.
|
/// The `dimensions` corresponds to the number of `f32` in the embedding.
|
||||||
fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
|
fn total_set_vectors_size(count: usize, dimensions: usize) -> usize {
|
||||||
let embedding_size = dimensions * mem::size_of::<f32>();
|
let embedding_size = dimensions * mem::size_of::<f32>();
|
||||||
@ -338,7 +283,6 @@ impl EntryHeader {
|
|||||||
let payload_size = match self {
|
let payload_size = match self {
|
||||||
EntryHeader::DbOperation(op) => mem::size_of_val(op),
|
EntryHeader::DbOperation(op) => mem::size_of_val(op),
|
||||||
EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
|
EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
|
||||||
EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
|
|
||||||
EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
|
EntryHeader::ArroySetVectors(asvs) => mem::size_of_val(asvs),
|
||||||
};
|
};
|
||||||
Self::variant_size() + payload_size
|
Self::variant_size() + payload_size
|
||||||
@ -358,11 +302,6 @@ impl EntryHeader {
|
|||||||
EntryHeader::ArroyDeleteVector(header)
|
EntryHeader::ArroyDeleteVector(header)
|
||||||
}
|
}
|
||||||
2 => {
|
2 => {
|
||||||
let header_bytes = &remaining[..mem::size_of::<ArroySetVector>()];
|
|
||||||
let header = checked::pod_read_unaligned(header_bytes);
|
|
||||||
EntryHeader::ArroySetVector(header)
|
|
||||||
}
|
|
||||||
3 => {
|
|
||||||
let header_bytes = &remaining[..mem::size_of::<ArroySetVectors>()];
|
let header_bytes = &remaining[..mem::size_of::<ArroySetVectors>()];
|
||||||
let header = checked::pod_read_unaligned(header_bytes);
|
let header = checked::pod_read_unaligned(header_bytes);
|
||||||
EntryHeader::ArroySetVectors(header)
|
EntryHeader::ArroySetVectors(header)
|
||||||
@ -376,7 +315,6 @@ impl EntryHeader {
|
|||||||
let payload_bytes = match self {
|
let payload_bytes = match self {
|
||||||
EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
|
EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
|
||||||
EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
|
EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
|
||||||
EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
|
|
||||||
EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
|
EntryHeader::ArroySetVectors(asvs) => bytemuck::bytes_of(asvs),
|
||||||
};
|
};
|
||||||
*first = self.variant_id();
|
*first = self.variant_id();
|
||||||
@ -520,59 +458,6 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_vector(
|
|
||||||
&self,
|
|
||||||
docid: DocumentId,
|
|
||||||
embedder_id: u8,
|
|
||||||
embedding: &[f32],
|
|
||||||
) -> crate::Result<()> {
|
|
||||||
let capacity = self.capacity;
|
|
||||||
let refcell = self.producers.get().unwrap();
|
|
||||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
|
||||||
|
|
||||||
let arroy_set_vector = ArroySetVector { docid, embedder_id, _padding: [0; 3] };
|
|
||||||
let payload_header = EntryHeader::ArroySetVector(arroy_set_vector);
|
|
||||||
let total_length = EntryHeader::total_set_vector_size(embedding.len());
|
|
||||||
if total_length > capacity {
|
|
||||||
let mut embedding_bytes = bytemuck::cast_slice(embedding);
|
|
||||||
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
|
|
||||||
io::copy(&mut embedding_bytes, &mut value_file)?;
|
|
||||||
let value_file = value_file.into_inner().map_err(|ie| ie.into_error())?;
|
|
||||||
value_file.sync_all()?;
|
|
||||||
let embedding = unsafe { Mmap::map(&value_file)? };
|
|
||||||
|
|
||||||
let large_vector = LargeVector { docid, embedder_id, embedding };
|
|
||||||
self.sender.send(ReceiverAction::LargeVector(large_vector)).unwrap();
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Spin loop to have a frame the size we requested.
|
|
||||||
let mut grant = loop {
|
|
||||||
match producer.grant(total_length) {
|
|
||||||
Ok(grant) => break grant,
|
|
||||||
Err(bbqueue::Error::InsufficientSize) => continue,
|
|
||||||
Err(e) => unreachable!("{e:?}"),
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let header_size = payload_header.header_size();
|
|
||||||
let (header_bytes, remaining) = grant.split_at_mut(header_size);
|
|
||||||
payload_header.serialize_into(header_bytes);
|
|
||||||
remaining.copy_from_slice(bytemuck::cast_slice(embedding));
|
|
||||||
|
|
||||||
// We could commit only the used memory.
|
|
||||||
grant.commit(total_length);
|
|
||||||
|
|
||||||
// We only send a wake up message when the channel is empty
|
|
||||||
// so that we don't fill the channel with too many WakeUps.
|
|
||||||
if self.sender.is_empty() {
|
|
||||||
self.sender.send(ReceiverAction::WakeUp).unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_vectors(
|
fn set_vectors(
|
||||||
&self,
|
&self,
|
||||||
docid: u32,
|
docid: u32,
|
||||||
@ -583,12 +468,9 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
|||||||
let refcell = self.producers.get().unwrap();
|
let refcell = self.producers.get().unwrap();
|
||||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||||
|
|
||||||
// If there are no vector we specify the dimensions
|
// If there are no vectors we specify the dimensions
|
||||||
// to zero to allocate no extra space at all
|
// to zero to allocate no extra space at all
|
||||||
let dimensions = match embeddings.first() {
|
let dimensions = embeddings.first().map_or(0, |emb| emb.len());
|
||||||
Some(embedding) => embedding.len(),
|
|
||||||
None => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
|
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
|
||||||
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
|
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
|
||||||
@ -954,7 +836,7 @@ impl EmbeddingSender<'_, '_> {
|
|||||||
embedder_id: u8,
|
embedder_id: u8,
|
||||||
embedding: Embedding,
|
embedding: Embedding,
|
||||||
) -> crate::Result<()> {
|
) -> crate::Result<()> {
|
||||||
self.0.set_vector(docid, embedder_id, &embedding[..])
|
self.0.set_vectors(docid, embedder_id, &[embedding])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@ use rand::SeedableRng as _;
|
|||||||
use raw_collections::RawMap;
|
use raw_collections::RawMap;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
pub use update_by_function::UpdateByFunction;
|
pub use update_by_function::UpdateByFunction;
|
||||||
use {LargeEntry, LargeVector};
|
|
||||||
|
|
||||||
use super::channel::*;
|
use super::channel::*;
|
||||||
use super::extract::*;
|
use super::extract::*;
|
||||||
@ -430,14 +429,6 @@ where
|
|||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ReceiverAction::LargeVector(large_vector) => {
|
|
||||||
let embedding = large_vector.read_embedding();
|
|
||||||
let LargeVector { docid, embedder_id, .. } = large_vector;
|
|
||||||
let (_, _, writer, dimensions) =
|
|
||||||
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
|
||||||
writer.del_items(wtxn, *dimensions, docid)?;
|
|
||||||
writer.add_item(wtxn, docid, embedding)?;
|
|
||||||
}
|
|
||||||
ReceiverAction::LargeVectors(large_vectors) => {
|
ReceiverAction::LargeVectors(large_vectors) => {
|
||||||
let LargeVectors { docid, embedder_id, .. } = large_vectors;
|
let LargeVectors { docid, embedder_id, .. } = large_vectors;
|
||||||
let (_, _, writer, dimensions) =
|
let (_, _, writer, dimensions) =
|
||||||
@ -594,16 +585,6 @@ fn write_from_bbqueue(
|
|||||||
writer.del_items(wtxn, dimensions, docid)?;
|
writer.del_items(wtxn, dimensions, docid)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
EntryHeader::ArroySetVector(asv) => {
|
|
||||||
let ArroySetVector { docid, embedder_id, .. } = asv;
|
|
||||||
let frame = frame_with_header.frame();
|
|
||||||
let (_, _, writer, dimensions) =
|
|
||||||
arroy_writers.get(&embedder_id).expect("requested a missing embedder");
|
|
||||||
writer.del_items(wtxn, *dimensions, docid)?;
|
|
||||||
if let Some(embedding) = asv.read_embedding_into_vec(frame, aligned_embedding) {
|
|
||||||
writer.add_item(wtxn, docid, embedding)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
EntryHeader::ArroySetVectors(asvs) => {
|
EntryHeader::ArroySetVectors(asvs) => {
|
||||||
let ArroySetVectors { docid, embedder_id, .. } = asvs;
|
let ArroySetVectors { docid, embedder_id, .. } = asvs;
|
||||||
let frame = frame_with_header.frame();
|
let frame = frame_with_header.frame();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user