diff --git a/crates/milli/src/update/new/document_change.rs b/crates/milli/src/update/new/document_change.rs index 8a8ac4bb3..2ff96fd24 100644 --- a/crates/milli/src/update/new/document_change.rs +++ b/crates/milli/src/update/new/document_change.rs @@ -14,16 +14,11 @@ use crate::vector::EmbeddingConfigs; use crate::{DocumentId, Index, InternalError, Result}; pub enum DocumentChange<'doc> { - Deletion(Deletion<'doc>), + Deletion(DatabaseDocument<'doc>), Update(Update<'doc>), Insertion(Insertion<'doc>), } -pub struct Deletion<'doc> { - docid: DocumentId, - external_document_id: &'doc str, -} - pub struct Update<'doc> { docid: DocumentId, external_document_id: &'doc str, @@ -37,6 +32,11 @@ pub struct Insertion<'doc> { new: Versions<'doc>, } +pub struct DatabaseDocument<'doc> { + docid: DocumentId, + external_document_id: &'doc str, +} + impl<'doc> DocumentChange<'doc> { pub fn docid(&self) -> DocumentId { match &self { @@ -55,31 +55,6 @@ impl<'doc> DocumentChange<'doc> { } } -impl<'doc> Deletion<'doc> { - pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { - Self { docid, external_document_id } - } - - pub fn docid(&self) -> DocumentId { - self.docid - } - - pub fn external_document_id(&self) -> &'doc str { - self.external_document_id - } - - pub fn current<'a, Mapper: FieldIdMapper>( - &self, - rtxn: &'a RoTxn, - index: &'a Index, - mapper: &'a Mapper, - ) -> Result> { - Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( - crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, - )?) - } -} - impl<'doc> Insertion<'doc> { pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self { Insertion { docid, external_document_id, new } @@ -304,3 +279,40 @@ impl<'doc> Update<'doc> { } } } + +impl<'doc> DatabaseDocument<'doc> { + pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self { + Self { docid, external_document_id } + } + + pub fn docid(&self) -> DocumentId { + self.docid + } + + pub fn external_document_id(&self) -> &'doc str { + self.external_document_id + } + + pub fn current<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + ) -> Result> { + Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) + } + + pub fn current_vectors<'a, Mapper: FieldIdMapper>( + &self, + rtxn: &'a RoTxn, + index: &'a Index, + mapper: &'a Mapper, + doc_alloc: &'a Bump, + ) -> Result> { + Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or( + crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid }, + )?) + } +} diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs index c4a72a2a1..114ce0a69 100644 --- a/crates/milli/src/update/new/indexer/document_deletion.rs +++ b/crates/milli/src/update/new/indexer/document_deletion.rs @@ -7,7 +7,7 @@ use roaring::RoaringBitmap; use super::document_changes::{DocumentChangeContext, DocumentChanges}; use crate::documents::PrimaryKey; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, DocumentChange}; +use crate::update::new::{DatabaseDocument, DocumentChange}; use crate::{DocumentId, Result}; #[derive(Default)] @@ -74,7 +74,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> { let external_document_id = external_document_id.to_bump(&context.doc_alloc); - Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id)))) + Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id)))) } fn len(&self) -> usize { diff --git a/crates/milli/src/update/new/indexer/document_operation.rs b/crates/milli/src/update/new/indexer/document_operation.rs index ca433c043..70dc5f35c 100644 --- a/crates/milli/src/update/new/indexer/document_operation.rs +++ b/crates/milli/src/update/new/indexer/document_operation.rs @@ -19,7 +19,7 @@ use crate::progress::{AtomicPayloadStep, Progress}; use crate::update::new::document::Versions; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, Insertion, Update}; +use crate::update::new::{DatabaseDocument, Insertion, Update}; use crate::update::{AvailableIds, IndexDocumentsMethod}; use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError}; @@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> { if self.is_new { Ok(None) } else { - let deletion = Deletion::create(self.docid, external_doc); + let deletion = DatabaseDocument::create(self.docid, external_doc); Ok(Some(DocumentChange::Deletion(deletion))) } } diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index bbba9cfe8..328a1e25f 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -18,6 +18,7 @@ use crate::index::IndexEmbeddingConfig; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; +use crate::update::new::indexer::settings_changes::DatabaseDocuments; use crate::update::new::merger::merge_and_send_rtree; use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases}; use crate::update::settings::SettingsDelta; @@ -332,6 +333,13 @@ where MSP: Fn() -> bool + Sync, SD: SettingsDelta, { + // Create the list of document ids to extract + let rtxn = indexing_context.index.read_txn()?; + let all_document_ids = + indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::>(); + let primary_key = + primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?; + let documents = DatabaseDocuments::new(&all_document_ids, primary_key); indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites); finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed); @@ -339,6 +347,24 @@ where Result::Ok(index_embeddings) } +fn primary_key_from_db<'indexer, 'index>( + index: &'indexer Index, + rtxn: &'indexer heed::RoTxn<'index>, + fields: &'indexer impl FieldIdMapper, +) -> Result> { + let Some(primary_key) = index.primary_key(rtxn)? else { + return Err(InternalError::DatabaseMissingEntry { + db_name: crate::index::db_name::MAIN, + key: Some(crate::index::main_key::PRIMARY_KEY_KEY), + } + .into()); + }; + let Some(primary_key) = PrimaryKey::new(primary_key, fields) else { + unreachable!("Primary key must exist at this point"); + }; + Ok(primary_key) +} + fn request_threads() -> &'static ThreadPoolNoAbort { static REQUEST_THREADS: OnceLock = OnceLock::new(); diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs new file mode 100644 index 000000000..2e3d9c917 --- /dev/null +++ b/crates/milli/src/update/new/indexer/settings_changes.rs @@ -0,0 +1,55 @@ +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use bumpalo::Bump; +use rayon::iter::IndexedParallelIterator; +use rayon::slice::ParallelSlice; + +use super::document_changes::IndexingContext; +use crate::documents::PrimaryKey; +use crate::progress::AtomicDocumentStep; +use crate::update::new::document_change::DatabaseDocument; +use crate::update::new::indexer::document_changes::DocumentChangeContext; +use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; +use crate::update::new::steps::IndexingStep; +use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; +use crate::{DocumentId, InternalError, Result}; +pub struct DatabaseDocuments<'indexer> { + documents: &'indexer [DocumentId], + primary_key: PrimaryKey<'indexer>, +} + +impl<'indexer> DatabaseDocuments<'indexer> { + pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self { + Self { documents, primary_key } + } + + fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator { + self.documents.par_chunks(chunk_size) + } + + fn item_to_database_document< + 'doc, // lifetime of a single `process` call + T: MostlySend, + >( + &'doc self, + context: &'doc DocumentChangeContext, + docid: &'doc DocumentId, + ) -> Result>> { + let current = context.index.document(&context.rtxn, *docid)?; + + let external_document_id = self.primary_key.extract_docid_from_db( + current, + &context.db_fields_ids_map, + &context.doc_alloc, + )?; + + let external_document_id = external_document_id.to_bump(&context.doc_alloc); + + Ok(Some(DatabaseDocument::create(*docid, external_document_id))) + } + + fn len(&self) -> usize { + self.documents.len() + } +} diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs index 3001648e6..694645d28 100644 --- a/crates/milli/src/update/new/indexer/update_by_function.rs +++ b/crates/milli/src/update/new/indexer/update_by_function.rs @@ -13,7 +13,7 @@ use crate::error::{FieldIdMapMissingEntry, InternalError}; use crate::update::new::document::Versions; use crate::update::new::ref_cell_ext::RefCellExt as _; use crate::update::new::thread_local::MostlySend; -use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update}; +use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update}; use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError}; pub struct UpdateByFunction { @@ -128,10 +128,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> { match scope.remove::("doc") { // If the "doc" variable has been set to (), we effectively delete the document. - Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create( - docid, - doc_alloc.alloc_str(&document_id), - )))), + Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion( + DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)), + ))), None => unreachable!("missing doc variable from the Rhai scope"), Some(new_document) => match new_document.try_cast() { Some(new_rhai_document) => { diff --git a/crates/milli/src/update/new/mod.rs b/crates/milli/src/update/new/mod.rs index 81ff93e54..e3adc5bde 100644 --- a/crates/milli/src/update/new/mod.rs +++ b/crates/milli/src/update/new/mod.rs @@ -1,4 +1,4 @@ -pub use document_change::{Deletion, DocumentChange, Insertion, Update}; +pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update}; pub use indexer::ChannelCongestion; pub use merger::{ merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,