Do spurious wake ups on the receiver side

This commit is contained in:
Clément Renault 2024-12-02 10:42:47 +01:00
parent 263c5a348e
commit bcab61ab1d
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -3,11 +3,12 @@ use std::io::{self, BufWriter};
use std::marker::PhantomData;
use std::mem;
use std::num::NonZeroU16;
use std::time::Duration;
use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer};
use bbqueue::BBBuffer;
use bytemuck::{checked, CheckedBitPattern, NoUninit};
use flume::SendError;
use flume::{RecvTimeoutError, SendError};
use heed::types::Bytes;
use heed::BytesDecode;
use memmap2::{Mmap, MmapMut};
@ -136,10 +137,24 @@ impl LargeVectors {
}
impl<'a> WriterBbqueueReceiver<'a> {
/// Tries to receive an action to do until the timeout occurs
/// and if it does, consider it as a spurious wake up.
pub fn recv_action(&mut self) -> Option<ReceiverAction> {
self.receiver.recv().ok()
match self.receiver.recv_timeout(Duration::from_millis(100)) {
Ok(action) => Some(action),
Err(RecvTimeoutError::Timeout) => Some(ReceiverAction::WakeUp),
Err(RecvTimeoutError::Disconnected) => None,
}
}
/// Reads all the BBQueue buffers and selects the first available frame.
///
/// Note: Selecting the first available frame gives preference to
/// frames that will be cleaned up first. It may result in the
/// last frames being more likely to fill up. One potential optimization
/// could involve keeping track of the last processed BBQueue index
/// to cycle through the frames instead of always starting from the
/// beginning.
pub fn recv_frame(&mut self) -> Option<FrameWithHeader<'a>> {
for consumer in &mut self.consumers {
if let Some(frame) = consumer.read() {