Introduce DatabaseDocument type

This commit is contained in:
ManyTheFish 2025-06-25 14:37:24 +02:00
parent 576f2c64cd
commit 93ffb90275
7 changed files with 133 additions and 41 deletions

View File

@ -14,16 +14,11 @@ use crate::vector::EmbeddingConfigs;
use crate::{DocumentId, Index, InternalError, Result};
pub enum DocumentChange<'doc> {
Deletion(Deletion<'doc>),
Deletion(DatabaseDocument<'doc>),
Update(Update<'doc>),
Insertion(Insertion<'doc>),
}
pub struct Deletion<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
}
pub struct Update<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
@ -37,6 +32,11 @@ pub struct Insertion<'doc> {
new: Versions<'doc>,
}
pub struct DatabaseDocument<'doc> {
docid: DocumentId,
external_document_id: &'doc str,
}
impl<'doc> DocumentChange<'doc> {
pub fn docid(&self) -> DocumentId {
match &self {
@ -55,31 +55,6 @@ impl<'doc> DocumentChange<'doc> {
}
}
impl<'doc> Deletion<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
Self { docid, external_document_id }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}
pub fn current<'a, Mapper: FieldIdMapper>(
&self,
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
}
impl<'doc> Insertion<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str, new: Versions<'doc>) -> Self {
Insertion { docid, external_document_id, new }
@ -304,3 +279,40 @@ impl<'doc> Update<'doc> {
}
}
}
impl<'doc> DatabaseDocument<'doc> {
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
Self { docid, external_document_id }
}
pub fn docid(&self) -> DocumentId {
self.docid
}
pub fn external_document_id(&self) -> &'doc str {
self.external_document_id
}
pub fn current<'a, Mapper: FieldIdMapper>(
&self,
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
) -> Result<DocumentFromDb<'a, Mapper>> {
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
pub fn current_vectors<'a, Mapper: FieldIdMapper>(
&self,
rtxn: &'a RoTxn,
index: &'a Index,
mapper: &'a Mapper,
doc_alloc: &'a Bump,
) -> Result<VectorDocumentFromDb<'a>> {
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
)?)
}
}

View File

@ -7,7 +7,7 @@ use roaring::RoaringBitmap;
use super::document_changes::{DocumentChangeContext, DocumentChanges};
use crate::documents::PrimaryKey;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, DocumentChange};
use crate::update::new::{DatabaseDocument, DocumentChange};
use crate::{DocumentId, Result};
#[derive(Default)]
@ -74,7 +74,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DocumentChange::Deletion(Deletion::create(*docid, external_document_id))))
Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id))))
}
fn len(&self) -> usize {

View File

@ -19,7 +19,7 @@ use crate::progress::{AtomicPayloadStep, Progress};
use crate::update::new::document::Versions;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, Insertion, Update};
use crate::update::new::{DatabaseDocument, Insertion, Update};
use crate::update::{AvailableIds, IndexDocumentsMethod};
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> {
if self.is_new {
Ok(None)
} else {
let deletion = Deletion::create(self.docid, external_doc);
let deletion = DatabaseDocument::create(self.docid, external_doc);
Ok(Some(DocumentChange::Deletion(deletion)))
}
}

View File

@ -18,6 +18,7 @@ use crate::index::IndexEmbeddingConfig;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
use crate::update::new::indexer::settings_changes::DatabaseDocuments;
use crate::update::new::merger::merge_and_send_rtree;
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
use crate::update::settings::SettingsDelta;
@ -332,6 +333,13 @@ where
MSP: Fn() -> bool + Sync,
SD: SettingsDelta,
{
// Create the list of document ids to extract
let rtxn = indexing_context.index.read_txn()?;
let all_document_ids =
indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
let primary_key =
primary_key_from_db(&indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
indexing_context.progress.update_progress(IndexingStep::WaitingForDatabaseWrites);
finished_extraction.store(true, std::sync::atomic::Ordering::Relaxed);
@ -339,6 +347,24 @@ where
Result::Ok(index_embeddings)
}
fn primary_key_from_db<'indexer, 'index>(
index: &'indexer Index,
rtxn: &'indexer heed::RoTxn<'index>,
fields: &'indexer impl FieldIdMapper,
) -> Result<PrimaryKey<'indexer>> {
let Some(primary_key) = index.primary_key(rtxn)? else {
return Err(InternalError::DatabaseMissingEntry {
db_name: crate::index::db_name::MAIN,
key: Some(crate::index::main_key::PRIMARY_KEY_KEY),
}
.into());
};
let Some(primary_key) = PrimaryKey::new(primary_key, fields) else {
unreachable!("Primary key must exist at this point");
};
Ok(primary_key)
}
fn request_threads() -> &'static ThreadPoolNoAbort {
static REQUEST_THREADS: OnceLock<ThreadPoolNoAbort> = OnceLock::new();

View File

@ -0,0 +1,55 @@
use std::sync::atomic::Ordering;
use std::sync::Arc;
use bumpalo::Bump;
use rayon::iter::IndexedParallelIterator;
use rayon::slice::ParallelSlice;
use super::document_changes::IndexingContext;
use crate::documents::PrimaryKey;
use crate::progress::AtomicDocumentStep;
use crate::update::new::document_change::DatabaseDocument;
use crate::update::new::indexer::document_changes::DocumentChangeContext;
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
use crate::{DocumentId, InternalError, Result};
pub struct DatabaseDocuments<'indexer> {
documents: &'indexer [DocumentId],
primary_key: PrimaryKey<'indexer>,
}
impl<'indexer> DatabaseDocuments<'indexer> {
pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self {
Self { documents, primary_key }
}
fn iter(&self, chunk_size: usize) -> impl IndexedParallelIterator<Item = &[DocumentId]> {
self.documents.par_chunks(chunk_size)
}
fn item_to_database_document<
'doc, // lifetime of a single `process` call
T: MostlySend,
>(
&'doc self,
context: &'doc DocumentChangeContext<T>,
docid: &'doc DocumentId,
) -> Result<Option<DatabaseDocument<'doc>>> {
let current = context.index.document(&context.rtxn, *docid)?;
let external_document_id = self.primary_key.extract_docid_from_db(
current,
&context.db_fields_ids_map,
&context.doc_alloc,
)?;
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
Ok(Some(DatabaseDocument::create(*docid, external_document_id)))
}
fn len(&self) -> usize {
self.documents.len()
}
}

View File

@ -13,7 +13,7 @@ use crate::error::{FieldIdMapMissingEntry, InternalError};
use crate::update::new::document::Versions;
use crate::update::new::ref_cell_ext::RefCellExt as _;
use crate::update::new::thread_local::MostlySend;
use crate::update::new::{Deletion, DocumentChange, KvReaderFieldId, Update};
use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update};
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
pub struct UpdateByFunction {
@ -128,10 +128,9 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
match scope.remove::<Dynamic>("doc") {
// If the "doc" variable has been set to (), we effectively delete the document.
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(Deletion::create(
docid,
doc_alloc.alloc_str(&document_id),
)))),
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(
DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)),
))),
None => unreachable!("missing doc variable from the Rhai scope"),
Some(new_document) => match new_document.try_cast() {
Some(new_rhai_document) => {

View File

@ -1,4 +1,4 @@
pub use document_change::{Deletion, DocumentChange, Insertion, Update};
pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update};
pub use indexer::ChannelCongestion;
pub use merger::{
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,