diff --git a/crates/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs b/crates/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs
index 257d5bd0a..cae1874dd 100644
--- a/crates/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs
+++ b/crates/milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs
@@ -41,6 +41,25 @@ impl CboRoaringBitmapCodec {
         }
     }
 
+    pub fn serialize_into_writer<W: io::Write>(
+        roaring: &RoaringBitmap,
+        mut writer: W,
+    ) -> io::Result<()> {
+        if roaring.len() <= THRESHOLD as u64 {
+            // If the number of items (u32s) to encode is less than or equal to the threshold,
+            // it means that it would weigh the same or less than the RoaringBitmap
+            // header, so we directly encode them using ByteOrder instead.
+            for integer in roaring {
+                writer.write_u32::<NativeEndian>(integer)?;
+            }
+        } else {
+            // Otherwise, we use the classic RoaringBitmapCodec that writes a header.
+            roaring.serialize_into(writer)?;
+        }
+
+        Ok(())
+    }
+
     pub fn deserialize_from(mut bytes: &[u8]) -> io::Result<RoaringBitmap> {
         if bytes.len() <= THRESHOLD * size_of::<u32>() {
             // If there is threshold or less than threshold integers that can fit into this array
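// A minimal sketch of the threshold rule introduced above, assuming the
// byteorder and roaring crates and a THRESHOLD constant that mirrors the
// codec's own (7 u32s in milli): small bitmaps are written as raw
// native-endian integers, larger ones keep the regular RoaringBitmap header.
use byteorder::{NativeEndian, WriteBytesExt};
use roaring::RoaringBitmap;

const THRESHOLD: usize = 7; // assumed to match the codec's constant

fn encode(bitmap: &RoaringBitmap) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    if bitmap.len() <= THRESHOLD as u64 {
        // At most THRESHOLD values: raw u32s weigh no more than the header would.
        for n in bitmap {
            out.write_u32::<NativeEndian>(n)?;
        }
    } else {
        // Above the threshold the classic serialization is denser.
        bitmap.serialize_into(&mut out)?;
    }
    Ok(out)
}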
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index 21cd6b87d..cacc7b129 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -1,14 +1,19 @@
+use std::cell::RefCell;
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicUsize, Ordering};
+use std::num::NonZeroU16;
+use std::{mem, slice};
 
+use bbqueue::framed::{FrameConsumer, FrameGrantR, FrameProducer};
+use bytemuck::{CheckedBitPattern, NoUninit};
 use crossbeam::sync::{Parker, Unparker};
-use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
+use crossbeam_channel::{IntoIter, Receiver, SendError};
 use heed::types::Bytes;
 use heed::BytesDecode;
 use memmap2::Mmap;
 use roaring::RoaringBitmap;
 
 use super::extract::FacetKind;
+use super::ref_cell_ext::RefCellExt;
 use super::thread_local::{FullySend, ThreadLocal};
 use super::StdResult;
 use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
@@ -16,7 +21,7 @@ use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
 use crate::index::{db_name, IndexEmbeddingConfig};
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
-use crate::{DocumentId, Index};
+use crate::{CboRoaringBitmapCodec, DocumentId, Index};
 
 /// Creates a tuple of producer/receivers to be used by
 /// the extractors and the writer loop.
@@ -26,125 +31,97 @@ use crate::{DocumentId, Index};
 /// Panics if the number of provided bbqueue is not exactly equal
 /// to the number of available threads in the rayon threadpool.
 pub fn extractor_writer_bbqueue(
-    bbqueue: &[bbqueue::BBBuffer],
+    bbbuffers: &[bbqueue::BBBuffer],
 ) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
     assert_eq!(
-        bbqueue.len(),
+        bbbuffers.len(),
         rayon::current_num_threads(),
         "You must provide as many BBBuffer as the available number of threads to extract"
     );
 
+    let capacity = bbbuffers.first().unwrap().capacity();
     let parker = Parker::new();
-    let extractors = ThreadLocal::with_capacity(bbqueue.len());
+    let extractors = ThreadLocal::with_capacity(bbbuffers.len());
     let producers = rayon::broadcast(|bi| {
-        let bbqueue = &bbqueue[bi.index()];
+        let bbqueue = &bbbuffers[bi.index()];
         let (producer, consumer) = bbqueue.try_split_framed().unwrap();
-        extractors.get_or(|| FullySend(producer));
+        extractors.get_or(|| FullySend(RefCell::new(producer)));
         consumer
     });
 
     (
-        ExtractorBbqueueSender { inner: extractors, unparker: parker.unparker().clone() },
+        ExtractorBbqueueSender {
+            inner: extractors,
+            capacity: capacity.checked_sub(9).unwrap(),
+            unparker: parker.unparker().clone(),
+        },
         WriterBbqueueReceiver { inner: producers, parker },
     )
 }
 
-pub struct ExtractorBbqueueSender<'a> {
-    inner: ThreadLocal<FullySend<FrameProducer<'a>>>,
-    /// Used to wake up the receiver thread,
-    /// used every time we write something in the producer.
-    unparker: Unparker,
-}
-
 pub struct WriterBbqueueReceiver<'a> {
     inner: Vec<FrameConsumer<'a>>,
     /// Used to park when no more work is required
     parker: Parker,
 }
 
-/// The capacity of the channel is currently in number of messages.
-pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver) {
-    let (sender, receiver) = crossbeam_channel::bounded(cap);
-    (
-        ExtractorSender {
-            sender,
-            send_count: Default::default(),
-            writer_contentious_count: Default::default(),
-            extractor_contentious_count: Default::default(),
-        },
-        WriterReceiver(receiver),
-    )
-}
-
-pub enum KeyValueEntry {
-    Small { key_length: usize, data: Box<[u8]> },
-    Large { key_entry: KeyEntry, data: Mmap },
-}
-
-impl KeyValueEntry {
-    pub fn from_small_key_value(key: &[u8], value: &[u8]) -> Self {
-        let mut data = Vec::with_capacity(key.len() + value.len());
-        data.extend_from_slice(key);
-        data.extend_from_slice(value);
-        KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() }
-    }
-
-    fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
-        KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
-    }
-
-    pub fn key(&self) -> &[u8] {
-        match self {
-            KeyValueEntry::Small { key_length, data } => &data[..*key_length],
-            KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
-        }
-    }
-
-    pub fn value(&self) -> &[u8] {
-        match self {
-            KeyValueEntry::Small { key_length, data } => &data[*key_length..],
-            KeyValueEntry::Large { key_entry: _, data } => &data[..],
+impl<'a> WriterBbqueueReceiver<'a> {
+    pub fn read(&mut self) -> Option<FrameWithHeader<'a>> {
+        loop {
+            for consumer in &mut self.inner {
+                // mark the frame as auto release
+                if let Some(_frame) = consumer.read() {
+                    todo!("build the FrameWithHeader from this frame")
+                }
+            }
+            break None;
         }
     }
 }
 
-pub struct KeyEntry {
-    data: Box<[u8]>,
+struct FrameWithHeader<'a> {
+    header: EntryHeader,
+    frame: FrameGrantR<'a>,
 }
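// A sketch of how the writer thread might drive the receiver above once
// `read` is completed, assuming code living in the same module (the `parker`
// field is private) and a `process` callback standing in for the heed write
// logic; none of this is in the patch itself, and a real loop would also
// check a stop condition.
fn writer_loop(
    mut receiver: WriterBbqueueReceiver<'_>,
    mut process: impl FnMut(FrameWithHeader<'_>),
) {
    loop {
        match receiver.read() {
            // A frame is available: hand it to the database writer.
            Some(frame) => process(frame),
            // All consumers are drained: sleep until an extractor unparks us.
            None => receiver.parker.park(),
        }
    }
}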
+#[derive(Debug, Clone, Copy, CheckedBitPattern)]
+#[repr(u8)]
+enum EntryHeader {
+    /// Whether to put the key/value pair or to delete the given key.
+    DbOperation {
+        /// The database on which to perform the operation.
+        database: Database,
+        /// The key length in the buffer.
+        ///
+        /// If None it means that the buffer is dedicated
+        /// to the key and it is therefore a deletion operation.
+        key_length: Option<NonZeroU16>,
+    },
+    ArroyDeleteVector {
+        docid: DocumentId,
+    },
+    /// The embedding is the remaining space and represents a non-aligned [f32].
+    ArroySetVector {
+        docid: DocumentId,
+        embedder_id: u8,
+    },
+}
+
+impl EntryHeader {
+    fn delete_key_size(key_length: u16) -> usize {
+        mem::size_of::<Self>() + key_length as usize
+    }
+
+    fn put_key_value_size(key_length: u16, value_length: usize) -> usize {
+        mem::size_of::<Self>() + key_length as usize + value_length
+    }
+
+    fn bytes_of(&self) -> &[u8] {
+        // TODO do the variant matching ourselves
+        todo!()
+    }
+}
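// A reader-side sketch of what the CheckedBitPattern derive buys us, assuming
// the writer always copies mem::size_of::<EntryHeader>() bytes at the front of
// a frame (as the grant code below does). `decode_header` is a hypothetical
// helper, not part of the patch: the exact wire layout is still the open TODO
// in `bytes_of` above.
fn decode_header(frame: &[u8]) -> Option<EntryHeader> {
    let bytes = frame.get(..std::mem::size_of::<EntryHeader>())?;
    // try_from_bytes validates the discriminant (and alignment) instead of
    // transmuting blindly, hence the Option rather than a panic.
    bytemuck::checked::try_from_bytes::<EntryHeader>(bytes).ok().copied()
}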
-pub enum EntryOperation {
-    Delete(KeyEntry),
-    Write(KeyValueEntry),
-}
-
-pub enum WriterOperation {
-    DbOperation(DbOperation),
-    ArroyOperation(ArroyOperation),
-}
-
-pub enum ArroyOperation {
-    DeleteVectors { docid: DocumentId },
-    SetVectors { docid: DocumentId, embedder_id: u8, embeddings: Vec<Embedding> },
-    SetVector { docid: DocumentId, embedder_id: u8, embedding: Embedding },
-    Finish { configs: Vec<IndexEmbeddingConfig> },
-}
-
-pub struct DbOperation {
-    database: Database,
-    entry: EntryOperation,
-}
-
-#[derive(Debug)]
+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(u32)]
 pub enum Database {
     Main,
     Documents,
@@ -220,82 +197,46 @@ impl From<FacetKind> for Database {
     }
 }
 
-impl DbOperation {
-    pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
-        self.database.database(index)
-    }
-
-    pub fn database_name(&self) -> &'static str {
-        self.database.database_name()
-    }
-
-    pub fn entry(self) -> EntryOperation {
-        self.entry
-    }
+pub struct ExtractorBbqueueSender<'a> {
+    inner: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
+    /// The capacity of this frame producer, will never be able to store more than that.
+    ///
+    /// Note that the FrameProducer requires up to 9 bytes to encode the length,
+    /// the capacity has been shrunk accordingly.
+    capacity: usize,
+    /// Used to wake up the receiver thread,
+    /// used every time we write something in the producer.
+    unparker: Unparker,
 }
 
-pub struct WriterReceiver(Receiver<WriterOperation>);
-
-impl IntoIterator for WriterReceiver {
-    type Item = WriterOperation;
-    type IntoIter = IntoIter<WriterOperation>;
-
-    fn into_iter(self) -> Self::IntoIter {
-        self.0.into_iter()
-    }
-}
-
-pub struct ExtractorSender {
-    sender: Sender<WriterOperation>,
-    /// The number of messages we sent in total in the channel.
-    send_count: AtomicUsize,
-    /// The number of times we sent something in a channel that was full.
-    writer_contentious_count: AtomicUsize,
-    /// The number of times we sent something in a channel that was empty.
-    extractor_contentious_count: AtomicUsize,
-}
-
-impl Drop for ExtractorSender {
-    fn drop(&mut self) {
-        let send_count = *self.send_count.get_mut();
-        let writer_contentious_count = *self.writer_contentious_count.get_mut();
-        let extractor_contentious_count = *self.extractor_contentious_count.get_mut();
-        tracing::debug!(
-            "Extractor channel stats: {send_count} sends, \
-             {writer_contentious_count} writer contentions ({}%), \
-             {extractor_contentious_count} extractor contentions ({}%)",
-            (writer_contentious_count as f32 / send_count as f32) * 100.0,
-            (extractor_contentious_count as f32 / send_count as f32) * 100.0
-        )
-    }
-}
-
-impl ExtractorSender {
-    pub fn docids<D: DatabaseType>(&self) -> WordDocidsSender<'_, D> {
+impl<'b> ExtractorBbqueueSender<'b> {
+    pub fn docids<'a, D: DatabaseType>(&'a self) -> WordDocidsSender<'a, 'b, D> {
         WordDocidsSender { sender: self, _marker: PhantomData }
     }
 
-    pub fn facet_docids(&self) -> FacetDocidsSender<'_> {
+    pub fn facet_docids<'a>(&'a self) -> FacetDocidsSender<'a, 'b> {
         FacetDocidsSender { sender: self }
     }
 
-    pub fn field_id_docid_facet_sender(&self) -> FieldIdDocidFacetSender<'_> {
-        FieldIdDocidFacetSender(self)
+    pub fn field_id_docid_facet_sender<'a>(&'a self) -> FieldIdDocidFacetSender<'a, 'b> {
+        FieldIdDocidFacetSender(self)
     }
 
-    pub fn documents(&self) -> DocumentsSender<'_> {
-        DocumentsSender(self)
+    pub fn documents<'a>(&'a self) -> DocumentsSender<'a, 'b> {
+        DocumentsSender(self)
     }
 
-    pub fn embeddings(&self) -> EmbeddingSender<'_> {
-        EmbeddingSender(&self.sender)
+    pub fn embeddings<'a>(&'a self) -> EmbeddingSender<'a, 'b> {
+        EmbeddingSender(self)
    }
 
-    pub fn geo(&self) -> GeoSender<'_> {
-        GeoSender(&self.sender)
+    pub fn geo<'a>(&'a self) -> GeoSender<'a, 'b> {
+        GeoSender(self)
    }
 
-    fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
+    fn send_delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
         match self
             .sender
             .send(WriterOperation::ArroyOperation(ArroyOperation::DeleteVectors { docid }))
@@ -305,18 +246,69 @@ impl ExtractorSender {
         }
     }
 
-    fn send_db_operation(&self, op: DbOperation) -> StdResult<(), SendError<()>> {
-        if self.sender.is_full() {
-            self.writer_contentious_count.fetch_add(1, Ordering::SeqCst);
-        }
-        if self.sender.is_empty() {
-            self.extractor_contentious_count.fetch_add(1, Ordering::SeqCst);
+    fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let key_length = key.len().try_into().unwrap();
+        let value_length = value.len();
+        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
         }
-        self.send_count.fetch_add(1, Ordering::SeqCst);
-        match self.sender.send(WriterOperation::DbOperation(op)) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
+
+        let payload_header =
+            EntryHeader::DbOperation { database, key_length: NonZeroU16::new(key_length) };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            let (key_out, value_out) = remaining.split_at_mut(key.len());
+            key_out.copy_from_slice(key);
+            value_out.copy_from_slice(value);
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
        }
    }
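// The grant/commit pattern above in isolation, assuming the bbqueue fork used
// here (framed mode with a runtime capacity): grant() reserves a frame, the
// bytes are written in place, and commit() publishes them to the consumer
// side. Spinning on InsufficientSize mirrors the loop above; a real caller
// would also unpark the writer thread after committing.
fn push_frame(producer: &mut bbqueue::framed::FrameProducer<'_>, payload: &[u8]) {
    loop {
        match producer.grant(payload.len()) {
            Ok(mut grant) => {
                // The grant derefs to a mutable byte slice of the frame.
                grant[..payload.len()].copy_from_slice(payload);
                grant.commit(payload.len());
                break;
            }
            // The consumer has not released enough space yet: retry.
            Err(bbqueue::Error::InsufficientSize) => continue,
            Err(e) => unreachable!("{e:?}"),
        }
    }
}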
+    fn delete_entry(&self, database: Database, key: &[u8]) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let key_length = key.len().try_into().unwrap();
+        let total_length = EntryHeader::delete_key_size(key_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
+        }
+
+        let payload_header = EntryHeader::DbOperation { database, key_length: None };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            remaining.copy_from_slice(key);
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
+        }
+    }
 }
 
@@ -356,159 +348,237 @@ impl DatabaseType for WordPositionDocids {
     const DATABASE: Database = Database::WordPositionDocids;
 }
 
-pub trait DocidsSender {
-    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>>;
-    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>>;
-}
-
-pub struct WordDocidsSender<'a, D> {
-    sender: &'a ExtractorSender,
+pub struct WordDocidsSender<'a, 'b, D> {
+    sender: &'a ExtractorBbqueueSender<'b>,
     _marker: PhantomData<D>,
 }
 
-impl<D: DatabaseType> DocidsSender for WordDocidsSender<'_, D> {
-    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
-        match self.sender.send_db_operation(DbOperation { database: D::DATABASE, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
-    }
-
-    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
-        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
-        match self.sender.send_db_operation(DbOperation { database: D::DATABASE, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
-    }
-}
-
+impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
+    pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
+        let capacity = self.sender.capacity;
+        let refcell = self.sender.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let key_length = key.len().try_into().unwrap();
+        let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
+
+        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
+        }
+
+        let payload_header = EntryHeader::DbOperation {
+            database: D::DATABASE,
+            key_length: NonZeroU16::new(key_length),
+        };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            let (key_out, value_out) = remaining.split_at_mut(key.len());
+            key_out.copy_from_slice(key);
+            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
+        }
+    }
+    pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
+        let capacity = self.sender.capacity;
+        let refcell = self.sender.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let key_length = key.len().try_into().unwrap();
+        let total_length = EntryHeader::delete_key_size(key_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
+        }
+
+        let payload_header = EntryHeader::DbOperation { database: D::DATABASE, key_length: None };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            remaining.copy_from_slice(key);
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
+        }
+    }
+}
+
-pub struct FacetDocidsSender<'a> {
-    sender: &'a ExtractorSender,
-}
-
-impl DocidsSender for FacetDocidsSender<'_> {
-    fn write(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
-        let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let database = Database::from(facet_kind);
-        let entry = match facet_kind {
-            // skip level group size
-            FacetKind::String | FacetKind::Number => {
-                // add facet group size
-                let value = [&[1], value].concat();
-                EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &value))
-            }
-            _ => EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value)),
-        };
-        match self.sender.send_db_operation(DbOperation { database, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
-    }
-
-    fn delete(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
-        let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let database = Database::from(facet_kind);
-        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
-        match self.sender.send_db_operation(DbOperation { database, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
-    }
-}
-
+pub struct FacetDocidsSender<'a, 'b> {
+    sender: &'a ExtractorBbqueueSender<'b>,
+}
+
+impl FacetDocidsSender<'_, '_> {
+    pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
+        let capacity = self.sender.capacity;
+        let refcell = self.sender.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let (facet_kind, key) = FacetKind::extract_from_key(key);
+        let key_length = key.len().try_into().unwrap();
+
+        let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
+        let value_length = match facet_kind {
+            // We must take the facet group size into account
+            // when we serialize strings and numbers.
+            FacetKind::Number | FacetKind::String => value_length + 1,
+            FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
+        };
+
+        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
+        }
+
+        let payload_header = EntryHeader::DbOperation {
+            database: Database::from(facet_kind),
+            key_length: NonZeroU16::new(key_length),
+        };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            let (key_out, value_out) = remaining.split_at_mut(key.len());
+            key_out.copy_from_slice(key);
+
+            let value_out = match facet_kind {
+                // We must take the facet group size into account
+                // when we serialize strings and numbers.
+                FacetKind::String | FacetKind::Number => {
+                    let (first, remaining) = value_out.split_first_mut().unwrap();
+                    *first = 1;
+                    remaining
+                }
+                FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
+            };
+            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
+        }
+    }
+
+    pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
+        let capacity = self.sender.capacity;
+        let refcell = self.sender.inner.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let (facet_kind, key) = FacetKind::extract_from_key(key);
+        let key_length = key.len().try_into().unwrap();
+
+        let total_length = EntryHeader::delete_key_size(key_length);
+        if total_length > capacity {
+            unreachable!("entry larger than the bbqueue capacity");
+        }
+
+        let payload_header =
+            EntryHeader::DbOperation { database: Database::from(facet_kind), key_length: None };
+
+        loop {
+            let mut grant = match producer.grant(total_length) {
+                Ok(grant) => grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            };
+
+            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
+            header.copy_from_slice(payload_header.bytes_of());
+            remaining.copy_from_slice(key);
+
+            // We could commit only the used memory.
+            grant.commit(total_length);
+
+            break Ok(());
+        }
+    }
+}
 
-pub struct FieldIdDocidFacetSender<'a>(&'a ExtractorSender);
+pub struct FieldIdDocidFacetSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
-impl FieldIdDocidFacetSender<'_> {
-    pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> StdResult<(), SendError<()>> {
+impl FieldIdDocidFacetSender<'_, '_> {
+    pub fn write_facet_string(&self, key: &[u8], value: &[u8]) -> crate::Result<()> {
         debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, value));
-        self.0
-            .send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
+        self.0.write_key_value(Database::FieldIdDocidFacetStrings, key, value)
     }
 
-    pub fn write_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+    pub fn write_facet_f64(&self, key: &[u8]) -> crate::Result<()> {
         debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(key, &[]));
-        self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
+        self.0.write_key_value(Database::FieldIdDocidFacetF64s, key, &[])
     }
 
-    pub fn delete_facet_string(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+    pub fn delete_facet_string(&self, key: &[u8]) -> crate::Result<()> {
         debug_assert!(FieldDocIdFacetStringCodec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
-        self.0
-            .send_db_operation(DbOperation { database: Database::FieldIdDocidFacetStrings, entry })
+        self.0.delete_entry(Database::FieldIdDocidFacetStrings, key)
     }
 
-    pub fn delete_facet_f64(&self, key: &[u8]) -> StdResult<(), SendError<()>> {
+    pub fn delete_facet_f64(&self, key: &[u8]) -> crate::Result<()> {
         debug_assert!(FieldDocIdFacetF64Codec::bytes_decode(key).is_ok());
-        let entry = EntryOperation::Delete(KeyEntry::from_key(key));
-        self.0.send_db_operation(DbOperation { database: Database::FieldIdDocidFacetF64s, entry })
+        self.0.delete_entry(Database::FieldIdDocidFacetF64s, key)
     }
 }
 
-pub struct DocumentsSender<'a>(&'a ExtractorSender);
+pub struct DocumentsSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
-impl DocumentsSender<'_> {
+impl DocumentsSender<'_, '_> {
     /// TODO do that efficiently
     pub fn uncompressed(
         &self,
         docid: DocumentId,
         external_id: String,
         document: &KvReaderFieldId,
-    ) -> StdResult<(), SendError<()>> {
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
-            &docid.to_be_bytes(),
-            document.as_bytes(),
-        ));
-        match self.0.send_db_operation(DbOperation { database: Database::Documents, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }?;
-
-        let entry = EntryOperation::Write(KeyValueEntry::from_small_key_value(
+    ) -> crate::Result<()> {
+        self.0.write_key_value(Database::Documents, &docid.to_be_bytes(), document.as_bytes())?;
+        self.0.write_key_value(
+            Database::ExternalDocumentsIds,
             external_id.as_bytes(),
             &docid.to_be_bytes(),
-        ));
-        match self
-            .0
-            .send_db_operation(DbOperation { database: Database::ExternalDocumentsIds, entry })
-        {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
+        )
     }
 
-    pub fn delete(&self, docid: DocumentId, external_id: String) -> StdResult<(), SendError<()>> {
-        let entry = EntryOperation::Delete(KeyEntry::from_key(&docid.to_be_bytes()));
-        match self.0.send_db_operation(DbOperation { database: Database::Documents, entry }) {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }?;
-
+    pub fn delete(&self, docid: DocumentId, external_id: String) -> crate::Result<()> {
+        self.0.delete_entry(Database::Documents, &docid.to_be_bytes())?;
         self.0.send_delete_vector(docid)?;
-
-        let entry = EntryOperation::Delete(KeyEntry::from_key(external_id.as_bytes()));
-        match self
-            .0
-            .send_db_operation(DbOperation { database: Database::ExternalDocumentsIds, entry })
-        {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
+        self.0.delete_entry(Database::ExternalDocumentsIds, external_id.as_bytes())
     }
 }
 
-pub struct EmbeddingSender<'a>(&'a Sender<WriterOperation>);
+pub struct EmbeddingSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
-impl EmbeddingSender<'_> {
+impl EmbeddingSender<'_, '_> {
     pub fn set_vectors(
         &self,
         docid: DocumentId,
         embedder_id: u8,
         embeddings: Vec<Embedding>,
-    ) -> StdResult<(), SendError<()>> {
+    ) -> crate::Result<()> {
         self.0
             .send(WriterOperation::ArroyOperation(ArroyOperation::SetVectors {
                 docid,
@@ -541,33 +611,36 @@ impl EmbeddingSender<'_> {
     }
 }
 
-pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
+pub struct GeoSender<'a, 'b>(&'a ExtractorBbqueueSender<'b>);
 
-impl GeoSender<'_> {
+impl GeoSender<'_, '_> {
     pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
-        self.0
-            .send(WriterOperation::DbOperation(DbOperation {
-                database: Database::Main,
-                entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
-                    GEO_RTREE_KEY.as_bytes(),
-                    value,
-                )),
-            }))
-            .map_err(|_| SendError(()))
+        todo!("set rtree from file")
+        // self.0
+        //     .send(WriterOperation::DbOperation(DbOperation {
+        //         database: Database::Main,
+        //         entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
+        //             GEO_RTREE_KEY.as_bytes(),
+        //             value,
+        //         )),
+        //     }))
+        //     .map_err(|_| SendError(()))
     }
 
     pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
-        let mut buffer = Vec::new();
-        bitmap.serialize_into(&mut buffer).unwrap();
+        todo!("serialize directly into bbqueue (as a real roaringbitmap not a cbo)")
 
-        self.0
-            .send(WriterOperation::DbOperation(DbOperation {
-                database: Database::Main,
-                entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
-                    GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
-                    &buffer,
-                )),
-            }))
-            .map_err(|_| SendError(()))
+        // let mut buffer = Vec::new();
+        // bitmap.serialize_into(&mut buffer).unwrap();
+
+        // self.0
+        //     .send(WriterOperation::DbOperation(DbOperation {
+        //         database: Database::Main,
+        //         entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
+        //             GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
+        //             &buffer,
+        //         )),
+        //     }))
+        //     .map_err(|_| SendError(()))
     }
 }
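// A sketch of what the `set_geo_faceted` TODO above could become, assuming the
// same grant/commit pattern as `write_key_value`: a plain RoaringBitmap (not a
// CBO one) exposes its exact `serialized_size()`, so a frame can be reserved at
// the right length and serialized into in place. Building a Vec here is only to
// keep the sketch short.
use roaring::RoaringBitmap;

fn geo_faceted_payload(bitmap: &RoaringBitmap) -> std::io::Result<Vec<u8>> {
    // Reserve exactly what the classic roaring serializer will write...
    let mut out = Vec::with_capacity(bitmap.serialized_size());
    // ...then serialize the real (non-CBO) format straight into it.
    bitmap.serialize_into(&mut out)?;
    Ok(out)
}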
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index aeb1d5694..13307025a 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -12,13 +12,14 @@ use crate::update::new::thread_local::FullySend;
 use crate::update::new::DocumentChange;
 use crate::vector::EmbeddingConfigs;
 use crate::Result;
 
-pub struct DocumentsExtractor<'a> {
-    document_sender: &'a DocumentsSender<'a>,
+pub struct DocumentsExtractor<'a, 'b> {
+    document_sender: DocumentsSender<'a, 'b>,
     embedders: &'a EmbeddingConfigs,
 }
 
-impl<'a> DocumentsExtractor<'a> {
-    pub fn new(document_sender: &'a DocumentsSender<'a>, embedders: &'a EmbeddingConfigs) -> Self {
+impl<'a, 'b> DocumentsExtractor<'a, 'b> {
+    pub fn new(document_sender: DocumentsSender<'a, 'b>, embedders: &'a EmbeddingConfigs) -> Self {
         Self { document_sender, embedders }
     }
 }
@@ -29,7 +30,7 @@ pub struct DocumentExtractorData {
     pub field_distribution_delta: HashMap<String, i64>,
 }
 
-impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
+impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
     type Data = FullySend<RefCell<DocumentExtractorData>>;
 
     fn init_data(&self, _extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 8ac73a8d7..52b13f37d 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -20,7 +20,7 @@ use crate::{DocumentId, FieldDistribution, InternalError, Result, ThreadPoolNoAbort};
 
 pub struct EmbeddingExtractor<'a> {
     embedders: &'a EmbeddingConfigs,
-    sender: &'a EmbeddingSender<'a>,
+    sender: EmbeddingSender<'a>,
     possible_embedding_mistakes: PossibleEmbeddingMistakes,
     threads: &'a ThreadPoolNoAbort,
 }
@@ -28,7 +28,7 @@ pub struct EmbeddingExtractor<'a> {
 impl<'a> EmbeddingExtractor<'a> {
     pub fn new(
         embedders: &'a EmbeddingConfigs,
-        sender: &'a EmbeddingSender<'a>,
+        sender: EmbeddingSender<'a>,
         field_distribution: &'a FieldDistribution,
         threads: &'a ThreadPoolNoAbort,
     ) -> Self {
@@ -368,7 +368,7 @@ impl<'a, 'extractor> Chunks<'a, 'extractor> {
         possible_embedding_mistakes: &PossibleEmbeddingMistakes,
         unused_vectors_distribution: &UnusedVectorsDistributionBump,
         threads: &ThreadPoolNoAbort,
-        sender: &EmbeddingSender<'a>,
+        sender: EmbeddingSender<'a>,
         has_manual_generation: Option<&'a str>,
     ) -> Result<()> {
         if let Some(external_docid) = has_manual_generation {
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 35dea7a98..88a4c2f77 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -76,7 +76,11 @@ where
     MSP: Fn() -> bool + Sync,
     SP: Fn(Progress) + Sync,
 {
-    let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
+    // TODO restrict memory and remove this memory from the extractors bump allocators
+    let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
+        .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB per thread
+        .collect();
+    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers);
 
     let finished_extraction = AtomicBool::new(false);
 
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
@@ -115,7 +119,7 @@ where
             // document but we need to create a function that collects and compresses documents.
             let document_sender = extractor_sender.documents();
-            let document_extractor = DocumentsExtractor::new(&document_sender, embedders);
+            let document_extractor = DocumentsExtractor::new(document_sender, embedders);
             let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
             {
                 let span = tracing::trace_span!(target: "indexing::documents::extract", parent: &indexer_span, "documents");
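// The sizing rule tying the indexer change above to the sender's capacity,
// assuming bbqueue's framed mode: every frame is prefixed with a 1 to 9 byte
// length header, so the largest entry an extractor may write into one of the
// 100 MiB buffers is the capacity minus that worst-case reserve. This helper
// is illustrative, mirroring `capacity.checked_sub(9).unwrap()` in
// `extractor_writer_bbqueue`.
const FRAME_HEADER_MAX: usize = 9; // worst-case length prefix on 64-bit targets

fn usable_capacity(buffer_capacity: usize) -> usize {
    buffer_capacity.checked_sub(FRAME_HEADER_MAX).unwrap()
}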
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 039c56b9d..f2809b376 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -19,7 +19,7 @@ pub fn merge_and_send_rtree<'extractor, MSP>(
     datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
     rtxn: &RoTxn,
     index: &Index,
-    geo_sender: GeoSender<'_>,
+    geo_sender: GeoSender<'_, '_>,
     must_stop_processing: &MSP,
 ) -> Result<()>
 where
@@ -62,19 +62,19 @@ where
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
-pub fn merge_and_send_docids<'extractor, MSP>(
+pub fn merge_and_send_docids<'extractor, MSP, D>(
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: Database<Bytes, Bytes>,
     index: &Index,
-    docids_sender: impl DocidsSender + Sync,
+    docids_sender: WordDocidsSender<D>,
     must_stop_processing: &MSP,
 ) -> Result<()>
 where
     MSP: Fn() -> bool + Sync,
+    D: DatabaseType + Sync,
 {
     transpose_and_freeze_caches(&mut caches)?.into_par_iter().try_for_each(|frozen| {
         let rtxn = index.read_txn()?;
-        let mut buffer = Vec::new();
         if must_stop_processing() {
             return Err(InternalError::AbortedIndexation.into());
         }
@@ -82,8 +82,7 @@ where
             let current = database.get(&rtxn, key)?;
             match merge_cbo_bitmaps(current, del, add)? {
                 Operation::Write(bitmap) => {
-                    let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
-                    docids_sender.write(key, value).unwrap();
+                    docids_sender.write(key, &bitmap).unwrap();
                     Ok(())
                 }
                 Operation::Delete => {
@@ -101,21 +100,19 @@ pub fn merge_and_send_facet_docids<'extractor>(
     mut caches: Vec<BalancedCaches<'extractor>>,
     database: FacetDatabases,
     index: &Index,
-    docids_sender: impl DocidsSender + Sync,
+    docids_sender: FacetDocidsSender,
 ) -> Result<FacetFieldIdsDelta> {
     transpose_and_freeze_caches(&mut caches)?
         .into_par_iter()
         .map(|frozen| {
             let mut facet_field_ids_delta = FacetFieldIdsDelta::default();
             let rtxn = index.read_txn()?;
-            let mut buffer = Vec::new();
             merge_caches(frozen, |key, DelAddRoaringBitmap { del, add }| {
                 let current = database.get_cbo_roaring_bytes_value(&rtxn, key)?;
                 match merge_cbo_bitmaps(current, del, add)? {
                     Operation::Write(bitmap) => {
                         facet_field_ids_delta.register_from_key(key);
-                        let value = cbo_bitmap_serialize_into_vec(&bitmap, &mut buffer);
-                        docids_sender.write(key, value).unwrap();
+                        docids_sender.write(key, &bitmap).unwrap();
                         Ok(())
                     }
                     Operation::Delete => {