Measure the bbqueue congestion

This commit is contained in:
Kerollmops 2024-12-19 15:50:30 +01:00
parent 8006016a43
commit 84371a6cd9
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -5,6 +5,8 @@ use std::marker::PhantomData;
use std::mem; use std::mem;
use std::num::NonZeroU16; use std::num::NonZeroU16;
use std::ops::Range; use std::ops::Range;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameProducer}; use bbqueue::framed::{FrameGrantR, FrameProducer};
@ -64,12 +66,23 @@ pub fn extractor_writer_bbqueue(
consumer consumer
}); });
let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let (sender, receiver) = flume::bounded(channel_capacity); let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, capacity }; let sender = ExtractorBbqueueSender {
sender,
producers,
capacity,
sent_messages_attempts: sent_messages_attempts.clone(),
blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
};
let receiver = WriterBbqueueReceiver { let receiver = WriterBbqueueReceiver {
receiver, receiver,
look_at_consumer: (0..consumers.len()).cycle(), look_at_consumer: (0..consumers.len()).cycle(),
consumers, consumers,
sent_messages_attempts,
blocking_sent_messages_attempts,
}; };
(sender, receiver) (sender, receiver)
} }
@ -88,6 +101,12 @@ pub struct ExtractorBbqueueSender<'a> {
/// ///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header> /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
capacity: usize, capacity: usize,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// message failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
} }
pub struct WriterBbqueueReceiver<'a> { pub struct WriterBbqueueReceiver<'a> {
@ -100,6 +119,12 @@ pub struct WriterBbqueueReceiver<'a> {
look_at_consumer: Cycle<Range<usize>>, look_at_consumer: Cycle<Range<usize>>,
/// The BBQueue frames to read when waking-up. /// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>, consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// message failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
} }
/// The action to perform on the receiver/writer side. /// The action to perform on the receiver/writer side.
@ -165,6 +190,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
} }
None None
} }
/// Returns the total count of attempts to send messages through the BBQueue channel.
///
/// The counter is read with `Relaxed` ordering, so the returned value is a
/// snapshot that is not synchronized with any other memory operation.
pub fn sent_messages_attempts(&self) -> usize {
self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
/// Returns the count of attempts to send messages that had to be paused
/// due to BBQueue being full.
///
/// The counter is read with `Relaxed` ordering, so the returned value is a
/// snapshot that is not synchronized with any other memory operation.
pub fn blocking_sent_messages_attempts(&self) -> usize {
self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
} }
pub struct FrameWithHeader<'a> { pub struct FrameWithHeader<'a> {
@ -454,10 +489,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { reserve_and_write_grant(
payload_header.serialize_into(grant); &mut producer,
Ok(()) total_length,
})?; &self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
payload_header.serialize_into(grant);
Ok(())
},
)?;
Ok(()) Ok(())
} }
@ -496,20 +538,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { reserve_and_write_grant(
let header_size = payload_header.header_size(); &mut producer,
let (header_bytes, remaining) = grant.split_at_mut(header_size); total_length,
payload_header.serialize_into(header_bytes); &self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
if dimensions != 0 { if dimensions != 0 {
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>()); let output_iter =
for (embedding, output) in embeddings.iter().zip(output_iter) { remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
output.copy_from_slice(bytemuck::cast_slice(embedding)); for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
}
} }
}
Ok(()) Ok(())
})?; },
)?;
Ok(()) Ok(())
} }
@ -567,13 +617,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { reserve_and_write_grant(
let header_size = payload_header.header_size(); &mut producer,
let (header_bytes, remaining) = grant.split_at_mut(header_size); total_length,
payload_header.serialize_into(header_bytes); &self.sender,
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); &self.sent_messages_attempts,
key_value_writer(key_buffer, value_buffer) &self.blocking_sent_messages_attempts,
})?; |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
},
)?;
Ok(()) Ok(())
} }
@ -615,12 +672,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
} }
// Spin loop to have a frame the size we requested. // Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| { reserve_and_write_grant(
let header_size = payload_header.header_size(); &mut producer,
let (header_bytes, remaining) = grant.split_at_mut(header_size); total_length,
payload_header.serialize_into(header_bytes); &self.sender,
key_writer(remaining) &self.sent_messages_attempts,
})?; &self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
},
)?;
Ok(()) Ok(())
} }
@ -633,12 +697,18 @@ fn reserve_and_write_grant<F>(
producer: &mut FrameProducer, producer: &mut FrameProducer,
total_length: usize, total_length: usize,
sender: &flume::Sender<ReceiverAction>, sender: &flume::Sender<ReceiverAction>,
sent_messages_attempts: &AtomicUsize,
blocking_sent_messages_attempts: &AtomicUsize,
f: F, f: F,
) -> crate::Result<()> ) -> crate::Result<()>
where where
F: FnOnce(&mut [u8]) -> crate::Result<()>, F: FnOnce(&mut [u8]) -> crate::Result<()>,
{ {
loop { loop {
// An attempt consists of several tries to obtain a grant,
// whether or not the message ends up being sent.
sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
for _ in 0..10_000 { for _ in 0..10_000 {
match producer.grant(total_length) { match producer.grant(total_length) {
Ok(mut grant) => { Ok(mut grant) => {
@ -662,6 +732,10 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation)); return Err(Error::InternalError(InternalError::AbortedIndexation));
} }
// We made an attempt to send a message in the
// bbqueue channel but it didn't succeed.
blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
// We prefer to yield and allow the writing thread // We prefer to yield and allow the writing thread
// to do its job, especially beneficial when there // to do its job, especially beneficial when there
// is only one CPU core available. // is only one CPU core available.