From 7a204609fea7577eb5ea73dd4791516afe73b1eb Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Mon, 30 Jun 2025 14:21:46 +0200 Subject: [PATCH] Move document context and identifiers in document.rs --- crates/milli/src/update/new/document.rs | 136 +++++++++++++++++- .../milli/src/update/new/document_change.rs | 45 +----- .../milli/src/update/new/extract/documents.rs | 10 +- .../new/extract/faceted/extract_facets.rs | 3 +- .../milli/src/update/new/extract/geo/mod.rs | 4 +- .../extract/searchable/extract_word_docids.rs | 3 +- .../extract_word_pair_proximity_docids.rs | 4 +- .../src/update/new/extract/vectors/mod.rs | 6 +- .../update/new/indexer/document_changes.rs | 84 +---------- .../update/new/indexer/document_deletion.rs | 15 +- .../update/new/indexer/document_operation.rs | 8 +- .../milli/src/update/new/indexer/extract.rs | 4 +- .../src/update/new/indexer/partial_dump.rs | 4 +- .../update/new/indexer/settings_changes.rs | 15 +- .../update/new/indexer/update_by_function.rs | 7 +- crates/milli/src/update/new/mod.rs | 3 +- 16 files changed, 182 insertions(+), 169 deletions(-) diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs index c7156c120..b07cc0298 100644 --- a/crates/milli/src/update/new/document.rs +++ b/crates/milli/src/update/new/document.rs @@ -1,7 +1,10 @@ +use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, BTreeSet}; +use std::sync::RwLock; +use bumpalo::Bump; use bumparaw_collections::RawMap; -use heed::RoTxn; +use heed::{RoTxn, WithoutTls}; use rustc_hash::FxBuildHasher; use serde_json::value::RawValue; @@ -9,8 +12,13 @@ use super::vector_document::VectorDocument; use super::{KvReaderFieldId, KvWriterFieldId}; use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME}; use crate::documents::FieldIdMapper; +use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; +use crate::update::new::vector_document::VectorDocumentFromDb; use crate::vector::settings::EmbedderAction; -use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError}; +use crate::{ + DocumentId, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, + Result, UserError, +}; /// A view into a document that can represent either the current version from the DB, /// the update data from payload or other means, or the merged updated version. @@ -460,3 +468,127 @@ impl<'doc> Versions<'doc> { self.data.get(k) } } + +pub struct DocumentIdentifiers<'doc> { + docid: DocumentId, + external_document_id: &'doc str, +} + +impl<'doc> DocumentIdentifiers<'doc> { + pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { + Self { docid, external_document_id } + } + + pub fn docid(&self) -> DocumentId { + self.docid + } + + pub fn external_document_id(&self) -> &'doc str { + self.external_document_id + } + + pub fn current<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + ) -> Result> { + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) + } + + pub fn current_vectors<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + doc_alloc: &'a Bump, + ) -> Result> { + Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) + } +} + +pub struct DocumentContext< + 'doc, // covariant lifetime of a single `process` call + 'extractor: 'doc, // invariant lifetime of the extractor_allocs + 'fid: 'doc, // invariant lifetime of the new_fields_ids_map + 'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call + T: MostlySend, +> { + /// The index we're indexing in + pub index: &'indexer Index, + /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents + /// inside of the DB. + pub db_fields_ids_map: &'indexer FieldsIdsMap, + /// A transaction providing data from the DB before all indexing operations + pub rtxn: RoTxn<'indexer, WithoutTls>, + + /// Global field id map that is up to date with the current state of the indexing process. + /// + /// - Inserting a field will take a lock + /// - Retrieving a field may take a lock as well + pub new_fields_ids_map: &'doc std::cell::RefCell>, + + /// Data allocated in this allocator is cleared between each call to `process`. + pub doc_alloc: Bump, + + /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. + pub extractor_alloc: &'extractor Bump, + + /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents + pub doc_allocs: &'doc ThreadLocal>>, + + /// Extractor-specific data + pub data: &'doc T, +} + +impl< + 'doc, // covariant lifetime of a single `process` call + 'data: 'doc, // invariant on T lifetime of the datastore + 'extractor: 'doc, // invariant lifetime of extractor_allocs + 'fid: 'doc, // invariant lifetime of fields ids map + 'indexer: 'doc, // covariant lifetime of objects that survive a `process` call + T: MostlySend, + > DocumentContext<'doc, 'extractor, 'fid, 'indexer, T> +{ + #[allow(clippy::too_many_arguments)] + pub fn new( + index: &'indexer Index, + db_fields_ids_map: &'indexer FieldsIdsMap, + new_fields_ids_map: &'fid RwLock, + extractor_allocs: &'extractor ThreadLocal>, + doc_allocs: &'doc ThreadLocal>>, + datastore: &'data ThreadLocal, + fields_ids_map_store: &'doc ThreadLocal>>>, + init_data: F, + ) -> Result + where + F: FnOnce(&'extractor Bump) -> Result, + { + let doc_alloc = + doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024)))); + let doc_alloc = doc_alloc.0.take(); + let fields_ids_map = fields_ids_map_store + .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into()); + + let fields_ids_map = &fields_ids_map.0; + let extractor_alloc = extractor_allocs.get_or_default(); + + let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?; + + let txn = index.read_txn()?; + Ok(DocumentContext { + index, + rtxn: txn, + db_fields_ids_map, + new_fields_ids_map: fields_ids_map, + doc_alloc, + extractor_alloc: &extractor_alloc.0, + data, + doc_allocs, + }) + } +} diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 2ff96fd24..2b9161319 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -10,11 +10,12 @@ use super::vector_document::{ }; use crate::attribute_patterns::PatternMatch; use crate::documents::FieldIdMapper; +use crate::update::new::document::DocumentIdentifiers; use crate::vector::EmbeddingConfigs; use crate::{DocumentId, Index, InternalError, Result}; pub enum DocumentChange<'doc> { - Deletion(DatabaseDocument<'doc>), + Deletion(DocumentIdentifiers<'doc>), Update(Update<'doc>), Insertion(Insertion<'doc>), } @@ -32,11 +33,6 @@ pub struct Insertion<'doc> { new: Versions<'doc>, } -pub struct DatabaseDocument<'doc> { - docid: DocumentId, - external_document_id: &'doc str, -} - impl<'doc> DocumentChange<'doc> { pub fn docid(&self) -> DocumentId { match &self { @@ -279,40 +275,3 @@ impl<'doc> Update<'doc> { } } } - -impl<'doc> DatabaseDocument<'doc> { - pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { - Self { docid, external_document_id } - } - - pub fn docid(&self) -> DocumentId { - self.docid - } - - pub fn external_document_id(&self) -> &'doc str { - self.external_document_id - } - - pub fn current<'a, Mapper: FieldIdMapper>( - &self, - rtxn: &'a RoTxn, - index: &'a Index, - mapper: &'a Mapper, - ) -> Result> { - Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( - crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, - )?) - } - - pub fn current_vectors<'a, Mapper: FieldIdMapper>( - &self, - rtxn: &'a RoTxn, - index: &'a Index, - mapper: &'a Mapper, - doc_alloc: &'a Bump, - ) -> Result> { - Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or( - crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, - )?) - } -} diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs index 4d9a72715..5c1a1927a 100644 --- a/crates/milli/src/update/new/extract/documents.rs +++ b/crates/milli/src/update/new/extract/documents.rs @@ -8,10 +8,10 @@ use super::DelAddRoaringBitmap; use crate::constants::RESERVED_GEO_FIELD_NAME; use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender}; use crate::update::new::document::{write_to_obkv, Document}; -use crate::update::new::document_change::DatabaseDocument; -use crate::update::new::indexer::document_changes::{DocumentContext, Extractor, IndexingContext}; +use crate::update::new::document::{DocumentContext, DocumentIdentifiers}; +use crate::update::new::indexer::document_changes::{Extractor, IndexingContext}; use crate::update::new::indexer::settings_changes::{ - settings_change_extract, DatabaseDocuments, SettingsChangeExtractor, + settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor, }; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::{FullySend, ThreadLocal}; @@ -194,7 +194,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE fn process<'doc>( &self, - documents: impl Iterator>>, + documents: impl Iterator>>, context: &DocumentContext, ) -> Result<()> { let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc); @@ -242,7 +242,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE /// and then updates the database. #[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")] pub fn update_database_documents<'indexer, 'extractor, MSP, SD>( - documents: &'indexer DatabaseDocuments<'indexer>, + documents: &'indexer DocumentsIndentifiers<'indexer>, indexing_context: IndexingContext, extractor_sender: &ExtractorBbqueueSender, settings_delta: &SD, diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs index 39e68fb6d..6e9ae7ee4 100644 --- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs +++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs @@ -15,9 +15,10 @@ use crate::filterable_attributes_rules::match_faceted_field; use crate::heed_codec::facet::OrderedF64Codec; use crate::update::del_add::DelAdd; use crate::update::new::channel::FieldIdDocidFacetSender; +use crate::update::new::document::DocumentContext; use crate::update::new::extract::perm_json_p; use crate::update::new::indexer::document_changes::{ - extract, DocumentContext, DocumentChanges, Extractor, IndexingContext, + extract, DocumentChanges, Extractor, IndexingContext, }; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::steps::IndexingStep; diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs index 927434ff6..8e164b48f 100644 --- a/crates/milli/src/update/new/extract/geo/mod.rs +++ b/crates/milli/src/update/new/extract/geo/mod.rs @@ -10,8 +10,8 @@ use serde_json::value::RawValue; use serde_json::Value; use crate::error::GeoError; -use crate::update::new::document::Document; -use crate::update::new::indexer::document_changes::{DocumentContext, Extractor}; +use crate::update::new::document::{Document, DocumentContext}; +use crate::update::new::indexer::document_changes::Extractor; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::MostlySend; use crate::update::new::DocumentChange; diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs index 35bc9f063..5daf34ca4 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs @@ -8,10 +8,11 @@ use bumpalo::Bump; use super::match_searchable_field; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; +use crate::update::new::document::DocumentContext; use crate::update::new::extract::cache::BalancedCaches; use crate::update::new::extract::perm_json_p::contained_in; use crate::update::new::indexer::document_changes::{ - extract, DocumentContext, DocumentChanges, Extractor, IndexingContext, + extract, DocumentChanges, Extractor, IndexingContext, }; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::steps::IndexingStep; diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs index dffde06c7..c9acb9734 100644 --- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs +++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs @@ -7,10 +7,10 @@ use bumpalo::Bump; use super::match_searchable_field; use super::tokenize_document::{tokenizer_builder, DocumentTokenizer}; use crate::proximity::{index_proximity, MAX_DISTANCE}; -use crate::update::new::document::Document; +use crate::update::new::document::{Document, DocumentContext}; use crate::update::new::extract::cache::BalancedCaches; use crate::update::new::indexer::document_changes::{ - extract, DocumentContext, DocumentChanges, Extractor, IndexingContext, + extract, DocumentChanges, Extractor, IndexingContext, }; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::steps::IndexingStep; diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index edb68b6db..6d5052ac8 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -10,8 +10,8 @@ use crate::error::FaultSource; use crate::progress::EmbedderStats; use crate::prompt::Prompt; use crate::update::new::channel::EmbeddingSender; -use crate::update::new::document_change::DatabaseDocument; -use crate::update::new::indexer::document_changes::{DocumentContext, Extractor}; +use crate::update::new::document::{DocumentContext, DocumentIdentifiers}; +use crate::update::new::indexer::document_changes::Extractor; use crate::update::new::indexer::settings_changes::SettingsChangeExtractor; use crate::update::new::thread_local::MostlySend; use crate::update::new::vector_document::VectorDocument; @@ -343,7 +343,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding fn process<'doc>( &'doc self, - documents: impl Iterator>>, + documents: impl Iterator>>, context: &'doc DocumentContext, ) -> crate::Result<()> { let embedders = self.embedders.inner_as_ref(); diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index 3069ab29b..c88751ee3 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -3,100 +3,18 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use bumpalo::Bump; -use heed::{RoTxn, WithoutTls}; use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::progress::{AtomicDocumentStep, Progress}; +use crate::update::new::document::DocumentContext; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::update::GrenadParameters; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result}; -pub struct DocumentContext< - 'doc, // covariant lifetime of a single `process` call - 'extractor: 'doc, // invariant lifetime of the extractor_allocs - 'fid: 'doc, // invariant lifetime of the new_fields_ids_map - 'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call - T: MostlySend, -> { - /// The index we're indexing in - pub index: &'indexer Index, - /// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents - /// inside of the DB. - pub db_fields_ids_map: &'indexer FieldsIdsMap, - /// A transaction providing data from the DB before all indexing operations - pub rtxn: RoTxn<'indexer, WithoutTls>, - - /// Global field id map that is up to date with the current state of the indexing process. - /// - /// - Inserting a field will take a lock - /// - Retrieving a field may take a lock as well - pub new_fields_ids_map: &'doc std::cell::RefCell>, - - /// Data allocated in this allocator is cleared between each call to `process`. - pub doc_alloc: Bump, - - /// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills. - pub extractor_alloc: &'extractor Bump, - - /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents - pub doc_allocs: &'doc ThreadLocal>>, - - /// Extractor-specific data - pub data: &'doc T, -} - -impl< - 'doc, // covariant lifetime of a single `process` call - 'data: 'doc, // invariant on T lifetime of the datastore - 'extractor: 'doc, // invariant lifetime of extractor_allocs - 'fid: 'doc, // invariant lifetime of fields ids map - 'indexer: 'doc, // covariant lifetime of objects that survive a `process` call - T: MostlySend, - > DocumentContext<'doc, 'extractor, 'fid, 'indexer, T> -{ - #[allow(clippy::too_many_arguments)] - pub fn new( - index: &'indexer Index, - db_fields_ids_map: &'indexer FieldsIdsMap, - new_fields_ids_map: &'fid RwLock, - extractor_allocs: &'extractor ThreadLocal>, - doc_allocs: &'doc ThreadLocal>>, - datastore: &'data ThreadLocal, - fields_ids_map_store: &'doc ThreadLocal>>>, - init_data: F, - ) -> Result - where - F: FnOnce(&'extractor Bump) -> Result, - { - let doc_alloc = - doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024)))); - let doc_alloc = doc_alloc.0.take(); - let fields_ids_map = fields_ids_map_store - .get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into()); - - let fields_ids_map = &fields_ids_map.0; - let extractor_alloc = extractor_allocs.get_or_default(); - - let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?; - - let txn = index.read_txn()?; - Ok(DocumentContext { - index, - rtxn: txn, - db_fields_ids_map, - new_fields_ids_map: fields_ids_map, - doc_alloc, - extractor_alloc: &extractor_alloc.0, - data, - doc_allocs, - }) - } -} - /// An internal iterator (i.e. using `foreach`) of `DocumentChange`s pub trait Extractor<'extractor>: Sync { type Data: MostlySend; diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index 292cdc36e..157e20bb0 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -4,10 +4,11 @@ use rayon::iter::IndexedParallelIterator; use rayon::slice::ParallelSlice as _; use roaring::RoaringBitmap; -use super::document_changes::{DocumentContext, DocumentChanges}; +use super::document_changes::DocumentChanges; use crate::documents::PrimaryKey; +use crate::update::new::document::DocumentContext; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{DatabaseDocument, DocumentChange}; +use crate::update::new::{DocumentChange, DocumentIdentifiers}; use crate::{DocumentId, Result}; #[derive(Default)] @@ -74,7 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { let external_document_id = external_document_id.to_bump(&context.doc_alloc); - Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id)))) + Ok(Some(DocumentChange::Deletion(DocumentIdentifiers::create( + *docid, + external_document_id, + )))) } fn len(&self) -> usize { @@ -93,9 +97,8 @@ mod test { use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::index::tests::TempIndex; use crate::progress::Progress; - use crate::update::new::indexer::document_changes::{ - extract, DocumentContext, Extractor, IndexingContext, - }; + use crate::update::new::document::DocumentContext; + use crate::update::new::indexer::document_changes::{extract, Extractor, IndexingContext}; use crate::update::new::indexer::DocumentDeletion; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{MostlySend, ThreadLocal}; diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index 4bcfb2d47..98faaf145 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -12,14 +12,14 @@ use serde_json::value::RawValue; use serde_json::Deserializer; use super::super::document_change::DocumentChange; -use super::document_changes::{DocumentContext, DocumentChanges}; +use super::document_changes::DocumentChanges; use super::guess_primary_key::retrieve_or_guess_primary_key; use crate::documents::PrimaryKey; use crate::progress::{AtomicPayloadStep, Progress}; -use crate::update::new::document::Versions; +use crate::update::new::document::{DocumentContext, Versions}; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{DatabaseDocument, Insertion, Update}; +use crate::update::new::{DocumentIdentifiers, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; @@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> { if self.is_new { Ok(None) } else { - let deletion = DatabaseDocument::create(self.docid, external_doc); + let deletion = DocumentIdentifiers::create(self.docid, external_doc); Ok(Some(DocumentChange::Deletion(deletion))) } } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index 2986d5d57..bb275d8aa 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -20,7 +20,7 @@ use crate::progress::EmbedderStats; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; -use crate::update::new::indexer::settings_changes::DatabaseDocuments; +use crate::update::new::indexer::settings_changes::DocumentsIndentifiers; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::SettingsDelta; @@ -345,7 +345,7 @@ where indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::>(); let primary_key = primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?; - let documents = DatabaseDocuments::new(&all_document_ids, primary_key); + let documents = DocumentsIndentifiers::new(&all_document_ids, primary_key); let span = tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract"); diff --git a/crates/milli/src/update/new/indexer/partial_dump.rs b/crates/milli/src/update/new/indexer/partial_dump.rs index 614c61353..33e72f532 100644 --- a/crates/milli/src/update/new/indexer/partial_dump.rs +++ b/crates/milli/src/update/new/indexer/partial_dump.rs @@ -5,10 +5,10 @@ use rayon::iter::IndexedParallelIterator; use rustc_hash::FxBuildHasher; use serde_json::value::RawValue; -use super::document_changes::{DocumentContext, DocumentChanges}; +use super::document_changes::DocumentChanges; use crate::documents::PrimaryKey; use crate::update::concurrent_available_ids::ConcurrentAvailableIds; -use crate::update::new::document::Versions; +use crate::update::new::document::{DocumentContext, Versions}; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::MostlySend; use crate::update::new::{DocumentChange, Insertion}; diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs index 99e303f16..984ab3a0b 100644 --- a/crates/milli/src/update/new/indexer/settings_changes.rs +++ b/crates/milli/src/update/new/indexer/settings_changes.rs @@ -8,8 +8,7 @@ use rayon::slice::ParallelSlice; use super::document_changes::IndexingContext; use crate::documents::PrimaryKey; use crate::progress::AtomicDocumentStep; -use crate::update::new::document_change::DatabaseDocument; -use crate::update::new::indexer::document_changes::DocumentContext; +use crate::update::new::document::{DocumentContext, DocumentIdentifiers}; use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; @@ -23,16 +22,16 @@ pub trait SettingsChangeExtractor<'extractor>: Sync { fn process<'doc>( &'doc self, - documents: impl Iterator>>, + documents: impl Iterator>>, context: &'doc DocumentContext, ) -> Result<()>; } -pub struct DatabaseDocuments<'indexer> { +pub struct DocumentsIndentifiers<'indexer> { documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>, } -impl<'indexer> DatabaseDocuments<'indexer> { +impl<'indexer> DocumentsIndentifiers<'indexer> { pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self { Self { documents, primary_key } } @@ -48,7 +47,7 @@ impl<'indexer> DatabaseDocuments<'indexer> { &'doc self, context: &'doc DocumentContext, docid: &'doc DocumentId, - ) -> Result>> { + ) -> Result>> { let current = context.index.document(&context.rtxn, *docid)?; let external_document_id = self.primary_key.extract_docid_from_db( @@ -59,7 +58,7 @@ impl<'indexer> DatabaseDocuments<'indexer> { let external_document_id = external_document_id.to_bump(&context.doc_alloc); - Ok(Some(DatabaseDocument::create(*docid, external_document_id))) + Ok(Some(DocumentIdentifiers::create(*docid, external_document_id))) } fn len(&self) -> usize { @@ -78,7 +77,7 @@ pub fn settings_change_extract< EX: SettingsChangeExtractor<'extractor>, MSP: Fn() -> bool + Sync, >( - documents: &'indexer DatabaseDocuments<'indexer>, + documents: &'indexer DocumentsIndentifiers<'indexer>, extractor: &EX, IndexingContext { index, diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index b394757d1..daffe42ed 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -5,15 +5,14 @@ use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST}; use roaring::RoaringBitmap; use rustc_hash::FxBuildHasher; -use super::document_changes::DocumentContext; use super::DocumentChanges; use crate::documents::Error::InvalidDocumentFormat; use crate::documents::PrimaryKey; use crate::error::{FieldIdMapMissingEntry, InternalError}; -use crate::update::new::document::Versions; +use crate::update::new::document::{DocumentContext, Versions}; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update}; +use crate::update::new::{DocumentChange, DocumentIdentifiers, KvReaderFieldId, Update}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; pub struct UpdateByFunction { @@ -129,7 +128,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { match scope.remove::("doc") { // If the "doc" variable has been set to (), we effectively delete the document. Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion( - DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)), + DocumentIdentifiers::create(docid, doc_alloc.alloc_str(&document_id)), ))), None => unreachable!("missing doc variable from the Rhai scope"), Some(new_document) => match new_document.try_cast() { diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index e3adc5bde..ffe27ffda 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -1,4 +1,5 @@ -pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update}; +pub use document::DocumentIdentifiers; +pub use document_change::{DocumentChange, Insertion, Update}; pub use indexer::ChannelCongestion; pub use merger::{ merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,