From 0459b1a2420d40282a0a259f02f0aedd57db6514 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 10:32:25 +0100 Subject: [PATCH 1/2] Change the reserve and grant function to accept a closure --- crates/milli/src/update/new/channel.rs | 71 +++++++++++++++----------- 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index b749eb7fe..5675069d6 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -7,7 +7,7 @@ use std::num::NonZeroU16; use std::ops::Range; use std::time::Duration; -use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; +use bbqueue::framed::{FrameGrantR, FrameProducer}; use bbqueue::BBBuffer; use bytemuck::{checked, CheckedBitPattern, NoUninit}; use flume::{RecvTimeoutError, SendError}; @@ -454,8 +454,10 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; - payload_header.serialize_into(&mut grant); + reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { + payload_header.serialize_into(grant); + Ok(()) + })?; // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. @@ -500,18 +502,20 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; + reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - - if dimensions != 0 { - let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::()); - for (embedding, output) in embeddings.iter().zip(output_iter) { - output.copy_from_slice(bytemuck::cast_slice(embedding)); + if dimensions != 0 { + let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::()); + for (embedding, output) in embeddings.iter().zip(output_iter) { + output.copy_from_slice(bytemuck::cast_slice(embedding)); + } } - } + + Ok(()) + })?; // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. @@ -575,13 +579,13 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; - - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); - key_value_writer(key_buffer, value_buffer)?; + reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); + key_value_writer(key_buffer, value_buffer) + })?; // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. @@ -629,12 +633,12 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = reserve_grant(&mut producer, total_length, &self.sender)?; - - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - key_writer(remaining)?; + reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + key_writer(remaining) + })?; // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. @@ -648,18 +652,23 @@ impl<'b> ExtractorBbqueueSender<'b> { /// Try to reserve a frame grant of `total_length` by spin looping /// on the BBQueue buffer and panics if the receiver has been disconnected. -fn reserve_grant<'b>( - producer: &mut FrameProducer<'b>, +fn reserve_and_write_grant( + producer: &mut FrameProducer, total_length: usize, sender: &flume::Sender, -) -> crate::Result> { + f: F, +) -> crate::Result<()> +where + F: FnOnce(&mut [u8]) -> crate::Result<()>, +{ loop { for _ in 0..10_000 { match producer.grant(total_length) { Ok(mut grant) => { // We could commit only the used memory. - grant.to_commit(total_length); - return Ok(grant); + f(&mut grant)?; + grant.commit(total_length); + return Ok(()); } Err(bbqueue::Error::InsufficientSize) => continue, Err(e) => unreachable!("{e:?}"), From 96831ed9bb9b2784a294f32f4665f16135347f27 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 4 Dec 2024 11:03:01 +0100 Subject: [PATCH 2/2] Send the WakeUp message if necessary in the reserve function --- crates/milli/src/update/new/channel.rs | 36 +++++++------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 5675069d6..ebd0ba429 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -459,12 +459,6 @@ impl<'b> ExtractorBbqueueSender<'b> { Ok(()) })?; - // We only send a wake up message when the channel is empty - // so that we don't fill the channel with too many WakeUps. - if self.sender.is_empty() { - self.sender.send(ReceiverAction::WakeUp).unwrap(); - } - Ok(()) } @@ -517,12 +511,6 @@ impl<'b> ExtractorBbqueueSender<'b> { Ok(()) })?; - // We only send a wake up message when the channel is empty - // so that we don't fill the channel with too many WakeUps. - if self.sender.is_empty() { - self.sender.send(ReceiverAction::WakeUp).unwrap(); - } - Ok(()) } @@ -587,12 +575,6 @@ impl<'b> ExtractorBbqueueSender<'b> { key_value_writer(key_buffer, value_buffer) })?; - // We only send a wake up message when the channel is empty - // so that we don't fill the channel with too many WakeUps. - if self.sender.is_empty() { - self.sender.send(ReceiverAction::WakeUp).unwrap(); - } - Ok(()) } @@ -640,18 +622,13 @@ impl<'b> ExtractorBbqueueSender<'b> { key_writer(remaining) })?; - // We only send a wake up message when the channel is empty - // so that we don't fill the channel with too many WakeUps. - if self.sender.is_empty() { - self.sender.send(ReceiverAction::WakeUp).unwrap(); - } - Ok(()) } } -/// Try to reserve a frame grant of `total_length` by spin looping -/// on the BBQueue buffer and panics if the receiver has been disconnected. +/// Try to reserve a frame grant of `total_length` by spin +/// looping on the BBQueue buffer, panics if the receiver +/// has been disconnected or send a WakeUp message if necessary. fn reserve_and_write_grant( producer: &mut FrameProducer, total_length: usize, @@ -668,6 +645,13 @@ where // We could commit only the used memory. f(&mut grant)?; grant.commit(total_length); + + // We only send a wake up message when the channel is empty + // so that we don't fill the channel with too many WakeUps. + if sender.is_empty() { + sender.send(ReceiverAction::WakeUp).unwrap(); + } + return Ok(()); } Err(bbqueue::Error::InsufficientSize) => continue,