First channels types

This commit is contained in:
Clément Renault 2024-08-29 15:07:59 +02:00
parent e6ffa4d454
commit 874c1ac538
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
3 changed files with 101 additions and 3 deletions

2
Cargo.lock generated
View File

@ -3858,7 +3858,7 @@ checksum = "a2e27bcfe835a379d32352112f6b8dbae2d99d16a5fff42abe6e5ba5386c1e5a"
[[package]]
name = "obkv"
version = "0.3.0"
source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#d248eb7edd3453ff758afc2883f6ae25684eb69e"
source = "git+https://github.com/kerollmops/obkv?branch=unsized-kvreader#5289a6658cd471f4212c1edc1a40b2a3c3d11fe0"
[[package]]
name = "once_cell"

View File

@ -0,0 +1,93 @@
use crossbeam_channel::{Receiver, RecvError, SendError, Sender};
use heed::types::Bytes;
use super::indexer::KvReaderFieldId;
use super::StdResult;
use crate::{DocumentId, Index};
/// The capacity of the channel is currently in number of messages.
pub fn merge_writer_channel(cap: usize) -> WriterChannels {
let (sender, receiver) = crossbeam_channel::bounded(cap);
WriterChannels {
writer_receiver: WriterReceiver(receiver),
merger_sender: MergerSender(sender.clone()),
document_sender: DocumentSender(sender),
}
}
pub struct WriterChannels {
pub writer_receiver: WriterReceiver,
pub merger_sender: MergerSender,
pub document_sender: DocumentSender,
}
pub struct KeyValueEntry {
pub key_length: u16,
pub data: Box<[u8]>,
}
impl KeyValueEntry {
pub fn entry(&self) -> (&[u8], &[u8]) {
self.data.split_at(self.key_length as usize)
}
}
pub struct DocumentEntry {
docid: DocumentId,
content: Box<[u8]>,
}
impl DocumentEntry {
pub fn new_uncompressed(docid: DocumentId, content: Box<KvReaderFieldId>) -> Self {
DocumentEntry { docid, content: content.into() }
}
pub fn new_compressed(docid: DocumentId, content: Box<[u8]>) -> Self {
DocumentEntry { docid, content }
}
pub fn entry(&self) -> ([u8; 4], &[u8]) {
let docid = self.docid.to_be_bytes();
(docid, &self.content)
}
}
pub enum WriterOperation {
WordDocIds(KeyValueEntry),
Document(DocumentEntry),
}
impl WriterOperation {
pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
match self {
WriterOperation::WordDocIds(_) => index.word_docids.remap_types(),
WriterOperation::Document(_) => index.documents.remap_types(),
}
}
}
pub struct WriterReceiver(Receiver<WriterOperation>);
impl WriterReceiver {
pub fn recv(&self) -> StdResult<WriterOperation, RecvError> {
self.0.recv()
}
}
pub struct MergerSender(Sender<WriterOperation>);
#[derive(Clone)]
pub struct DocumentSender(Sender<WriterOperation>);
impl DocumentSender {
pub fn send(&self, document: DocumentEntry) -> StdResult<(), SendError<DocumentEntry>> {
match self.0.send(WriterOperation::Document(document)) {
Ok(()) => Ok(()),
Err(SendError(wop)) => match wop {
WriterOperation::Document(entry) => Err(SendError(entry)),
_ => unreachable!(),
},
}
}
}

View File

@ -1,9 +1,12 @@
mod document_change;
// mod extract;
mod channel;
mod items_pool;
mod global_fields_ids_map;
pub type StdResult<T, E> = std::result::Result<T, E>;
mod indexer {
use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
@ -352,11 +355,13 @@ mod indexer {
pub struct UpdateByFunctionIndexer;
// fn
/// Reads the previous version of a document from the database, the new versions
/// in the grenad update files and merges them to generate a new boxed obkv.
///
/// This function is only meant to be used when doing an update and not a replacement.
pub fn merge_document_for_updates(
fn merge_document_for_updates(
rtxn: &RoTxn,
index: &Index,
fields_ids_map: &FieldsIdsMap,
@ -431,7 +436,7 @@ mod indexer {
/// Returns only the most recent version of a document based on the updates from the payloads.
///
/// This function is only meant to be used when doing a replacement and not an update.
pub fn merge_document_for_replacements(
fn merge_document_for_replacements(
rtxn: &RoTxn,
index: &Index,
fields_ids_map: &FieldsIdsMap,