Make the frame consumer pulling fair

This commit is contained in:
Clément Renault 2024-12-02 11:49:01 +01:00
parent d5c07ef7b3
commit e9f34fb4b1
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1,8 +1,10 @@
use std::cell::RefCell;
use std::io::{self, BufWriter};
use std::iter::Cycle;
use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use std::ops::Range;
use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer};
@ -64,7 +66,11 @@ pub fn extractor_writer_bbqueue(
let (sender, receiver) = flume::bounded(channel_capacity);
let sender = ExtractorBbqueueSender { sender, producers, capacity };
let receiver = WriterBbqueueReceiver { receiver, consumers };
let receiver = WriterBbqueueReceiver {
receiver,
look_at_consumer: (0..consumers.len()).cycle(),
consumers,
};
(sender, receiver)
}
@ -89,6 +95,9 @@ pub struct WriterBbqueueReceiver<'a> {
/// any BBQueue buffer or directly sent throught this channel
/// (still written to disk).
receiver: flume::Receiver<ReceiverAction>,
/// Indicates the consumer to observe. This cycling range
/// ensures fair distribution of work among consumers.
look_at_consumer: Cycle<Range<usize>>,
/// The BBQueue frames to read when waking-up.
consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
}
@ -148,16 +157,9 @@ impl<'a> WriterBbqueueReceiver<'a> {
}
/// Reads all the BBQueue buffers and selects the first available frame.
///
/// Note: Selecting the first available frame gives preference to
/// frames that will be cleaned up first. It may result in the
/// last frames being more likely to fill up. One potential optimization
/// could involve keeping track of the last processed BBQueue index
/// to cycle through the frames instead of always starting from the
/// beginning.
pub fn recv_frame(&mut self) -> Option<FrameWithHeader<'a>> {
for consumer in &mut self.consumers {
if let Some(frame) = consumer.read() {
for index in self.look_at_consumer.by_ref().take(self.consumers.len()) {
if let Some(frame) = self.consumers[index].read() {
return Some(FrameWithHeader::from(frame));
}
}
@ -511,9 +513,6 @@ impl<'b> ExtractorBbqueueSender<'b> {
}
}
// We could commit only the used memory.
grant.commit(total_length);
// We only send a wake up message when the channel is empty
// so that we don't fill the channel with too many WakeUps.
if self.sender.is_empty() {