mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-27 13:47:29 +01:00
Reduce the maximum grant possible we can store in the BBQueue
This commit is contained in:
parent
ad9d8e10f2
commit
34dea863e5
@ -27,6 +27,12 @@ use crate::update::new::KvReaderFieldId;
|
||||
use crate::vector::Embedding;
|
||||
use crate::{CboRoaringBitmapCodec, DocumentId, Error, Index, InternalError};
|
||||
|
||||
/// Note that the FrameProducer requires up to 9 bytes to
|
||||
/// encode the length, the max grant has been computed accordingly.
|
||||
///
|
||||
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
|
||||
const MAX_FRAME_HEADER_SIZE: usize = 9;
|
||||
|
||||
/// Creates a tuple of senders/receiver to be used by
|
||||
/// the extractors and the writer loop.
|
||||
///
|
||||
@ -53,8 +59,8 @@ pub fn extractor_writer_bbqueue(
|
||||
bbbuffers.resize_with(current_num_threads, || BBBuffer::new(bbbuffer_capacity));
|
||||
|
||||
let capacity = bbbuffers.first().unwrap().capacity();
|
||||
// Read the field description to understand this
|
||||
let capacity = capacity.checked_sub(9).unwrap();
|
||||
// Read the const description to understand this
|
||||
let max_grant = capacity.saturating_div(2).checked_sub(MAX_FRAME_HEADER_SIZE).unwrap();
|
||||
|
||||
let producers = ThreadLocal::with_capacity(bbbuffers.len());
|
||||
let consumers = rayon::broadcast(|bi| {
|
||||
@ -65,7 +71,7 @@ pub fn extractor_writer_bbqueue(
|
||||
});
|
||||
|
||||
let (sender, receiver) = flume::bounded(channel_capacity);
|
||||
let sender = ExtractorBbqueueSender { sender, producers, capacity };
|
||||
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
|
||||
let receiver = WriterBbqueueReceiver {
|
||||
receiver,
|
||||
look_at_consumer: (0..consumers.len()).cycle(),
|
||||
@ -81,13 +87,10 @@ pub struct ExtractorBbqueueSender<'a> {
|
||||
/// A memory buffer, one by thread, is used to serialize
|
||||
/// the entries directly in this shared, lock-free space.
|
||||
producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
|
||||
/// The capacity of this frame producer, will never be able to store more than that.
|
||||
///
|
||||
/// Note that the FrameProducer requires up to 9 bytes to encode the length,
|
||||
/// the capacity has been shrunk accordingly.
|
||||
///
|
||||
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
|
||||
capacity: usize,
|
||||
/// The maximum frame grant that a producer can reserve.
|
||||
/// It will never be able to store more than that as the
|
||||
/// buffer cannot split data into two parts.
|
||||
max_grant: usize,
|
||||
}
|
||||
|
||||
pub struct WriterBbqueueReceiver<'a> {
|
||||
@ -443,14 +446,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
}
|
||||
|
||||
fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
|
||||
let total_length = EntryHeader::total_delete_vector_size();
|
||||
if total_length > capacity {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
|
||||
if total_length > max_grant {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
|
||||
}
|
||||
|
||||
// Spin loop to have a frame the size we requested.
|
||||
@ -468,7 +471,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
embedder_id: u8,
|
||||
embeddings: &[Vec<f32>],
|
||||
) -> crate::Result<()> {
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
@ -479,7 +482,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
let arroy_set_vector = ArroySetVectors { docid, embedder_id, _padding: [0; 3] };
|
||||
let payload_header = EntryHeader::ArroySetVectors(arroy_set_vector);
|
||||
let total_length = EntryHeader::total_set_vectors_size(embeddings.len(), dimensions);
|
||||
if total_length > capacity {
|
||||
if total_length > max_grant {
|
||||
let mut value_file = tempfile::tempfile().map(BufWriter::new)?;
|
||||
for embedding in embeddings {
|
||||
let mut embedding_bytes = bytemuck::cast_slice(embedding);
|
||||
@ -540,14 +543,14 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
where
|
||||
F: FnOnce(&mut [u8], &mut [u8]) -> crate::Result<()>,
|
||||
{
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
let operation = DbOperation { database, key_length: Some(key_length) };
|
||||
let payload_header = EntryHeader::DbOperation(operation);
|
||||
let total_length = EntryHeader::total_key_value_size(key_length, value_length);
|
||||
if total_length > capacity {
|
||||
if total_length > max_grant {
|
||||
let mut key_buffer = vec![0; key_length.get() as usize].into_boxed_slice();
|
||||
let value_file = tempfile::tempfile()?;
|
||||
value_file.set_len(value_length.try_into().unwrap())?;
|
||||
@ -601,7 +604,7 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
where
|
||||
F: FnOnce(&mut [u8]) -> crate::Result<()>,
|
||||
{
|
||||
let capacity = self.capacity;
|
||||
let max_grant = self.max_grant;
|
||||
let refcell = self.producers.get().unwrap();
|
||||
let mut producer = refcell.0.borrow_mut_or_yield();
|
||||
|
||||
@ -610,8 +613,8 @@ impl<'b> ExtractorBbqueueSender<'b> {
|
||||
let operation = DbOperation { database, key_length: None };
|
||||
let payload_header = EntryHeader::DbOperation(operation);
|
||||
let total_length = EntryHeader::total_key_size(key_length);
|
||||
if total_length > capacity {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue capacity ({capacity} bytes)");
|
||||
if total_length > max_grant {
|
||||
panic!("The entry is larger ({total_length} bytes) than the BBQueue max grant ({max_grant} bytes)");
|
||||
}
|
||||
|
||||
// Spin loop to have a frame the size we requested.
|
||||
|
Loading…
x
Reference in New Issue
Block a user