5177: Debug log  the channel congestion r=Kerollmops a=Kerollmops

This PR displays the congestion of the BBQueue channel and the allocated memory for the channel and the extraction. This information can be beneficial for debugging and noticing slow disks. We show three pieces of information in debug:
- The direct attempts: the number of tries to send something in the BBQueue channel,
- The blocked attempts: the number of unsuccessful attempts that must be retried,
- The congestion: The percentage of blocking attempts. The higher, the slower the receiver and, therefore, the disk.

Co-authored-by: Kerollmops <clement@meilisearch.com>
Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-01-29 15:35:31 +00:00 committed by GitHub
commit 4224edea28
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 127 additions and 29 deletions

View File

@ -5,6 +5,8 @@ use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use std::ops::Range;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::Arc;
use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameProducer};
@ -71,12 +73,23 @@ pub fn extractor_writer_bbqueue(
consumer
});
let sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let blocking_sent_messages_attempts = Arc::new(AtomicUsize::new(0));
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, max_grant };
let sender = ExtractorBbqueueSender {
sender,
producers,
max_grant,
sent_messages_attempts: sent_messages_attempts.clone(),
blocking_sent_messages_attempts: blocking_sent_messages_attempts.clone(),
};
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
consumers,
sent_messages_attempts,
blocking_sent_messages_attempts,
};
(sender, receiver)
}
@ -92,6 +105,12 @@ pub struct ExtractorBbqueueSender<'a> {
/// It will never be able to store more than that as the
/// buffer cannot split data into two parts.
max_grant: usize,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// messages failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
pub struct WriterBbqueueReceiver<'a> {
@ -104,6 +123,12 @@ pub struct WriterBbqueueReceiver<'a> {
look_at_consumer: Cycle<Range<usize>>,
/// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// The total number of attempts to send messages
/// over the bbqueue channel.
sent_messages_attempts: Arc<AtomicUsize>,
/// The number of times an attempt to send a
/// message failed and we had to pause for a bit.
blocking_sent_messages_attempts: Arc<AtomicUsize>,
}
/// The action to perform on the receiver/writer side.
@ -169,6 +194,16 @@ impl<'a> WriterBbqueueReceiver<'a> {
}
None
}
/// Returns the total count of attempts to send messages through the BBQueue channel.
pub fn sent_messages_attempts(&self) -> usize {
self.sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
/// Returns the count of attempts to send messages that had to be paused due to BBQueue being full.
pub fn blocking_sent_messages_attempts(&self) -> usize {
self.blocking_sent_messages_attempts.load(atomic::Ordering::Relaxed)
}
}
pub struct FrameWithHeader<'a> {
@ -458,10 +493,17 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
payload_header.serialize_into(grant);
Ok(())
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
payload_header.serialize_into(grant);
Ok(())
},
)?;
Ok(())
}
@ -500,20 +542,28 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
if dimensions != 0 {
let output_iter = remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
if dimensions != 0 {
let output_iter =
remaining.chunks_exact_mut(dimensions * mem::size_of::<f32>());
for (embedding, output) in embeddings.iter().zip(output_iter) {
output.copy_from_slice(bytemuck::cast_slice(embedding));
}
}
}
Ok(())
})?;
Ok(())
},
)?;
Ok(())
}
@ -571,13 +621,20 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize);
key_value_writer(key_buffer, value_buffer)
},
)?;
Ok(())
}
@ -619,12 +676,19 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
// Spin loop to have a frame the size we requested.
reserve_and_write_grant(&mut producer, total_length, &self.sender, |grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
})?;
reserve_and_write_grant(
&mut producer,
total_length,
&self.sender,
&self.sent_messages_attempts,
&self.blocking_sent_messages_attempts,
|grant| {
let header_size = payload_header.header_size();
let (header_bytes, remaining) = grant.split_at_mut(header_size);
payload_header.serialize_into(header_bytes);
key_writer(remaining)
},
)?;
Ok(())
}
@ -637,12 +701,18 @@ fn reserve_and_write_grant<F>(
producer: &mut FrameProducer,
total_length: usize,
sender: &flume::Sender<ReceiverAction>,
sent_messages_attempts: &AtomicUsize,
blocking_sent_messages_attempts: &AtomicUsize,
f: F,
) -> crate::Result<()>
where
F: FnOnce(&mut [u8]) -> crate::Result<()>,
{
loop {
// An attempt means trying multiple times
// whether is succeeded or not.
sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
for _ in 0..10_000 {
match producer.grant(total_length) {
Ok(mut grant) => {
@ -666,6 +736,10 @@ where
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// We made an attempt to send a message in the
// bbqueue channel but it didn't succeed.
blocking_sent_messages_attempts.fetch_add(1, atomic::Ordering::Relaxed);
// We prefer to yield and allow the writing thread
// to do its job, especially beneficial when there
// is only one CPU core available.

View File

@ -21,6 +21,7 @@ use crate::progress::Progress;
use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
use std::sync::Once;
pub(crate) mod de;
pub mod document_changes;
@ -33,6 +34,8 @@ mod post_processing;
mod update_by_function;
mod write;
static LOG_MEMORY_METRICS_ONCE: Once = Once::new();
/// This is the main function of this crate.
///
/// Give it the output of the [`Indexer::document_changes`] method and it will execute it in the [`rayon::ThreadPool`].
@ -93,6 +96,15 @@ where
},
);
LOG_MEMORY_METRICS_ONCE.call_once(|| {
tracing::debug!(
"Indexation allocated memory metrics - \
Total BBQueue size: {total_bbbuffer_capacity}, \
Total extractor memory: {:?}",
grenad_parameters.max_memory,
);
});
let (extractor_sender, writer_receiver) = pool
.install(|| extractor_writer_bbqueue(&mut bbbuffers, total_bbbuffer_capacity, 1000))
.unwrap();

View File

@ -72,7 +72,19 @@ pub(super) fn write_to_db(
&mut aligned_embedding,
)?;
}
write_from_bbqueue(&mut writer_receiver, index, wtxn, arroy_writers, &mut aligned_embedding)?;
let direct_attempts = writer_receiver.sent_messages_attempts();
let blocking_attempts = writer_receiver.blocking_sent_messages_attempts();
let congestion_pct = (blocking_attempts as f64 / direct_attempts as f64) * 100.0;
tracing::debug!(
"Channel congestion metrics - \
Attempts: {direct_attempts}, \
Blocked attempts: {blocking_attempts} \
({congestion_pct:.1}% congestion)"
);
Ok(())
}