Implement a first version of the bbqueue channels

This commit is contained in:
Clément Renault 2024-11-26 12:19:32 +01:00
parent a2f64f6552
commit 79671c9faa
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 55 additions and 0 deletions

7
Cargo.lock generated
View File

@ -489,6 +489,11 @@ version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bbqueue"
version = "0.5.1"
source = "git+https://github.com/kerollmops/bbqueue#cbb87cc707b5af415ef203bdaf2443e06ba0d6d4"
[[package]] [[package]]
name = "benchmarks" name = "benchmarks"
version = "1.12.0" version = "1.12.0"
@ -3611,6 +3616,7 @@ version = "1.12.0"
dependencies = [ dependencies = [
"allocator-api2", "allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"bbqueue",
"big_s", "big_s",
"bimap", "bimap",
"bincode", "bincode",
@ -3623,6 +3629,7 @@ dependencies = [
"candle-transformers", "candle-transformers",
"charabia", "charabia",
"concat-arrays", "concat-arrays",
"crossbeam",
"crossbeam-channel", "crossbeam-channel",
"csv", "csv",
"deserr", "deserr",

View File

@ -98,6 +98,8 @@ allocator-api2 = "0.2.18"
rustc-hash = "2.0.0" rustc-hash = "2.0.0"
uell = "0.1.0" uell = "0.1.0"
enum-iterator = "2.1.0" enum-iterator = "2.1.0"
bbqueue = { git = "https://github.com/kerollmops/bbqueue" }
crossbeam = "0.8.4"
[dev-dependencies] [dev-dependencies]
mimalloc = { version = "0.1.43", default-features = false } mimalloc = { version = "0.1.43", default-features = false }

View File

@ -1,6 +1,7 @@
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use crossbeam::sync::{Parker, Unparker};
use crossbeam_channel::{IntoIter, Receiver, SendError, Sender}; use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
use heed::types::Bytes; use heed::types::Bytes;
use heed::BytesDecode; use heed::BytesDecode;
@ -8,6 +9,7 @@ use memmap2::Mmap;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use super::extract::FacetKind; use super::extract::FacetKind;
use super::thread_local::{FullySend, ThreadLocal};
use super::StdResult; use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec}; use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY}; use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
@ -16,6 +18,50 @@ use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding; use crate::vector::Embedding;
use crate::{DocumentId, Index}; use crate::{DocumentId, Index};
/// Creates a tuple of producer/receivers to be used by
/// the extractors and the writer loop.
///
/// # Safety
///
/// Panics if the number of provided bbqueue is not exactly equal
/// to the number of available threads in the rayon threadpool.
pub fn extractor_writer_bbqueue(
bbqueue: &[bbqueue::BBBuffer],
) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
assert_eq!(
bbqueue.len(),
rayon::current_num_threads(),
"You must provide as many BBBuffer as the available number of threads to extract"
);
let parker = Parker::new();
let extractors = ThreadLocal::with_capacity(bbqueue.len());
let producers = rayon::broadcast(|bi| {
let bbqueue = &bbqueue[bi.index()];
let (producer, consumer) = bbqueue.try_split_framed().unwrap();
extractors.get_or(|| FullySend(producer));
consumer
});
(
ExtractorBbqueueSender { inner: extractors, unparker: parker.unparker().clone() },
WriterBbqueueReceiver { inner: producers, parker },
)
}
pub struct ExtractorBbqueueSender<'a> {
inner: ThreadLocal<FullySend<bbqueue::framed::FrameProducer<'a>>>,
/// Used to wake up the receiver thread,
/// Used everytime we write something in the producer.
unparker: Unparker,
}
pub struct WriterBbqueueReceiver<'a> {
inner: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// Used to park when no more work is required
parker: Parker,
}
/// The capacity of the channel is currently in number of messages. /// The capacity of the channel is currently in number of messages.
pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) { pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) {
let (sender, receiver) = crossbeam_channel::bounded(cap); let (sender, receiver) = crossbeam_channel::bounded(cap);