Implement mostly all senders

Clément Renault 2024-11-26 18:30:44 +01:00
parent 79671c9faa
commit 8442db8101
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
6 changed files with 398 additions and 304 deletions


@ -41,6 +41,25 @@ impl CboRoaringBitmapCodec {
}
}
pub fn serialize_into_writer<W: io::Write>(
roaring: &RoaringBitmap,
mut writer: W,
) -> io::Result<()> {
if roaring.len() <= THRESHOLD as u64 {
// If the number of items (u32s) to encode is less than or equal to the threshold
// it means that it would weigh the same or less than the RoaringBitmap
// header, so we directly encode them using ByteOrder instead.
for integer in roaring {
writer.write_u32::<NativeEndian>(integer)?;
}
} else {
// Otherwise, we use the classic RoaringBitmapCodec that writes a header.
roaring.serialize_into(writer)?;
}
Ok(())
}
pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
// If there is threshold or less than threshold integers that can fit into this array
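For context, a round-trip sketch of the codec (the bitmap content is made up): bitmaps with at most `THRESHOLD` values are written as raw native-endian u32s, anything larger keeps the regular RoaringBitmap serialization.

```rust
let bitmap: RoaringBitmap = (0..4u32).collect();
let mut bytes = Vec::with_capacity(CboRoaringBitmapCodec::serialized_size(&bitmap));
CboRoaringBitmapCodec::serialize_into_writer(&bitmap, &mut bytes).unwrap();
let decoded = CboRoaringBitmapCodec::deserialize_from(&bytes).unwrap();
assert_eq!(bitmap, decoded);
```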


@ -1,14 +1,19 @@
use std::cell::RefCell;
use std::marker::PhantomData;
use std::num::NonZeroU16;
use std::{mem, slice};
use bbqueue::framed::{FrameGrantR, FrameProducer};
use bytemuck::{NoUninit, CheckedBitPattern};
use crossbeam::sync::{Parker, Unparker};
use crossbeam_channel::{IntoIter, Receiver, SendError};
use heed::types::Bytes;
use heed::BytesDecode;
use memmap2::Mmap;
use roaring::RoaringBitmap;
use super::extract::FacetKind;
use super::ref_cell_ext::RefCellExt;
use super::thread_local::{FullySend, ThreadLocal};
use super::StdResult;
use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
@ -16,7 +21,7 @@ use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
use crate::index::{db_name, IndexEmbeddingConfig};
use crate::update::new::KvReaderFieldId;
use crate::vector::Embedding;
use crate::{CboRoaringBitmapCodec, DocumentId, Index};

/// Creates a tuple of producer/receivers to be used by
/// the extractors and the writer loop.
@ -26,125 +31,97 @@ use crate::{DocumentId, Index};
/// Panics if the number of provided bbqueue is not exactly equal
/// to the number of available threads in the rayon threadpool.
pub fn extractor_writer_bbqueue(
bbbuffers: &[bbqueue::BBBuffer],
) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
assert_eq!(
bbbuffers.len(),
rayon::current_num_threads(),
"You must provide as many BBBuffer as the available number of threads to extract"
);

let capacity = bbbuffers.first().unwrap().capacity();
let parker = Parker::new();
let extractors = ThreadLocal::with_capacity(bbbuffers.len());
let producers = rayon::broadcast(|bi| {
let bbqueue = &bbbuffers[bi.index()];
let (producer, consumer) = bbqueue.try_split_framed().unwrap();
extractors.get_or(|| FullySend(RefCell::new(producer)));
consumer
});

(
ExtractorBbqueueSender {
inner: extractors,
capacity: capacity.checked_sub(9).unwrap(),
unparker: parker.unparker().clone(),
},
WriterBbqueueReceiver { inner: producers, parker },
)
}
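A sketch of the intended wiring, mirroring the indexer change later in this commit: one `BBBuffer` per rayon thread, split into a thread-local `FrameProducer` for the extractors and a list of `FrameConsumer`s for the single writer thread. The 100 MiB per-thread size is simply the value this commit uses, and the call has to happen on the rayon pool that will run the extractors, since the producers are installed with `rayon::broadcast`.

```rust
let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
    .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB per thread
    .collect();
let (extractor_sender, mut writer_receiver) = extractor_writer_bbqueue(&bbbuffers);
// Extractor threads use `extractor_sender`; the writer thread polls `writer_receiver.read()`.
```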
pub struct WriterBbqueueReceiver<'a> {
inner: Vec<bbqueue::framed::FrameConsumer<'a>>,
/// Used to park when no more work is required
parker: Parker,
}

impl<'a> WriterBbqueueReceiver<'a> {
pub fn read(&mut self) -> Option<FrameWithHeader<'a>> {
loop {
for consumer in &mut self.inner {
// mark the frame as auto release
if let Some(_frame) = consumer.read() {
// TODO decode the EntryHeader and return the frame (left unfinished in this commit)
todo!()
}
}
break None;
}
}
}

struct FrameWithHeader<'a> {
header: EntryHeader,
frame: FrameGrantR<'a>,
}

#[derive(Debug, Clone, Copy, CheckedBitPattern)]
#[repr(u8)]
enum EntryHeader {
/// Whether a put of the key/value pair or a delete of the given key.
DbOperation {
/// The database on which to perform the operation.
database: Database,
/// The key length in the buffer.
///
/// If None it means that the buffer is dedicated
/// to the key and it is therefore a deletion operation.
key_length: Option<NonZeroU16>,
},
ArroyDeleteVector {
docid: DocumentId,
},
/// The embedding is the remaining space and represents a non-aligned [f32].
ArroySetVector {
docid: DocumentId,
embedder_id: u8,
},
}

impl EntryHeader {
fn delete_key_size(key_length: u16) -> usize {
mem::size_of::<Self>() + key_length as usize
}

fn put_key_value_size(key_length: u16, value_length: usize) -> usize {
mem::size_of::<Self>() + key_length as usize + value_length
}

fn bytes_of(&self) -> &[u8] {
// TODO do the variant matching ourselves
todo!()
}
}
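The `bytes_of` TODO above is the missing piece of the framing: the header has to become raw bytes before it can be copied at the front of each grant. One possible direction, sketched here as an assumption rather than the author's plan, is a padding-free `#[repr(C)]` payload per variant exposed through bytemuck (with its `derive` feature); the `DbOperationPayload` name and the discriminant value are made up for illustration.

```rust
use bytemuck::{bytes_of, Pod, Zeroable};

#[derive(Clone, Copy, Pod, Zeroable)]
#[repr(C)]
struct DbOperationPayload {
    database: u32,   // Database discriminant widened to a u32
    key_length: u16, // 0 would stand for "deletion, no value follows"
    _padding: u16,   // explicit padding so the struct has no uninitialized bytes
}

// Append a hypothetical DbOperation header to `out`: one discriminant byte
// followed by the raw payload bytes.
fn push_db_operation_header(out: &mut Vec<u8>, database: u32, key_length: u16) {
    out.push(0); // made-up discriminant for the DbOperation variant
    out.extend_from_slice(bytes_of(&DbOperationPayload { database, key_length, _padding: 0 }));
}
```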
#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
#[repr(u32)]
pub enum Database {
Main,
Documents,
@ -220,82 +197,46 @@ impl From<FacetKind> for Database {
}
}

pub struct ExtractorBbqueueSender<'a> {
inner: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
/// The capacity of this frame producer; it will never be able to store more than that.
///
/// Note that the FrameProducer requires up to 9 bytes to encode the length,
/// so the capacity has been shrunk accordingly.
///
/// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
capacity: usize,
/// Used to wake up the receiver thread,
/// used every time we write something in the producer.
unparker: Unparker,
}

impl<'b> ExtractorBbqueueSender<'b> {
pub fn docids<'a, D: DatabaseType>(&'a self) -> WordDocidsSender<'a, 'b, D> {
WordDocidsSender { sender: self, _marker: PhantomData }
}

pub fn facet_docids<'a>(&'a self) -> FacetDocidsSender<'a, 'b> {
FacetDocidsSender { sender: self }
}

pub fn field_id_docid_facet_sender<'a>(&'a self) -> FieldIdDocidFacetSender<'a, 'b> {
FieldIdDocidFacetSender(&self)
}

pub fn documents<'a>(&'a self) -> DocumentsSender<'a, 'b> {
DocumentsSender(&self)
}

pub fn embeddings<'a>(&'a self) -> EmbeddingSender<'a, 'b> {
EmbeddingSender(&self)
}

pub fn geo<'a>(&'a self) -> GeoSender<'a, 'b> {
GeoSender(&self)
}

fn send_delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
match self
.sender
.send(WriterOperation::ArroyOperation(ArroyOperation::DeleteVectors { docid }))
@ -305,18 +246,69 @@ impl ExtractorSender {
}
}

fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
let capacity = self.capacity;
let refcell = self.inner.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();

let key_length = key.len().try_into().unwrap();
let value_length = value.len();
let total_length = EntryHeader::put_key_value_size(key_length, value_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
} }
let payload_header =
EntryHeader::DbOperation { database, key_length: NonZeroU16::new(key_length) };

loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
let (key_out, value_out) = remaining.split_at_mut(key.len());
key_out.copy_from_slice(key);
value_out.copy_from_slice(value);
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
}
}
fn delete_entry(&self, database: Database, key: &[u8]) -> crate::Result<()> {
let capacity = self.capacity;
let refcell = self.inner.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let key_length = key.len().try_into().unwrap();
let total_length = EntryHeader::delete_key_size(key_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
}
let payload_header = EntryHeader::DbOperation { database, key_length: None };
loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
remaining.copy_from_slice(key);
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
}
}
}
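Every sender below repeats the same dance around the producer: reserve a grant big enough for header plus payload, copy the header bytes, fill the payload, commit, and spin when the queue is momentarily full. A condensed sketch of that shared shape (the `write_frame` name and the closure parameter are illustrative, not part of the commit):

```rust
fn write_frame(
    producer: &mut FrameProducer<'_>,
    header_bytes: &[u8],
    total_length: usize,
    fill_payload: impl FnOnce(&mut [u8]),
) -> crate::Result<()> {
    loop {
        let mut grant = match producer.grant(total_length) {
            Ok(grant) => grant,
            // The queue is full: retry until the writer thread releases frames.
            Err(bbqueue::Error::InsufficientSize) => continue,
            Err(e) => unreachable!("{e:?}"),
        };
        let (header, payload) = grant.split_at_mut(header_bytes.len());
        header.copy_from_slice(header_bytes);
        fill_payload(payload);
        // We could commit only the used memory.
        grant.commit(total_length);
        break Ok(());
    }
}
```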
@ -356,159 +348,237 @@ impl DatabaseType for WordPositionDocids {
const DATABASE: Database = Database::WordPositionDocids;
}
pub struct WordDocidsSender<'a, 'b, D> {
sender: &'a ExtractorBbqueueSender<'b>,
_marker: PhantomData<D>,
}
impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> { impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); let capacity = self.sender.capacity;
match self.sender.send_db_operation(DbOperation { database: D::DATABASE, entry }) { let refcell = self.sender.inner.get().unwrap();
Ok(()) => Ok(()), let mut producer = refcell.0.borrow_mut_or_yield();
Err(SendError(_)) => Err(SendError(())),
let key_length = key.len().try_into().unwrap();
let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
let total_length = EntryHeader::put_key_value_size(key_length, value_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
}

let payload_header = EntryHeader::DbOperation {
database: D::DATABASE,
key_length: NonZeroU16::new(key_length),
};

loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
let (key_out, value_out) = remaining.split_at_mut(key.len());
key_out.copy_from_slice(key);
CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
} }
} }
fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> { pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
let capacity = self.sender.capacity;
let refcell = self.sender.inner.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let key_length = key.len().try_into().unwrap();
let total_length = EntryHeader::delete_key_size(key_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
}
let payload_header = EntryHeader::DbOperation { database: D::DATABASE, key_length: None };
loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
remaining.copy_from_slice(key);
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
}
}
}
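For a sense of how these methods are meant to be called from an extractor, here is an illustrative snippet; the `WordDocids` marker type is assumed to be one of the `DatabaseType` implementors from this module, and the key and docid are made up.

```rust
fn send_one_word(sender: &ExtractorBbqueueSender) -> crate::Result<()> {
    let word_docids = sender.docids::<WordDocids>();
    let mut bitmap = RoaringBitmap::new();
    bitmap.insert(42);
    word_docids.write(b"house", &bitmap)?;
    word_docids.delete(b"obsolete")?;
    Ok(())
}
```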
pub struct FacetDocidsSender<'a, 'b> {
sender: &'a ExtractorBbqueueSender<'b>,
}
impl FacetDocidsSender<'_, '_> {
pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
let capacity = self.sender.capacity;
let refcell = self.sender.inner.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let (facet_kind, key) = FacetKind::extract_from_key(key);
let key_length = key.len().try_into().unwrap();

let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
let value_length = match facet_kind {
// We must take the facet group size into account
// when we serialize strings and numbers.
FacetKind::Number | FacetKind::String => value_length + 1,
FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
};
let total_length = EntryHeader::put_key_value_size(key_length, value_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
}
let payload_header = EntryHeader::DbOperation {
database: Database::from(facet_kind),
key_length: NonZeroU16::new(key_length),
};
loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
let (key_out, value_out) = remaining.split_at_mut(key.len());
key_out.copy_from_slice(key);
let value_out = match facet_kind {
// We must take the facet group size into account
// when we serialize strings and numbers.
FacetKind::String | FacetKind::Number => {
let (first, remaining) = value_out.split_first_mut().unwrap();
*first = 1;
remaining
}
FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
};
CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
}
}
pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
let capacity = self.sender.capacity;
let refcell = self.sender.inner.get().unwrap();
let mut producer = refcell.0.borrow_mut_or_yield();
let (facet_kind, key) = FacetKind::extract_from_key(key);
let key_length = key.len().try_into().unwrap();
let total_length = EntryHeader::delete_key_size(key_length);
if total_length > capacity {
unreachable!("entry larger that the bbqueue capacity");
}
let payload_header =
EntryHeader::DbOperation { database: Database::from(facet_kind), key_length: None };
loop {
let mut grant = match producer.grant(total_length) {
Ok(grant) => grant,
Err(bbqueue::Error::InsufficientSize) => continue,
Err(e) => unreachable!("{e:?}"),
};
let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
header.copy_from_slice(payload_header.bytes_of());
remaining.copy_from_slice(key);
// We could commit only the used memory.
grant.commit(total_length);
break Ok(());
}
}
}
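The String/Number branches above are what make the facet value one byte longer than the plain CBO bitmap: a facet group size byte (written as 1 by this commit) is prepended before the bitmap bytes. A small sketch of the equivalent buffer built outside the bbqueue:

```rust
fn facet_value_bytes(bitmap: &RoaringBitmap) -> std::io::Result<Vec<u8>> {
    let mut value = vec![1u8]; // facet group size byte, as written by this commit
    CboRoaringBitmapCodec::serialize_into_writer(bitmap, &mut value)?;
    debug_assert_eq!(value.len(), 1 + CboRoaringBitmapCodec::serialized_size(bitmap));
    Ok(value)
}
```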
pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender); pub struct FieldIdDocidFacetSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl FieldIdDocidFacetSender<'_> { impl FieldIdDocidFacetSender<'_, '_> {
pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> { pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> crate::Result<()> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)); self.0.write_key_value(Database::FieldIdDocidFacetStrings, key, value)
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
} }
pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> { pub fn write_facet_f64(&self, key: &[u8]) -> crate::Result<()> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[])); self.0.write_key_value(Database::FieldIdDocidFacetF64s, key, &[])
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
} }
pub fn delete_facet_string(&self, key: &[u8]) -> StdResult<(), SendError<()>> { pub fn delete_facet_string(&self, key: &[u8]) -> crate::Result<()> {
debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key)); self.0.delete_entry(Database::FieldIdDocidFacetStrings, key)
self.0
.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
} }
pub fn delete_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> { pub fn delete_facet_f64(&self, key: &[u8]) -> crate::Result<()> {
debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok()); debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
let entry = EntryOperation::Delete(KeyEntry::from_key(key)); self.0.delete_entry(Database::FieldIdDocidFacetF64s, key)
self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
} }
} }
pub struct DocumentsSender<'a>(&'a ExtractorSender); pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl DocumentsSender<'_> { impl DocumentsSender<'_, '_> {
/// TODO do that efficiently /// TODO do that efficiently
pub fn uncompressed( pub fn uncompressed(
&self, &self,
docid: DocumentId, docid: DocumentId,
external_id: String, external_id: String,
document: &KvReaderFieldId, document: &KvReaderFieldId,
) -> StdResult<(), SendError<()>> { ) -> crate::Result<()> {
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value( self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), document.as_bytes())?;
&docid.to_be_bytes(), self.0.write_key_value(
document.as_bytes(), Database::ExternalDocumentsIds,
));
match self.0.send_db_operation(DbOperation { database: Database::Documents, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}?;
let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
external_id.as_bytes(), external_id.as_bytes(),
&docid.to_be_bytes(), &docid.to_be_bytes(),
)); )
match self
.0
.send_db_operation(DbOperation { database: Database::ExternalDocumentsIds, entry })
{
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
} }
pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> { pub fn delete(&self, docid: DocumentId, external_id: String) -> crate::Result<()> {
let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes())); self.0.delete_entry(Database::Documents, &docid.to_be_bytes())?;
match self.0.send_db_operation(DbOperation { database: Database::Documents, entry }) {
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}?;
self.0.send_delete_vector(docid)?; self.0.send_delete_vector(docid)?;
self.0.delete_entry(Database::ExternalDocumentsIds, external_id.as_bytes())
let entry = EntryOperation::Delete(KeyEntry::from_key(external_id.as_bytes()));
match self
.0
.send_db_operation(DbOperation { database: Database::ExternalDocumentsIds, entry })
{
Ok(()) => Ok(()),
Err(SendError(_)) => Err(SendError(())),
}
} }
} }
pub struct EmbeddingSender<'a>(&'a Sender<WriterOperation>); pub struct EmbeddingSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl EmbeddingSender<'_> { impl EmbeddingSender<'_, '_> {
pub fn set_vectors( pub fn set_vectors(
&self, &self,
docid: DocumentId, docid: DocumentId,
embedder_id: u8, embedder_id: u8,
embeddings: Vec<Embedding>, embeddings: Vec<Embedding>,
) -> StdResult<(), SendError<()>> { ) -> crate::Result<()> {
self.0 self.0
.send(WriterOperation::ArroyOperation(ArroyOperation::SetVectors { .send(WriterOperation::ArroyOperation(ArroyOperation::SetVectors {
docid, docid,
@ -541,33 +611,36 @@ impl EmbeddingSender<'_> {
}
}
pub struct GeoSender<'a>(&'a Sender<WriterOperation>); pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
impl GeoSender<'_> { impl GeoSender<'_, '_> {
pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> { pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
self.0 todo!("set rtree from file")
.send(WriterOperation::DbOperation(DbOperation { // self.0
database: Database::Main, // .send(WriterOperation::DbOperation(DbOperation {
entry: EntryOperation::Write(KeyValueEntry::from_large_key_value( // database: Database::Main,
GEO_RTREE_KEY.as_bytes(), // entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
value, // GEO_RTREE_KEY.as_bytes(),
)), // value,
})) // )),
.map_err(|_| SendError(())) // }))
// .map_err(|_| SendError(()))
} }
pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> { pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
let mut buffer = Vec::new(); todo!("serialize directly into bbqueue (as a real roaringbitmap not a cbo)")
bitmap.serialize_into(&mut buffer).unwrap();
self.0 // let mut buffer = Vec::new();
.send(WriterOperation::DbOperation(DbOperation { // bitmap.serialize_into(&mut buffer).unwrap();
database: Database::Main,
entry: EntryOperation::Write(KeyValueEntry::from_small_key_value( // self.0
GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(), // .send(WriterOperation::DbOperation(DbOperation {
&buffer, // database: Database::Main,
)), // entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
})) // GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
.map_err(|_| SendError(())) // &buffer,
// )),
// }))
// .map_err(|_| SendError(()))
} }
} }


@ -12,13 +12,14 @@ use crate::update::new::thread_local::FullySend;
use crate::update::new::DocumentChange;
use crate::vector::EmbeddingConfigs;
use crate::Result;

pub struct DocumentsExtractor<'a, 'b> {
document_sender: DocumentsSender<'a, 'b>,
embedders: &'a EmbeddingConfigs,
}

impl<'a, 'b> DocumentsExtractor<'a, 'b> {
pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
Self { document_sender, embedders }
}
}
@ -29,7 +30,7 @@ pub struct DocumentExtractorData {
pub field_distribution_delta: HashMap<String, i64>,
}

impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
type Data = FullySend<RefCell<DocumentExtractorData>>;

fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {


@ -20,7 +20,7 @@ use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAb
pub struct EmbeddingExtractor<'a> { pub struct EmbeddingExtractor<'a> {
embedders: &'a EmbeddingConfigs, embedders: &'a EmbeddingConfigs,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a>,
possible_embedding_mistakes: PossibleEmbeddingMistakes, possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
} }
@ -28,7 +28,7 @@ pub struct EmbeddingExtractor<'a> {
impl<'a> EmbeddingExtractor<'a> { impl<'a> EmbeddingExtractor<'a> {
pub fn new( pub fn new(
embedders: &'a EmbeddingConfigs, embedders: &'a EmbeddingConfigs,
sender: &'a EmbeddingSender<'a>, sender: EmbeddingSender<'a>,
field_distribution: &'a FieldDistribution, field_distribution: &'a FieldDistribution,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
) -> Self { ) -> Self {
@ -368,7 +368,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
possible_embedding_mistakes: &PossibleEmbeddingMistakes, possible_embedding_mistakes: &PossibleEmbeddingMistakes,
unused_vectors_distribution: &UnusedVectorsDistributionBump, unused_vectors_distribution: &UnusedVectorsDistributionBump,
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
sender: &EmbeddingSender<'a>, sender: EmbeddingSender<'a>,
has_manual_generation: Option<&'a str>, has_manual_generation: Option<&'a str>,
) -> Result<()> { ) -> Result<()> {
if let Some(external_docid) = has_manual_generation { if let Some(external_docid) = has_manual_generation {


@ -76,7 +76,11 @@ where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
SP: Fn(Progress) + Sync, SP: Fn(Progress) + Sync,
{ {
let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000); /// TODO restrict memory and remove this memory from the extractors bum allocators
let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
.map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread
.collect();
let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers);
let finished_extraction = AtomicBool::new(false);
let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
@ -115,7 +119,7 @@ where
// document but we need to create a function that collects and compresses documents.
let document_sender = extractor_sender.documents();
let document_extractor = DocumentsExtractor::new(document_sender, embedders);
let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
{
let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");


@ -19,7 +19,7 @@ pub fn merge_and_send_rtree<'extractor, MSP>(
datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
rtxn: &RoTxn,
index: &Index,
geo_sender: GeoSender<'_, '_>,
must_stop_processing: &MSP,
) -> Result<()>
where
@ -62,19 +62,19 @@ where
}

#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
pub fn merge_and_send_docids<'extractor, MSP, D>(
mut caches: Vec<BalancedCaches<'extractor>>,
database: Database<Bytes, Bytes>,
index: &Index,
docids_sender: WordDocidsSender<D>,
must_stop_processing: &MSP,
) -> Result<()>
where
MSP: Fn() -> bool + Sync,
D: DatabaseType + Sync,
{
transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
let rtxn = index.read_txn()?;
if must_stop_processing() {
return Err(InternalError::AbortedIndexation.into());
}
@ -82,8 +82,7 @@ where
let current = database.get(&rtxn, key)?; let current = database.get(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, &bitmap).unwrap();
docids_sender.write(key, value).unwrap();
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {
@ -101,21 +100,19 @@ pub fn merge_and_send_facet_docids<'extractor>(
mut caches: Vec<BalancedCaches<'extractor>>, mut caches: Vec<BalancedCaches<'extractor>>,
database: FacetDatabases, database: FacetDatabases,
index: &Index, index: &Index,
docids_sender: impl DocidsSender + Sync, docids_sender: FacetDocidsSender,
) -> Result<FacetFieldIdsDelta> { ) -> Result<FacetFieldIdsDelta> {
transpose_and_freeze_caches(&mut caches)? transpose_and_freeze_caches(&mut caches)?
.into_par_iter() .into_par_iter()
.map(|frozen| { .map(|frozen| {
let mut facet_field_ids_delta = FacetFieldIdsDelta::default(); let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
let rtxn = index.read_txn()?; let rtxn = index.read_txn()?;
let mut buffer = Vec::new();
merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| { merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?; let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
match merge_cbo_bitmaps(current, del, add)? { match merge_cbo_bitmaps(current, del, add)? {
Operation::Write(bitmap) => { Operation::Write(bitmap) => {
facet_field_ids_delta.register_from_key(key); facet_field_ids_delta.register_from_key(key);
let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer); docids_sender.write(key, &bitmap).unwrap();
docids_sender.write(key, value).unwrap();
Ok(()) Ok(())
} }
Operation::Delete => { Operation::Delete => {