Finish most of the channels types

This commit is contained in:
  parent e1e76f39d0
  commit 6ac5b3b136
@@ -62,9 +62,14 @@ pub enum InternalError {
     #[error(transparent)]
     Store(#[from] MdbError),
     #[error("Cannot delete {key:?} from database {database_name}: {error}")]
-    StoreDeletion { database_name: &'static str, key: Vec<u8>, error: heed::Error },
+    StoreDeletion { database_name: &'static str, key: Box<[u8]>, error: heed::Error },
     #[error("Cannot insert {key:?} and value with length {value_length} into database {database_name}: {error}")]
-    StorePut { database_name: &'static str, key: Vec<u8>, value_length: usize, error: heed::Error },
+    StorePut {
+        database_name: &'static str,
+        key: Box<[u8]>,
+        value_length: usize,
+        error: heed::Error,
+    },
     #[error(transparent)]
     Utf8(#[from] str::Utf8Error),
     #[error("An indexation process was explicitly aborted")]
@@ -1,12 +1,11 @@
 use std::cell::RefCell;
 use std::marker::PhantomData;
-use std::mem;
+use std::num::NonZeroU16;
+use std::{mem, slice};

 use bbqueue::framed::{FrameGrantR, FrameProducer};
-use bytemuck::{NoUninit, CheckedBitPattern};
-use crossbeam::sync::{Parker, Unparker};
-use crossbeam_channel::{IntoIter, Receiver, SendError};
+use bytemuck::{checked, CheckedBitPattern, NoUninit};
+use crossbeam_channel::SendError;
 use heed::types::Bytes;
 use heed::BytesDecode;
 use memmap2::Mmap;
@@ -17,21 +16,32 @@ use super::ref_cell_ext::RefCellExt;
 use super::thread_local::{FullySend, ThreadLocal};
 use super::StdResult;
 use crate::heed_codec::facet::{FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec};
-use crate::index::db_name;
 use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
+use crate::index::{db_name, IndexEmbeddingConfig};
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
 use crate::{CboRoaringBitmapCodec, DocumentId, Index};

-/// Creates a tuple of producer/receivers to be used by
+/// Creates a tuple of senders/receiver to be used by
 /// the extractors and the writer loop.
+///
+/// The `channel_capacity` parameter defines the number of
+/// too-large-to-fit-in-BBQueue entries that can be sent through
+/// a crossbeam channel. This parameter must stay low to make
+/// sure we do not use too much memory.
+///
+/// Note that the channel is also used to wake up the receiver
+/// when new stuff is available in any BBQueue buffer, but we send
+/// a message in this queue only if it is empty to avoid filling
+/// the channel *and* the BBQueue.
 ///
 /// # Safety
 ///
-/// Panics if the number of provided bbqueue is not exactly equal
+/// Panics if the number of provided BBQueues is not exactly equal
 /// to the number of available threads in the rayon threadpool.
 pub fn extractor_writer_bbqueue(
     bbbuffers: &[bbqueue::BBBuffer],
+    channel_capacity: usize,
 ) -> (ExtractorBbqueueSender, WriterBbqueueReceiver) {
     assert_eq!(
         bbbuffers.len(),
@@ -40,88 +50,252 @@ pub fn extractor_writer_bbqueue(
     );

     let capacity = bbbuffers.first().unwrap().capacity();
-    let parker = Parker::new();
-    let extractors = ThreadLocal::with_capacity(bbbuffers.len());
-    let producers = rayon::broadcast(|bi| {
+    // Read the field description to understand this.
+    let capacity = capacity.checked_sub(9).unwrap();
+
+    let producers = ThreadLocal::with_capacity(bbbuffers.len());
+    let consumers = rayon::broadcast(|bi| {
         let bbqueue = &bbbuffers[bi.index()];
         let (producer, consumer) = bbqueue.try_split_framed().unwrap();
-        extractors.get_or(|| FullySend(RefCell::new(producer)));
+        producers.get_or(|| FullySend(RefCell::new(producer)));
         consumer
     });

-    (
-        ExtractorBbqueueSender {
-            inner: extractors,
-            capacity: capacity.checked_sub(9).unwrap(),
-            unparker: parker.unparker().clone(),
-        },
-        WriterBbqueueReceiver { inner: producers, parker },
-    )
+    let (sender, receiver) = crossbeam_channel::bounded(channel_capacity);
+    let sender = ExtractorBbqueueSender { sender, producers, capacity };
+    let receiver = WriterBbqueueReceiver { receiver, consumers };
+    (sender, receiver)
 }
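For orientation, this is how the new signature is driven later in this same commit (the buffer size and channel bound below are taken verbatim from the indexing hunk near the end):

    // One BBQueue per rayon thread; the assert above panics otherwise.
    let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
        .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB per thread
        .collect();
    // Only wake-up messages and oversized entries travel through the
    // bounded crossbeam channel, so 1000 slots stay cheap.
    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000);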

+pub struct ExtractorBbqueueSender<'a> {
+    /// This channel is used to wake up the receiver and to
+    /// send large entries that cannot fit in the BBQueue.
+    sender: crossbeam_channel::Sender<ReceiverAction>,
+    /// A memory buffer, one per thread, is used to serialize
+    /// the entries directly in this shared, lock-free space.
+    producers: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
+    /// The capacity of this frame producer; it will never be able to store more than that.
+    ///
+    /// Note that the FrameProducer requires up to 9 bytes to encode the length;
+    /// the capacity has been shrunk accordingly.
+    ///
+    /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
+    capacity: usize,
+}

 pub struct WriterBbqueueReceiver<'a> {
-    inner: Vec<bbqueue::framed::FrameConsumer<'a>>,
-    /// Used to park when no more work is required
-    parker: Parker,
+    /// Used to wake up when new entries are available either in
+    /// any BBQueue buffer or directly sent through this channel
+    /// (still written to disk).
+    receiver: crossbeam_channel::Receiver<ReceiverAction>,
+    /// The BBQueue frames to read when waking up.
+    consumers: Vec<bbqueue::framed::FrameConsumer<'a>>,
 }

+/// The action to perform on the receiver/writer side.
+pub enum ReceiverAction {
+    /// Wake up, you have frames to read from the BBQueue buffers.
+    WakeUp,
+    /// An entry that cannot fit in the BBQueue buffers has been
+    /// written to disk, memory-mapped, and must be written in the
+    /// database.
+    LargeEntry {
+        /// The database where the entry must be written.
+        database: Database,
+        /// The key of the entry that must be written in the database.
+        key: Box<[u8]>,
+        /// The large value that must be written.
+        ///
+        /// Note: We can probably use a `File` here and
+        /// use `Database::put_reserved` instead of memory-mapping.
+        value: Mmap,
+    },
+}

 impl<'a> WriterBbqueueReceiver<'a> {
+    pub fn recv(&mut self) -> Option<ReceiverAction> {
+        self.receiver.recv().ok()
+    }
+
     pub fn read(&mut self) -> Option<FrameWithHeader<'a>> {
-        loop {
-            for consumer in &mut self.inner {
-                // mark the frame as auto release
-                if let Some() = consumer.read()
+        for consumer in &mut self.consumers {
+            if let Some(frame) = consumer.read() {
+                return Some(FrameWithHeader::from(frame));
             }
-            break None;
         }
+        None
     }
 }
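The recv/read pair is meant to be driven together, exactly as the writer loop at the end of this commit does: handle an action, then drain every frame currently visible in the BBQueues. A minimal sketch:

    while let Some(action) = writer_receiver.recv() {
        // First handle the action itself (WakeUp or LargeEntry)...
        // ...then drain all readable frames before blocking again.
        while let Some(frame_with_header) = writer_receiver.read() {
            // decode frame_with_header.header() and apply the operation
        }
    }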

-struct FrameWithHeader<'a> {
+pub struct FrameWithHeader<'a> {
     header: EntryHeader,
     frame: FrameGrantR<'a>,
 }

-#[derive(Debug, Clone, Copy, CheckedBitPattern)]
-#[repr(u8)]
-enum EntryHeader {
-    /// Wether a put of the key/value pair or a delete of the given key.
-    DbOperation {
-        /// The database on which to perform the operation.
-        database: Database,
-        /// The key length in the buffer.
-        ///
-        /// If None it means that the buffer is dedicated
-        /// to the key and it is therefore a deletion operation.
-        key_length: Option<NonZeroU16>,
-    },
-    ArroyDeleteVector {
-        docid: DocumentId,
-    },
-    /// The embedding is the remaining space and represents a non-aligned [f32].
-    ArroySetVector {
-        docid: DocumentId,
-        embedder_id: u8,
-    },
-}
+impl FrameWithHeader<'_> {
+    pub fn header(&self) -> EntryHeader {
+        self.header
+    }
+
+    pub fn frame(&self) -> &FrameGrantR<'_> {
+        &self.frame
+    }
+}

-impl EntryHeader {
-    fn delete_key_size(key_length: u16) -> usize {
-        mem::size_of::<Self>() + key_length as usize
-    }
-
-    fn put_key_value_size(key_length: u16, value_length: usize) -> usize {
-        mem::size_of::<Self>() + key_length as usize + value_length
-    }
-
-    fn bytes_of(&self) -> &[u8] {
-        /// TODO do the variant matching ourselves
-        todo!()
-    }
-}
+impl<'a> From<FrameGrantR<'a>> for FrameWithHeader<'a> {
+    fn from(mut frame: FrameGrantR<'a>) -> Self {
+        frame.auto_release(true);
+        FrameWithHeader { header: EntryHeader::from_slice(&frame[..]), frame }
+    }
+}

+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
-#[repr(u32)]
+#[repr(C)]
+/// Whether a put of the key/value pair or a delete of the given key.
+pub struct DbOperation {
+    /// The database on which to perform the operation.
+    pub database: Database,
+    /// The key length in the buffer.
+    ///
+    /// If None it means that the buffer is dedicated
+    /// to the key and it is therefore a deletion operation.
+    pub key_length: Option<NonZeroU16>,
+}
+
+impl DbOperation {
+    pub fn key_value<'a>(&self, frame: &'a FrameGrantR<'_>) -> (&'a [u8], Option<&'a [u8]>) {
+        // TODO: replace the return type by an enum Write | Delete
+        let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
+        match self.key_length {
+            Some(key_length) => {
+                let (key, value) = frame[skip..].split_at(key_length.get() as usize);
+                (key, Some(value))
+            }
+            None => (&frame[skip..], None),
+        }
+    }
+}

+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(transparent)]
+pub struct ArroyDeleteVector {
+    pub docid: DocumentId,
+}
+
+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(C)]
+/// The embedding is the remaining space and represents a non-aligned [f32].
+pub struct ArroySetVector {
+    pub docid: DocumentId,
+    pub embedder_id: u8,
+    _padding: [u8; 3],
+}
+impl ArroySetVector {
+    pub fn read_embedding_into_vec<'v>(
+        &self,
+        frame: &FrameGrantR<'_>,
+        vec: &'v mut Vec<f32>,
+    ) -> &'v [f32] {
+        vec.clear();
+        let skip = EntryHeader::variant_size() + mem::size_of::<Self>();
+        let bytes = &frame[skip..];
+        bytes.chunks_exact(mem::size_of::<f32>()).for_each(|bytes| {
+            let f = bytes.try_into().map(f32::from_ne_bytes).unwrap();
+            vec.push(f);
+        });
+        &vec[..]
+    }
+}
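This per-chunk copy exists because the embedding bytes inside a frame carry no alignment guarantee, so a zero-copy `bytemuck::cast_slice` on the read side could fail; that is also why the indexing hunk below keeps an `aligned_embedding` scratch buffer. The same technique on a plain byte slice, as a standalone sketch:

    // Reassemble f32s from native-endian bytes at arbitrary alignment.
    fn read_unaligned_f32s(bytes: &[u8], out: &mut Vec<f32>) {
        out.clear();
        out.extend(
            bytes
                .chunks_exact(std::mem::size_of::<f32>())
                .map(|chunk| f32::from_ne_bytes(chunk.try_into().unwrap())),
        );
    }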

+#[derive(Debug, Clone, Copy)]
+#[repr(u8)]
+pub enum EntryHeader {
+    DbOperation(DbOperation),
+    ArroyDeleteVector(ArroyDeleteVector),
+    ArroySetVector(ArroySetVector),
+}
+impl EntryHeader {
+    const fn variant_size() -> usize {
+        mem::size_of::<u8>()
+    }
+
+    const fn variant_id(&self) -> u8 {
+        match self {
+            EntryHeader::DbOperation(_) => 0,
+            EntryHeader::ArroyDeleteVector(_) => 1,
+            EntryHeader::ArroySetVector(_) => 2,
+        }
+    }
+
+    const fn total_key_value_size(key_length: NonZeroU16, value_length: usize) -> usize {
+        Self::variant_size()
+            + mem::size_of::<DbOperation>()
+            + key_length.get() as usize
+            + value_length
+    }
+
+    const fn total_key_size(key_length: NonZeroU16) -> usize {
+        Self::total_key_value_size(key_length, 0)
+    }
+
+    const fn total_delete_vector_size() -> usize {
+        Self::variant_size() + mem::size_of::<ArroyDeleteVector>()
+    }
+
+    /// The `embedding_length` corresponds to the number of `f32` in the embedding.
+    fn total_set_vector_size(embedding_length: usize) -> usize {
+        Self::variant_size()
+            + mem::size_of::<ArroySetVector>()
+            + embedding_length * mem::size_of::<f32>()
+    }
+
+    fn header_size(&self) -> usize {
+        let payload_size = match self {
+            EntryHeader::DbOperation(op) => mem::size_of_val(op),
+            EntryHeader::ArroyDeleteVector(adv) => mem::size_of_val(adv),
+            EntryHeader::ArroySetVector(asv) => mem::size_of_val(asv),
+        };
+        Self::variant_size() + payload_size
+    }
+
+    fn from_slice(slice: &[u8]) -> EntryHeader {
+        let (variant_id, remaining) = slice.split_first().unwrap();
+        match variant_id {
+            0 => {
+                let header_bytes = &remaining[..mem::size_of::<DbOperation>()];
+                let header = checked::pod_read_unaligned(header_bytes);
+                EntryHeader::DbOperation(header)
+            }
+            1 => {
+                let header_bytes = &remaining[..mem::size_of::<ArroyDeleteVector>()];
+                let header = checked::pod_read_unaligned(header_bytes);
+                EntryHeader::ArroyDeleteVector(header)
+            }
+            2 => {
+                let header_bytes = &remaining[..mem::size_of::<ArroySetVector>()];
+                let header = checked::pod_read_unaligned(header_bytes);
+                EntryHeader::ArroySetVector(header)
+            }
+            id => panic!("invalid variant id: {id}"),
+        }
+    }
+
+    fn serialize_into(&self, header_bytes: &mut [u8]) {
+        let (first, remaining) = header_bytes.split_first_mut().unwrap();
+        let payload_bytes = match self {
+            EntryHeader::DbOperation(op) => bytemuck::bytes_of(op),
+            EntryHeader::ArroyDeleteVector(adv) => bytemuck::bytes_of(adv),
+            EntryHeader::ArroySetVector(asv) => bytemuck::bytes_of(asv),
+        };
+        *first = self.variant_id();
+        remaining.copy_from_slice(payload_bytes);
+    }
+}
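To make the byte layout concrete, here is a hypothetical round-trip through these helpers (a sketch that would live inside this module, since the helpers are private; the key and value bytes are made up):

    // Frame layout: [variant: u8][payload pod][key bytes][value bytes].
    let key_length = NonZeroU16::new(3).unwrap();
    let operation = DbOperation { database: Database::Main, key_length: Some(key_length) };
    let header = EntryHeader::DbOperation(operation);

    let total = EntryHeader::total_key_value_size(key_length, 2);
    let mut frame = vec![0u8; total];
    let (header_bytes, remaining) = frame.split_at_mut(header.header_size());
    header.serialize_into(header_bytes);
    let (key_bytes, value_bytes) = remaining.split_at_mut(3);
    key_bytes.copy_from_slice(b"abc");
    value_bytes.copy_from_slice(b"xy");

    // Decoding reads the variant byte back and pod-reads the payload.
    assert!(matches!(EntryHeader::from_slice(&frame), EntryHeader::DbOperation(_)));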

+#[derive(Debug, Clone, Copy, NoUninit, CheckedBitPattern)]
+#[repr(u16)]
 pub enum Database {
     Main,
     Documents,
@@ -197,20 +371,6 @@ impl From<FacetKind> for Database {
     }
 }

-pub struct ExtractorBbqueueSender<'a> {
-    inner: ThreadLocal<FullySend<RefCell<FrameProducer<'a>>>>,
-    /// The capacity of this frame producer, will never be able to store more than that.
-    ///
-    /// Note that the FrameProducer requires up to 9 bytes to encode the length,
-    /// the capacity has been shrinked accordingly.
-    ///
-    /// <https://docs.rs/bbqueue/latest/bbqueue/framed/index.html#frame-header>
-    capacity: usize,
-    /// Used to wake up the receiver thread,
-    /// Used everytime we write something in the producer.
-    unparker: Unparker,
-}
-
 impl<'b> ExtractorBbqueueSender<'b> {
     pub fn docids<'a, D: DatabaseType>(&'a self) -> WordDocidsSender<'a, 'b, D> {
         WordDocidsSender { sender: self, _marker: PhantomData }
@@ -236,80 +396,171 @@ impl<'b> ExtractorBbqueueSender<'b> {
         GeoSender(&self)
     }

-    fn send_delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
-        match self
-            .sender
-            .send(WriterOperation::ArroyOperation(ArroyOperation::DeleteVectors { docid }))
-        {
-            Ok(()) => Ok(()),
-            Err(SendError(_)) => Err(SendError(())),
-        }
-    }
+    fn delete_vector(&self, docid: DocumentId) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header = EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid });
+        let total_length = EntryHeader::total_delete_vector_size();
+        if total_length > capacity {
+            unreachable!("entry larger than the BBQueue capacity");
+        }
+
+        // Spin loop until we have a frame of the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            }
+        };
+
+        payload_header.serialize_into(&mut grant);
+
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
+    }
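The grant/commit sequence above reappears in every sender below. Its shared shape, pulled out as a hypothetical helper (the name and the `FrameGrantW` return type are assumptions based on the bbqueue fork whose `FrameProducer`/`FrameGrantR` appear above):

    // Spin until the producer can hand out `len` contiguous bytes; the
    // writer side frees frames as it consumes them, unblocking us.
    fn reserve_frame<'a>(
        producer: &mut FrameProducer<'a>,
        len: usize,
    ) -> bbqueue::framed::FrameGrantW<'a> {
        loop {
            match producer.grant(len) {
                Ok(grant) => break grant,
                Err(bbqueue::Error::InsufficientSize) => continue,
                Err(e) => unreachable!("{e:?}"),
            }
        }
    }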

+    fn set_vector(
+        &self,
+        docid: DocumentId,
+        embedder_id: u8,
+        embedding: &[f32],
+    ) -> crate::Result<()> {
+        let capacity = self.capacity;
+        let refcell = self.producers.get().unwrap();
+        let mut producer = refcell.0.borrow_mut_or_yield();
+
+        let payload_header =
+            EntryHeader::ArroySetVector(ArroySetVector { docid, embedder_id, _padding: [0; 3] });
+        let total_length = EntryHeader::total_set_vector_size(embedding.len());
+        if total_length > capacity {
+            unreachable!("entry larger than the BBQueue capacity");
+        }
+
+        // Spin loop until we have a frame of the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
+                Err(bbqueue::Error::InsufficientSize) => continue,
+                Err(e) => unreachable!("{e:?}"),
+            }
+        };
+
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        remaining.copy_from_slice(bytemuck::cast_slice(embedding));
+
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
+    }

     fn write_key_value(&self, database: Database, key: &[u8], value: &[u8]) -> crate::Result<()> {
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
+        self.write_key_value_with(database, key_length, value.len(), |buffer| {
+            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
+            key_buffer.copy_from_slice(key);
+            value_buffer.copy_from_slice(value);
+            Ok(())
+        })
+    }
+
+    fn write_key_value_with<F>(
+        &self,
+        database: Database,
+        key_length: NonZeroU16,
+        value_length: usize,
+        key_value_writer: F,
+    ) -> crate::Result<()>
+    where
+        F: FnOnce(&mut [u8]) -> crate::Result<()>,
+    {
         let capacity = self.capacity;
-        let refcell = self.inner.get().unwrap();
+        let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

-        let key_length = key.len().try_into().unwrap();
-        let value_length = value.len();
-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
+        let operation = DbOperation { database, key_length: Some(key_length) };
+        let payload_header = EntryHeader::DbOperation(operation);
+        let total_length = EntryHeader::total_key_value_size(key_length, value_length);
         if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
+            unreachable!("entry larger than the BBQueue capacity");
         }

-        let payload_header =
-            EntryHeader::DbOperation { database, key_length: NonZeroU16::new(key_length) };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
+        // Spin loop until we have a frame of the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
                 Err(bbqueue::Error::InsufficientSize) => continue,
                 Err(e) => unreachable!("{e:?}"),
-            };
+            }
+        };

-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
-            key_out.copy_from_slice(key);
-            value_out.copy_from_slice(value);
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        key_value_writer(remaining)?;

-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
     }

     fn delete_entry(&self, database: Database, key: &[u8]) -> crate::Result<()> {
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
+        self.delete_entry_with(database, key_length, |buffer| {
+            buffer.copy_from_slice(key);
+            Ok(())
+        })
+    }
+
+    fn delete_entry_with<F>(
+        &self,
+        database: Database,
+        key_length: NonZeroU16,
+        key_writer: F,
+    ) -> crate::Result<()>
+    where
+        F: FnOnce(&mut [u8]) -> crate::Result<()>,
+    {
         let capacity = self.capacity;
-        let refcell = self.inner.get().unwrap();
+        let refcell = self.producers.get().unwrap();
         let mut producer = refcell.0.borrow_mut_or_yield();

-        let key_length = key.len().try_into().unwrap();
-        let total_length = EntryHeader::delete_key_size(key_length);
+        // For deletion we do not specify the key length;
+        // it's in the remaining bytes.
+        let operation = DbOperation { database, key_length: None };
+        let payload_header = EntryHeader::DbOperation(operation);
+        let total_length = EntryHeader::total_key_size(key_length);
         if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
+            unreachable!("entry larger than the BBQueue capacity");
         }

-        let payload_header = EntryHeader::DbOperation { database, key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
+        // Spin loop until we have a frame of the size we requested.
+        let mut grant = loop {
+            match producer.grant(total_length) {
+                Ok(grant) => break grant,
                 Err(bbqueue::Error::InsufficientSize) => continue,
                 Err(e) => unreachable!("{e:?}"),
-            };
+            }
+        };

-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
+        let header_size = payload_header.header_size();
+        let (header_bytes, remaining) = grant.split_at_mut(header_size);
+        payload_header.serialize_into(header_bytes);
+        key_writer(remaining)?;

-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        // We could commit only the used memory.
+        grant.commit(total_length);
+
+        Ok(())
     }
 }
@@ -355,72 +606,18 @@ pub struct WordDocidsSender<'a, 'b, D> {

 impl<D: DatabaseType> WordDocidsSender<'_, '_, D> {
     pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
-        let key_length = key.len().try_into().unwrap();
+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
         let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
-
-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation {
-            database: D::DATABASE,
-            key_length: NonZeroU16::new(key_length),
-        };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
-            key_out.copy_from_slice(key);
-            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        self.sender.write_key_value_with(D::DATABASE, key_length, value_length, |buffer| {
+            let (key_buffer, value_buffer) = buffer.split_at_mut(key.len());
+            key_buffer.copy_from_slice(key);
+            CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_buffer)?;
+            Ok(())
+        })
     }

     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
-        let key_length = key.len().try_into().unwrap();
-        let total_length = EntryHeader::delete_key_size(key_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation { database: D::DATABASE, key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        self.sender.delete_entry(D::DATABASE, key)
     }
 }
@@ -430,13 +627,10 @@ pub struct FacetDocidsSender<'a, 'b> {

 impl FacetDocidsSender<'_, '_> {
     pub fn write(&self, key: &[u8], bitmap: &RoaringBitmap) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
         let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let key_length = key.len().try_into().unwrap();
+        let database = Database::from(facet_kind);

+        let key_length = NonZeroU16::new(key.len().try_into().unwrap()).unwrap();
         let value_length = CboRoaringBitmapCodec::serialized_size(bitmap);
         let value_length = match facet_kind {
             // We must take the facet group size into account
@@ -445,26 +639,8 @@ impl FacetDocidsSender<'_, '_> {
             FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_length,
         };

-        let total_length = EntryHeader::put_key_value_size(key_length, value_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header = EntryHeader::DbOperation {
-            database: Database::from(facet_kind),
-            key_length: NonZeroU16::new(key_length),
-        };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            let (key_out, value_out) = remaining.split_at_mut(key.len());
+        self.sender.write_key_value_with(database, key_length, value_length, |buffer| {
+            let (key_out, value_out) = buffer.split_at_mut(key.len());
             key_out.copy_from_slice(key);

             let value_out = match facet_kind {
@@ -477,47 +653,17 @@ impl FacetDocidsSender<'_, '_> {
                 }
                 FacetKind::Null | FacetKind::Empty | FacetKind::Exists => value_out,
             };

             CboRoaringBitmapCodec::serialize_into_writer(bitmap, value_out)?;
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+            Ok(())
+        })
     }

     pub fn delete(&self, key: &[u8]) -> crate::Result<()> {
-        let capacity = self.sender.capacity;
-        let refcell = self.sender.inner.get().unwrap();
-        let mut producer = refcell.0.borrow_mut_or_yield();
-
         let (facet_kind, key) = FacetKind::extract_from_key(key);
-        let key_length = key.len().try_into().unwrap();
-
-        let total_length = EntryHeader::delete_key_size(key_length);
-        if total_length > capacity {
-            unreachable!("entry larger that the bbqueue capacity");
-        }
-
-        let payload_header =
-            EntryHeader::DbOperation { database: Database::from(facet_kind), key_length: None };
-
-        loop {
-            let mut grant = match producer.grant(total_length) {
-                Ok(grant) => grant,
-                Err(bbqueue::Error::InsufficientSize) => continue,
-                Err(e) => unreachable!("{e:?}"),
-            };
-
-            let (header, remaining) = grant.split_at_mut(mem::size_of::<EntryHeader>());
-            header.copy_from_slice(payload_header.bytes_of());
-            remaining.copy_from_slice(key);
-
-            // We could commit only the used memory.
-            grant.commit(total_length);
-
-            break Ok(());
-        }
+        let database = Database::from(facet_kind);
+        self.sender.delete_entry(database, key)
     }
 }
@@ -565,7 +711,7 @@ impl DocumentsSender<'_, '_> {

     pub fn delete(&self, docid: DocumentId, external_id: String) -> crate::Result<()> {
         self.0.delete_entry(Database::Documents, &docid.to_be_bytes())?;
-        self.0.send_delete_vector(docid)?;
+        self.0.delete_vector(docid)?;
         self.0.delete_entry(Database::ExternalDocumentsIds, external_id.as_bytes())
     }
 }
@@ -579,13 +725,10 @@ impl EmbeddingSender<'_, '_> {
         embedder_id: u8,
         embeddings: Vec<Embedding>,
     ) -> crate::Result<()> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::SetVectors {
-                docid,
-                embedder_id,
-                embeddings,
-            }))
-            .map_err(|_| SendError(()))
+        for embedding in embeddings {
+            self.set_vector(docid, embedder_id, embedding)?;
+        }
+        Ok(())
     }

     pub fn set_vector(
@@ -593,21 +736,8 @@ impl EmbeddingSender<'_, '_> {
         docid: DocumentId,
         embedder_id: u8,
         embedding: Embedding,
-    ) -> StdResult<(), SendError<()>> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::SetVector {
-                docid,
-                embedder_id,
-                embedding,
-            }))
-            .map_err(|_| SendError(()))
-    }
-
-    /// Marks all embedders as "to be built"
-    pub fn finish(self, configs: Vec<IndexEmbeddingConfig>) -> StdResult<(), SendError<()>> {
-        self.0
-            .send(WriterOperation::ArroyOperation(ArroyOperation::Finish { configs }))
-            .map_err(|_| SendError(()))
+    ) -> crate::Result<()> {
+        self.0.set_vector(docid, embedder_id, &embedding[..])
     }
 }
@@ -76,7 +76,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
             context.data,
             &self.possible_embedding_mistakes,
             self.threads,
-            self.sender,
+            &self.sender,
             &context.doc_alloc,
         ))
     }
@@ -40,7 +40,7 @@ use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
 use crate::update::{FacetsUpdateBulk, GrenadParameters};
-use crate::vector::{ArroyWrapper, EmbeddingConfigs, Embeddings};
+use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{
     Error, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort,
     ThreadPoolNoAbortBuilder, UserError,
@@ -80,7 +80,7 @@ where
     let bbbuffers: Vec<_> = (0..rayon::current_num_threads())
         .map(|_| bbqueue::BBBuffer::new(100 * 1024 * 1024)) // 100 MiB by thread
         .collect();
-    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers);
+    let (extractor_sender, writer_receiver) = extractor_writer_bbqueue(&bbbuffers, 1000);
     let finished_extraction = AtomicBool::new(false);

     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
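For scale (illustrative arithmetic, not stated in the commit): a 16-thread rayon pool pre-allocates 16 × 100 MiB ≈ 1.6 GiB of BBQueue memory here, while the bounded channel's 1000 slots only ever hold wake-up markers and memory-mapped oversized entries, which is why that bound can stay small.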
@@ -386,7 +386,11 @@ where
         })
         .collect();

+    // Used by the ArroySetVector to copy the embedding into an
+    // aligned memory area, required by arroy to accept a new vector.
+    let mut aligned_embedding = Vec::new();
     let mut arroy_writers = arroy_writers?;

     {
         let span = tracing::trace_span!(target: "indexing::write_db", "all");
         let _entered = span.enter();
@@ -394,81 +398,85 @@ where
             let span = tracing::trace_span!(target: "indexing::write_db", "post_merge");
             let mut _entered_post_merge = None;

-            for operation in writer_receiver {
+            while let Some(action) = writer_receiver.recv() {
                 if _entered_post_merge.is_none()
                     && finished_extraction.load(std::sync::atomic::Ordering::Relaxed)
                 {
                     _entered_post_merge = Some(span.enter());
                 }
-                match operation {
-                    WriterOperation::DbOperation(db_operation) => {
-                        let database = db_operation.database(index);
-                        let database_name = db_operation.database_name();
-                        match db_operation.entry() {
-                            EntryOperation::Delete(e) => match database.delete(wtxn, e.entry()) {
-                                Ok(false) => unreachable!("We tried to delete an unknown key"),
-                                Ok(_) => (),
-                                Err(error) => {
-                                    return Err(Error::InternalError(
-                                        InternalError::StoreDeletion {
-                                            database_name,
-                                            key: e.entry().to_owned(),
-                                            error,
-                                        },
-                                    ));
-                                }
-                            },
-                            EntryOperation::Write(e) => {
-                                if let Err(error) = database.put(wtxn, e.key(), e.value()) {
-                                    return Err(Error::InternalError(InternalError::StorePut {
-                                        database_name,
-                                        key: e.key().to_owned(),
-                                        value_length: e.value().len(),
-                                        error,
-                                    }));
-                                }
-                            }
-                        }
-                    }
-                    WriterOperation::ArroyOperation(arroy_operation) => match arroy_operation {
-                        ArroyOperation::DeleteVectors { docid } => {
-                            for (
-                                _embedder_index,
-                                (_embedder_name, _embedder, writer, dimensions),
-                            ) in &mut arroy_writers
-                            {
-                                let dimensions = *dimensions;
-                                writer.del_items(wtxn, dimensions, docid)?;
-                            }
-                        }
-                        ArroyOperation::SetVectors {
-                            docid,
-                            embedder_id,
-                            embeddings: raw_embeddings,
-                        } => {
-                            let (_, _, writer, dimensions) = arroy_writers
-                                .get(&embedder_id)
-                                .expect("requested a missing embedder");
-
-                            let mut embeddings = Embeddings::new(*dimensions);
-                            for embedding in raw_embeddings {
-                                embeddings.append(embedding).unwrap();
-                            }
-
-                            writer.del_items(wtxn, *dimensions, docid)?;
-                            writer.add_items(wtxn, docid, &embeddings)?;
-                        }
-                        ArroyOperation::SetVector { docid, embedder_id, embedding } => {
-                            let (_, _, writer, dimensions) =
-                                arroy_writers.get(&embedder_id).expect("requested a missing embedder");
-                            writer.del_items(wtxn, *dimensions, docid)?;
-                            writer.add_item(wtxn, docid, &embedding)?;
-                        }
-                        _otherwise => unreachable!(),
-                    },
+
+                match action {
+                    ReceiverAction::WakeUp => (),
+                    ReceiverAction::LargeEntry { database, key, value } => {
+                        let database_name = database.database_name();
+                        let database = database.database(index);
+                        if let Err(error) = database.put(wtxn, &key, &value) {
+                            return Err(Error::InternalError(InternalError::StorePut {
+                                database_name,
+                                key,
+                                value_length: value.len(),
+                                error,
+                            }));
+                        }
+                    }
+                }
+
+                while let Some(frame_with_header) = writer_receiver.read() {
+                    match frame_with_header.header() {
+                        EntryHeader::DbOperation(operation) => {
+                            let database_name = operation.database.database_name();
+                            let database = operation.database.database(index);
+                            let frame = frame_with_header.frame();
+                            match operation.key_value(frame) {
+                                (key, Some(value)) => {
+                                    if let Err(error) = database.put(wtxn, key, value) {
+                                        return Err(Error::InternalError(InternalError::StorePut {
+                                            database_name,
+                                            key: key.into(),
+                                            value_length: value.len(),
+                                            error,
+                                        }));
+                                    }
+                                }
+                                (key, None) => match database.delete(wtxn, key) {
+                                    Ok(false) => {
+                                        unreachable!("We tried to delete an unknown key: {key:?}")
+                                    }
+                                    Ok(_) => (),
+                                    Err(error) => {
+                                        return Err(Error::InternalError(
+                                            InternalError::StoreDeletion {
+                                                database_name,
+                                                key: key.into(),
+                                                error,
+                                            },
+                                        ));
+                                    }
+                                },
+                            }
+                        }
+                        EntryHeader::ArroyDeleteVector(ArroyDeleteVector { docid }) => {
+                            for (_index, (_name, _embedder, writer, dimensions)) in &mut arroy_writers {
+                                let dimensions = *dimensions;
+                                writer.del_items(wtxn, dimensions, docid)?;
+                            }
+                        }
+                        EntryHeader::ArroySetVector(asv) => {
+                            let ArroySetVector { docid, embedder_id, .. } = asv;
+                            let frame = frame_with_header.frame();
+                            let embedding = asv.read_embedding_into_vec(frame, &mut aligned_embedding);
+                            let (_, _, writer, dimensions) =
+                                arroy_writers.get(&embedder_id).expect("requested a missing embedder");
+                            writer.del_items(wtxn, *dimensions, docid)?;
+                            writer.add_item(wtxn, docid, embedding)?;
+                        }
+                    }
                 }
             }
+
+            todo!("read the BBQueue once the channel is closed");

             'vectors: {
                 let span =
                     tracing::trace_span!(target: "indexing::vectors", parent: &indexer_span, "build");