diff --git a/Cargo.lock b/Cargo.lock index 8a0a6b3d0..038b269ce 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1910,6 +1910,15 @@ dependencies = [ "serde_json", ] +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3623,6 +3632,7 @@ dependencies = [ "enum-iterator", "filter-parser", "flatten-serde-json", + "flume", "fst", "fxhash", "geoutils", @@ -5180,6 +5190,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spm_precompiled" diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index b66dec9a4..a88401470 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -99,6 +99,7 @@ rustc-hash = "2.0.0" uell = "0.1.0" enum-iterator = "2.1.0" bbqueue = { git = "https://github.com/kerollmops/bbqueue" } +flume = { version = "0.11.1", default-features = false } [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs index bebaad686..e8bb6930c 100644 --- a/crates/milli/src/update/new/channel.rs +++ b/crates/milli/src/update/new/channel.rs @@ -4,10 +4,10 @@ use std::marker::PhantomData; use std::mem; use std::num::NonZeroU16; -use bbqueue::framed::{FrameGrantR, FrameProducer}; +use bbqueue::framed::{FrameGrantR, FrameGrantW, FrameProducer}; use bbqueue::BBBuffer; use bytemuck::{checked, CheckedBitPattern, NoUninit}; -use crossbeam_channel::SendError; +use flume::SendError; use heed::types::Bytes; use heed::BytesDecode; use memmap2::{Mmap, MmapMut}; @@ -33,7 +33,7 @@ use crate::{CboRoaringBitmapCodec, DocumentId, Index}; /// /// The 
`channel_capacity` parameter defines the number of /// too-large-to-fit-in-BBQueue entries that can be sent through -/// a crossbeam channel. This parameter must stay low to make +/// a flume channel. This parameter must stay low to make /// sure we do not use too much memory. /// /// Note that the channel is also used to wake-up the receiver @@ -61,7 +61,7 @@ pub fn extractor_writer_bbqueue( consumer }); - let (sender, receiver) = crossbeam_channel::bounded(channel_capacity); + let (sender, receiver) = flume::bounded(channel_capacity); let sender = ExtractorBbqueueSender { sender, producers, capacity }; let receiver = WriterBbqueueReceiver { receiver, consumers }; (sender, receiver) @@ -70,7 +70,7 @@ pub fn extractor_writer_bbqueue( pub struct ExtractorBbqueueSender<'a> { /// This channel is used to wake-up the receiver and /// send large entries that cannot fit in the BBQueue. - sender: crossbeam_channel::Sender, + sender: flume::Sender, /// A memory buffer, one by thread, is used to serialize /// the entries directly in this shared, lock-free space. producers: ThreadLocal>>>, @@ -87,7 +87,7 @@ pub struct WriterBbqueueReceiver<'a> { /// Used to wake up when new entries are available either in /// any BBQueue buffer or directly sent throught this channel /// (still written to disk). - receiver: crossbeam_channel::Receiver, + receiver: flume::Receiver, /// The BBQueue frames to read when waking-up. consumers: Vec>, } @@ -437,19 +437,9 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = loop { - match producer.grant(total_length) { - Ok(grant) => break grant, - Err(bbqueue::Error::InsufficientSize) => continue, - Err(e) => unreachable!("{e:?}"), - } - }; - + let mut grant = reserve_grant(&mut producer, total_length, &self.sender); payload_header.serialize_into(&mut grant); - // We could commit only the used memory. 
- grant.commit(total_length); - // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. if self.sender.is_empty() { @@ -494,13 +484,7 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = loop { - match producer.grant(total_length) { - Ok(grant) => break grant, - Err(bbqueue::Error::InsufficientSize) => continue, - Err(e) => unreachable!("{e:?}"), - } - }; + let mut grant = reserve_grant(&mut producer, total_length, &self.sender); let header_size = payload_header.header_size(); let (header_bytes, remaining) = grant.split_at_mut(header_size); @@ -571,13 +555,7 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. - let mut grant = loop { - match producer.grant(total_length) { - Ok(grant) => break grant, - Err(bbqueue::Error::InsufficientSize) => continue, - Err(e) => unreachable!("{e:?}"), - } - }; + let mut grant = reserve_grant(&mut producer, total_length, &self.sender); let header_size = payload_header.header_size(); let (header_bytes, remaining) = grant.split_at_mut(header_size); @@ -585,9 +563,6 @@ impl<'b> ExtractorBbqueueSender<'b> { let (key_buffer, value_buffer) = remaining.split_at_mut(key_length.get() as usize); key_value_writer(key_buffer, value_buffer)?; - // We could commit only the used memory. - grant.commit(total_length); - // We only send a wake up message when the channel is empty // so that we don't fill the channel with too many WakeUps. if self.sender.is_empty() { @@ -628,22 +603,13 @@ impl<'b> ExtractorBbqueueSender<'b> { } // Spin loop to have a frame the size we requested. 
-        let mut grant = loop {
-            match producer.grant(total_length) {
-                Ok(grant) => break grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            }
-        };
+        let mut grant = reserve_grant(&mut producer, total_length, &self.sender);
 
         let header_size = payload_header.header_size();
         let (header_bytes, remaining) = grant.split_at_mut(header_size);
         payload_header.serialize_into(header_bytes);
         key_writer(remaining)?;
 
-        // We could commit only the used memory.
-        grant.commit(total_length);
-
         // We only send a wake up message when the channel is empty
         // so that we don't fill the channel with too many WakeUps.
         if self.sender.is_empty() {
@@ -654,6 +620,43 @@ impl<'b> ExtractorBbqueueSender<'b> {
     }
 }
 
+/// Reserves a frame grant of `total_length` bytes from `producer`, spin
+/// looping on the BBQueue buffer until the grant succeeds.
+///
+/// The grant is returned with `to_commit(total_length)` already applied, so
+/// callers do not have to commit it explicitly (presumably the bytes are
+/// committed when the grant is dropped; confirm against the bbqueue docs).
+///
+/// # Panics
+///
+/// Panics with "channel is disconnected" when `sender` reports that the
+/// channel is disconnected; this is only checked after every 10 000
+/// unsuccessful attempts.
+fn reserve_grant<'b>(
+    producer: &mut FrameProducer<'b>,
+    total_length: usize,
+    sender: &flume::Sender,
+) -> FrameGrantW<'b> {
+    loop {
+        for _ in 0..10_000 {
+            match producer.grant(total_length) {
+                Ok(mut grant) => {
+                    // We could commit only the used memory.
+                    grant.to_commit(total_length);
+                    return grant;
+                }
+                // Not enough room yet: tell the CPU we are busy-waiting, then retry.
+                Err(bbqueue::Error::InsufficientSize) => std::hint::spin_loop(),
+                Err(e) => unreachable!("{e:?}"),
+            }
+        }
+        // Stop spinning when the channel reports a disconnection.
+        if sender.is_disconnected() {
+            panic!("channel is disconnected");
+        }
+    }
+}
+
 pub enum ExactWordDocids {}
 pub enum FidWordCountDocids {}
 pub enum WordDocids {}