From 84371a6cd9b4f79ecb1723c14a21933cd43ac74d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 19 Dec 2024 15:50:30 +0100 Subject: [PATCH] Measure the bbqueue congestion --- crates/milli/src/update/new/channel.rs | 132 +++++++++++++++++++------ 1 file changed, 103 insertions(+), 29 deletions(-) diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index 7590c02ac..eba0e3586 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -5,6 +5,8 @@ use std::marker::PhantomData; use std::mem; use std::num::NonZeroU16; use std::ops::Range; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::Arc; use std::time::Duration; use bbqueue::framed::{FrameGrantR, FrameProducer}; @@ -64,12 +66,23 @@ pub fn extractor_writer_bbqueue( consumer }); + let sent_messages_attempts = Arc::new(AtomicUsize::new(0)); + let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0)); + let (sender, receiver) = flume::bounded(channel_capacity); - let sender = ExtractorBbqueueSender { sender, producers, capacity }; + let sender = ExtractorBbqueueSender { + sender, + producers, + capacity, + sent_messages_attempts: sent_messages_attempts.clone(), + blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(), + }; let receiver = WriterBbqueueReceiver { receiver, look_at_consumer: (0..consumers.len()).cycle(), consumers, + sent_messages_attempts, + blocking_sent_messages_attempts, }; (sender, receiver) } @@ -88,6 +101,12 @@ pub struct ExtractorBbqueueSender<'a> { /// /// capacity: usize, + /// The total number of attempts to send messages + /// over the bbqueue channel. + sent_messages_attempts: Arc, + /// The number of times an attempt to send a + /// messages failed and we had to pause for a bit. + blocking_sent_messages_attempts: Arc, } pub struct WriterBbqueueReceiver<'a> { @@ -100,6 +119,12 @@ pub struct WriterBbqueueReceiver<'a> { look_at_consumer: Cycle>, /// The BBQueue frames to read when waking-up. consumers: Vec>, + /// The total number of attempts to send messages + /// over the bbqueue channel. + sent_messages_attempts: Arc, + /// The number of times an attempt to send a + /// messages failed and we had to pause for a bit. + blocking_sent_messages_attempts: Arc, } /// The action to perform on the receiver/writer side. @@ -165,6 +190,16 @@ impl<'a> WriterBbqueueReceiver<'a> { } None } + + /// Returns the total count of attempts to send messages through the BBQueue channel. + pub fn sent_messages_attempts(&self) -> usize { + self.sent_messages_attempts.load(atomic::Ordering::Relaxed) + } + + /// Returns the count of attempts to send messages that had to be paused due to BBQueue being full. + pub fn blocking_sent_messages_attempts(&self) -> usize { + self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed) + } } pub struct FrameWithHeader<'a> { @@ -454,10 +489,17 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - payload_header.serialize_into(grant); - Ok(()) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + payload_header.serialize_into(grant); + Ok(()) + }, + )?; Ok(()) } @@ -496,20 +538,28 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); - if dimensions != 0 { - let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::()); - for (embedding, output) in embeddings.iter().zip(output_iter) { - output.copy_from_slice(bytemuck::cast_slice(embedding)); + if dimensions != 0 { + let output_iter = + remaining.chunks_exact_mut(dimensions * mem::size_of::()); + for (embedding, output) in embeddings.iter().zip(output_iter) { + output.copy_from_slice(bytemuck::cast_slice(embedding)); + } } - } - Ok(()) - })?; + Ok(()) + }, + )?; Ok(()) } @@ -567,13 +617,20 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); - key_value_writer(key_buffer, value_buffer) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); + key_value_writer(key_buffer, value_buffer) + }, + )?; Ok(()) } @@ -615,12 +672,19 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { - let header_size = payload_header.header_size(); - let (header_bytes, remaining) = grant.split_at_mut(header_size); - payload_header.serialize_into(header_bytes); - key_writer(remaining) - })?; + reserve_and_write_grant( + &mut producer, + total_length, + &self.sender, + &self.sent_messages_attempts, + &self.blocking_sent_messages_attempts, + |grant| { + let header_size = payload_header.header_size(); + let (header_bytes, remaining) = grant.split_at_mut(header_size); + payload_header.serialize_into(header_bytes); + key_writer(remaining) + }, + )?; Ok(()) } @@ -633,12 +697,18 @@ fn reserve_and_write_grant( producer: &mut FrameProducer, total_length: usize, sender: &flume::Sender, + sent_messages_attempts: &AtomicUsize, + blocking_sent_messages_attempts: &AtomicUsize, f: F, ) -> crate::Result<()> where F: FnOnce(&mut [u8]) -> crate::Result<()>, { loop { + // An attempt means trying multiple times + // and succeeded to send or not. + sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); + for _ in 0..10_000 { match producer.grant(total_length) { Ok(mut grant) => { @@ -662,6 +732,10 @@ where return Err(Error::InternalError(InternalError::AbortedIndexation)); } + // We made an attempt to send a message in the + // bbqueue channel but it didn't succeeded. + blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed); + // We prefer to yield and allow the writing thread // to do its job, especially beneficial when there // is only one CPU core available.