5118: Change the reserve and grant function to accept a closure r=ManyTheFish a=Kerollmops

This simplifies the usage of the grant and commits it at the right time, just after having written in it.

Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-12-04 10:12:39 +00:00 committed by GitHub
commit 54341c2e80
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,7 +7,7 @@ use std::num::NonZeroU16;
use std::ops::Range; use std::ops::Range;
use std::time::Duration; use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; use bbqueue::framed::{FrameGrantR, FrameProducer};
use bbqueue::BBBuffer; use bbqueue::BBBuffer;
use bytemuck::{checked, CheckedBitPattern, NoUninit}; use bytemuck::{checked, CheckedBitPattern, NoUninit};
use flume::{RecvTimeoutError, SendError}; use flume::{RecvTimeoutError, SendError};
@ -454,14 +454,10 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
payload_header.serialize_into(&mut grant); payload_header.serialize_into(grant);
Ok(())
// We only send a wake up message when the channel is empty })?;
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
self.sender.send(ReceiverAction::WakeUp).unwrap();
}
Ok(()) Ok(())
} }
@ -500,24 +496,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let header_size = payload_header.header_size(); if dimensions != 0 {
let (header_bytes, remaining) = grant.split_at_mut(header_size); let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
payload_header.serialize_into(header_bytes); for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
if dimensions != 0 { }
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
} }
}
// We only send a wake up message when the channel is empty Ok(())
// so that we don't fill the channel with too many WakeUps. })?;
if self.sender.is_empty() {
self.sender.send(ReceiverAction::WakeUp).unwrap();
}
Ok(()) Ok(())
} }
@ -575,19 +567,13 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let header_size = payload_header.header_size(); let (header_bytes, remaining) = grant.split_at_mut(header_size);
let (header_bytes, remaining) = grant.split_at_mut(header_size); payload_header.serialize_into(header_bytes);
payload_header.serialize_into(header_bytes); let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); key_value_writer(key_buffer, value_buffer)
key_value_writer(key_buffer, value_buffer)?; })?;
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
self.sender.send(ReceiverAction::WakeUp).unwrap();
}
Ok(()) Ok(())
} }
@ -629,37 +615,44 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let header_size = payload_header.header_size(); let (header_bytes, remaining) = grant.split_at_mut(header_size);
let (header_bytes, remaining) = grant.split_at_mut(header_size); payload_header.serialize_into(header_bytes);
payload_header.serialize_into(header_bytes); key_writer(remaining)
key_writer(remaining)?; })?;
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {
self.sender.send(ReceiverAction::WakeUp).unwrap();
}
Ok(()) Ok(())
} }
} }
/// Try to reserve a frame grant of `total_length` by spin looping /// Try to reserve a frame grant of `total_length` by spin
/// on the BBQueue buffer and panics if the receiver has been disconnected. /// looping on the BBQueue buffer, panics if the receiver
fn reserve_grant<'b>( /// has been disconnected or send a WakeUp message if necessary.
producer: &mut FrameProducer<'b>, fn reserve_and_write_grant<F>(
producer: &mut FrameProducer,
total_length: usize, total_length: usize,
sender: &flume::Sender<ReceiverAction>, sender: &flume::Sender<ReceiverAction>,
) -> crate::Result<FrameGrantW<'b>> { f: F,
) -> crate::Result<()>
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
loop { loop {
for _ in 0..10_000 { for _ in 0..10_000 {
match producer.grant(total_length) { match producer.grant(total_length) {
Ok(mut grant) => { Ok(mut grant) => {
// We could commit only the used memory. // We could commit only the used memory.
grant.to_commit(total_length); f(&mut grant)?;
return Ok(grant); grant.commit(total_length);
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if sender.is_empty() {
sender.send(ReceiverAction::WakeUp).unwrap();
}
return Ok(());
} }
Err(bbqueue::Error::InsufficientSize) => continue, Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"), Err(e) => unreachable!("{e:?}"),