Renaming the indexers

Clément Renault 2024-09-02 14:42:27 +02:00
parent 6526ce1208
commit 72e7b7846e
6 changed files with 35 additions and 39 deletions
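
In short, the new indexer types lose their Indexer suffix: DocumentDeletionIndexer becomes DocumentDeletion, DocumentOperationIndexer becomes DocumentOperation, PartialDumpIndexer becomes PartialDump, and UpdateByFunctionIndexer becomes UpdateByFunction. To free the DocumentOperation name for the indexer, the per-document operation enum is renamed to the private InnerDocOp, the flat re-exports are removed from the update::new module, and the indexer module is made public instead.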

View File

@@ -8,11 +8,11 @@ use crate::documents::PrimaryKey;
 use crate::update::new::{Deletion, DocumentChange, ItemsPool};
 use crate::{FieldsIdsMap, Index, InternalError, Result};
 
-pub struct DocumentDeletionIndexer {
+pub struct DocumentDeletion {
     pub to_delete: RoaringBitmap,
 }
 
-impl DocumentDeletionIndexer {
+impl DocumentDeletion {
     pub fn new() -> Self {
         Self { to_delete: Default::default() }
     }
@@ -22,7 +22,7 @@ impl DocumentDeletionIndexer {
     }
 }
 
-impl<'p> Indexer<'p> for DocumentDeletionIndexer {
+impl<'p> Indexer<'p> for DocumentDeletion {
     type Parameter = (&'p Index, &'p FieldsIdsMap, &'p PrimaryKey<'p>);
 
     fn document_changes(

View File

@@ -19,9 +19,9 @@ use crate::update::new::{Deletion, Insertion, KvReaderFieldId, KvWriterFieldId,
 use crate::update::{AvailableIds, IndexDocumentsMethod};
 use crate::{DocumentId, Error, FieldsIdsMap, Index, Result, UserError};
 
-pub struct DocumentOperationIndexer {
-    pub(crate) operations: Vec<Payload>,
-    pub(crate) index_documents_method: IndexDocumentsMethod,
+pub struct DocumentOperation {
+    operations: Vec<Payload>,
+    index_documents_method: IndexDocumentsMethod,
 }
 
 pub enum Payload {
@@ -34,7 +34,7 @@ pub struct PayloadStats {
     pub bytes: u64,
 }
 
-pub enum DocumentOperation {
+enum InnerDocOp {
     Addition(DocumentOffset),
     Deletion,
 }
@@ -48,7 +48,7 @@ pub struct DocumentOffset {
     pub offset: u32,
 }
 
-impl DocumentOperationIndexer {
+impl DocumentOperation {
     pub fn new(method: IndexDocumentsMethod) -> Self {
         Self { operations: Default::default(), index_documents_method: method }
     }
@@ -70,7 +70,7 @@ impl DocumentOperationIndexer {
     }
 }
 
-impl<'p> Indexer<'p> for DocumentOperationIndexer {
+impl<'p> Indexer<'p> for DocumentOperation {
     type Parameter = (&'p Index, &'p RoTxn<'static>, &'p mut FieldsIdsMap, &'p PrimaryKey<'p>);
 
     fn document_changes(
@@ -120,7 +120,7 @@ impl<'p> Indexer<'p> for DocumentOperationIndexer {
                         let content = content.clone();
                         let document_offset = DocumentOffset { content, offset };
-                        let document_operation = DocumentOperation::Addition(document_offset);
+                        let document_operation = InnerDocOp::Addition(document_offset);
 
                         match docids_version_offsets.get_mut(&external_document_id) {
                             None => {
@@ -160,10 +160,10 @@ impl<'p> Indexer<'p> for DocumentOperationIndexer {
                                 docids_version_offsets.insert(
                                     external_document_id,
-                                    (docid, vec![DocumentOperation::Deletion]),
+                                    (docid, vec![InnerDocOp::Deletion]),
                                 );
                             }
-                            Some((_, offsets)) => offsets.push(DocumentOperation::Deletion),
+                            Some((_, offsets)) => offsets.push(InnerDocOp::Deletion),
                         }
                     }
                 }
@@ -204,7 +204,7 @@ fn merge_document_for_updates(
     fields_ids_map: &FieldsIdsMap,
     docid: DocumentId,
     external_docid: String,
-    operations: &[DocumentOperation],
+    operations: &[InnerDocOp],
 ) -> Result<Option<DocumentChange>> {
     let mut document = BTreeMap::<_, Cow<_>>::new();
     let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
@@ -226,14 +226,12 @@ fn merge_document_for_updates(
             });
         }
     }
 
     if operations.is_empty() {
         match current {
             Some(current) => {
-                return Ok(Some(DocumentChange::Deletion(Deletion::create(
-                    docid,
-                    external_docid,
-                    current.boxed(),
-                ))));
+                let deletion = Deletion::create(docid, external_docid, current.boxed());
+                return Ok(Some(DocumentChange::Deletion(deletion)));
             }
             None => return Ok(None),
         }
@@ -241,8 +239,8 @@ fn merge_document_for_updates(
 
     for operation in operations {
         let DocumentOffset { content, offset } = match operation {
-            DocumentOperation::Addition(offset) => offset,
-            DocumentOperation::Deletion => {
+            InnerDocOp::Addition(offset) => offset,
+            InnerDocOp::Deletion => {
                unreachable!("Deletion in document operations")
            }
        };
@@ -283,13 +281,13 @@ fn merge_document_for_replacements(
     fields_ids_map: &FieldsIdsMap,
     docid: DocumentId,
     external_docid: String,
-    operations: &[DocumentOperation],
+    operations: &[InnerDocOp],
 ) -> Result<Option<DocumentChange>> {
     let current = index.documents.remap_data_type::<Bytes>().get(rtxn, &docid)?;
     let current: Option<&KvReaderFieldId> = current.map(Into::into);
 
     match operations.last() {
-        Some(DocumentOperation::Addition(DocumentOffset { content, offset })) => {
+        Some(InnerDocOp::Addition(DocumentOffset { content, offset })) => {
             let reader = DocumentsBatchReader::from_reader(Cursor::new(content.as_ref()))?;
             let (mut cursor, batch_index) = reader.into_cursor_and_fields_index();
             let update = cursor.get(*offset)?.expect("must exists");
@@ -318,13 +316,13 @@ fn merge_document_for_replacements(
                 }
             }
         }
-        Some(DocumentOperation::Deletion) => match current {
+        Some(InnerDocOp::Deletion) => match current {
             Some(current) => {
                 let deletion = Deletion::create(docid, external_docid, current.boxed());
                 Ok(Some(DocumentChange::Deletion(deletion)))
             }
             None => Ok(None),
         },
-        None => Ok(None),
+        None => Ok(None), // but it's strange
     }
 }
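
Pulled together from the hunks above, the public indexer struct and the private operation enum in this file now look roughly like this (only the parts visible in the diff, other items omitted):

pub struct DocumentOperation {
    // fields are private now (previously pub(crate))
    operations: Vec<Payload>,
    index_documents_method: IndexDocumentsMethod,
}

// Private replacement for the old public enum DocumentOperation,
// which frees that name for the indexer itself.
enum InnerDocOp {
    Addition(DocumentOffset),
    Deletion,
}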

View File

@@ -1,13 +1,13 @@
 use std::thread;
 
 use big_s::S;
-pub use document_deletion::DocumentDeletionIndexer;
-pub use document_operation::DocumentOperationIndexer;
+pub use document_deletion::DocumentDeletion;
+pub use document_operation::DocumentOperation;
 use heed::RwTxn;
-pub use partial_dump::PartialDumpIndexer;
+pub use partial_dump::PartialDump;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use rayon::ThreadPool;
-pub use update_by_function::UpdateByFunctionIndexer;
+pub use update_by_function::UpdateByFunction;
 
 use super::channel::{
     extractors_merger_channels, merger_writer_channels, EntryOperation, ExtractorsMergerChannels,

View File

@@ -6,17 +6,17 @@ use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
 use crate::update::new::{DocumentChange, Insertion, KvWriterFieldId};
 use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
 
-pub struct PartialDumpIndexer<I> {
+pub struct PartialDump<I> {
     pub iter: I,
 }
 
-impl<I> PartialDumpIndexer<I> {
+impl<I> PartialDump<I> {
     pub fn new_from_jsonlines(iter: I) -> Self {
-        PartialDumpIndexer { iter }
+        PartialDump { iter }
     }
 }
 
-impl<'p, I> Indexer<'p> for PartialDumpIndexer<I>
+impl<'p, I> Indexer<'p> for PartialDump<I>
 where
     I: IntoIterator<Item = Object>,
     I::IntoIter: Send + 'p,
@@ -45,6 +45,7 @@ where
                 let key = fields_ids_map.id(key).unwrap();
                 /// TODO better error management
                 let value = serde_json::to_vec(&value).unwrap();
+                /// TODO it is not ordered
                 writer.insert(key, value).unwrap();
             });

View File

@@ -4,15 +4,16 @@ use super::Indexer;
 use crate::update::new::DocumentChange;
 use crate::Result;
 
-pub struct UpdateByFunctionIndexer;
+pub struct UpdateByFunction;
 
-impl<'p> Indexer<'p> for UpdateByFunctionIndexer {
+impl<'p> Indexer<'p> for UpdateByFunction {
     type Parameter = ();
 
     fn document_changes(
         self,
         _param: Self::Parameter,
     ) -> Result<impl ParallelIterator<Item = Result<Option<DocumentChange>>> + 'p> {
+        todo!();
         Ok(vec![].into_par_iter())
     }
 }

View File

@@ -1,8 +1,4 @@
 pub use document_change::{Deletion, DocumentChange, Insertion, Update};
-pub use indexer::{
-    index, DocumentDeletionIndexer, DocumentOperationIndexer, PartialDumpIndexer,
-    UpdateByFunctionIndexer,
-};
 pub use items_pool::ItemsPool;
 
 use super::del_add::DelAdd;
@@ -13,7 +9,7 @@ mod merger;
 // mod extract;
 // mod global_fields_ids_map;
 mod channel;
-mod indexer;
+pub mod indexer;
 mod items_pool;
 
 /// TODO move them elsewhere
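
With the flat re-exports gone and the indexer module now public, callers reach the renamed types through the module path. A hypothetical call site inside the crate (not part of this diff) would change roughly like this:

// before this commit (hypothetical caller):
use crate::update::new::{DocumentDeletionIndexer, DocumentOperationIndexer};

// after it, going through the now-public module:
use crate::update::new::indexer::{DocumentDeletion, DocumentOperation};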