From 874c1ac538eea2406db3005eecf4a49249282b03 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Thu, 29 Aug 2024 15:07:59 +0200 Subject: [PATCH] First channels types --- Cargo.lock | 2 +- milli/src/update/new/channel.rs | 93 +++++++++++++++++++++++++++++++++ milli/src/update/new/mod.rs | 9 +++- 3 files changed, 101 insertions(+), 3 deletions(-) create mode 100644 milli/src/update/new/channel.rs diff --git a/Cargo.lock b/Cargo.lock index c3e9532e2..a21cbc007 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3858,7 +3858,7 @@ checksum = "a2e27bcfe835a379d32352112f6b8dbae2d99d16a5fff42abe6e5ba5386c1e5a" [[package]] name = "obkv" version = "0.3.0" -source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#d248eb7edd3453ff758afc2883f6ae25684eb69e" +source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#5289a6658cd471f4212c1edc1a40b2a3c3d11fe0" [[package]] name = "once_cell" diff --git a/milli/src/update/new/channel.rs b/milli/src/update/new/channel.rs new file mode 100644 index 000000000..0dd2d9935 --- /dev/null +++ b/milli/src/update/new/channel.rs @@ -0,0 +1,93 @@ +use crossbeam_channel::{Receiver, RecvError, SendError, Sender}; +use heed::types::Bytes; + +use super::indexer::KvReaderFieldId; +use super::StdResult; +use crate::{DocumentId, Index}; + +/// The capacity of the channel is currently in number of messages. +pub fn merge_writer_channel(cap: usize) -> WriterChannels { + let (sender, receiver) = crossbeam_channel::bounded(cap); + + WriterChannels { + writer_receiver: WriterReceiver(receiver), + merger_sender: MergerSender(sender.clone()), + document_sender: DocumentSender(sender), + } +} + +pub struct WriterChannels { + pub writer_receiver: WriterReceiver, + pub merger_sender: MergerSender, + pub document_sender: DocumentSender, +} + +pub struct KeyValueEntry { + pub key_length: u16, + pub data: Box<[u8]>, +} + +impl KeyValueEntry { + pub fn entry(&self) -> (&[u8], &[u8]) { + self.data.split_at(self.key_length as usize) + } +} + +pub struct DocumentEntry { + docid: DocumentId, + content: Box<[u8]>, +} + +impl DocumentEntry { + pub fn new_uncompressed(docid: DocumentId, content: Box) -> Self { + DocumentEntry { docid, content: content.into() } + } + + pub fn new_compressed(docid: DocumentId, content: Box<[u8]>) -> Self { + DocumentEntry { docid, content } + } + + pub fn entry(&self) -> ([u8; 4], &[u8]) { + let docid = self.docid.to_be_bytes(); + (docid, &self.content) + } +} + +pub enum WriterOperation { + WordDocIds(KeyValueEntry), + Document(DocumentEntry), +} + +impl WriterOperation { + pub fn database(&self, index: &Index) -> heed::Database { + match self { + WriterOperation::WordDocIds(_) => index.word_docids.remap_types(), + WriterOperation::Document(_) => index.documents.remap_types(), + } + } +} + +pub struct WriterReceiver(Receiver); + +impl WriterReceiver { + pub fn recv(&self) -> StdResult { + self.0.recv() + } +} + +pub struct MergerSender(Sender); + +#[derive(Clone)] +pub struct DocumentSender(Sender); + +impl DocumentSender { + pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError> { + match self.0.send(WriterOperation::Document(document)) { + Ok(()) => Ok(()), + Err(SendError(wop)) => match wop { + WriterOperation::Document(entry) => Err(SendError(entry)), + _ => unreachable!(), + }, + } + } +} diff --git a/milli/src/update/new/mod.rs b/milli/src/update/new/mod.rs index 20266267d..726153c53 100644 --- a/milli/src/update/new/mod.rs +++ b/milli/src/update/new/mod.rs @@ -1,9 +1,12 @@ mod document_change; // mod extract; +mod channel; mod items_pool; mod global_fields_ids_map; +pub type StdResult = std::result::Result; + mod indexer { use std::borrow::Cow; use std::collections::{BTreeMap, HashMap}; @@ -352,11 +355,13 @@ mod indexer { pub struct UpdateByFunctionIndexer; + // fn + /// Reads the previous version of a document from the database, the new versions /// in the grenad update files and merges them to generate a new boxed obkv. /// /// This function is only meant to be used when doing an update and not a replacement. - pub fn merge_document_for_updates( + fn merge_document_for_updates( rtxn: &RoTxn, index: &Index, fields_ids_map: &FieldsIdsMap, @@ -431,7 +436,7 @@ mod indexer { /// Returns only the most recent version of a document based on the updates from the payloads. /// /// This function is only meant to be used when doing a replacement and not an update. - pub fn merge_document_for_replacements( + fn merge_document_for_replacements( rtxn: &RoTxn, index: &Index, fields_ids_map: &FieldsIdsMap,