mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 12:27:13 +02:00
Move document context and identifiers in document.rs
This commit is contained in:
parent
6b2b8ed676
commit
7a204609fe
16 changed files with 182 additions and 169 deletions
|
@ -1,7 +1,10 @@
|
|||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::RwLock;
|
||||
|
||||
use bumpalo::Bump;
|
||||
use bumparaw_collections::RawMap;
|
||||
use heed::RoTxn;
|
||||
use heed::{RoTxn, WithoutTls};
|
||||
use rustc_hash::FxBuildHasher;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
|
@ -9,8 +12,13 @@ use super::vector_document::VectorDocument;
|
|||
use super::{KvReaderFieldId, KvWriterFieldId};
|
||||
use crate::constants::{RESERVED_GEO_FIELD_NAME, RESERVED_VECTORS_FIELD_NAME};
|
||||
use crate::documents::FieldIdMapper;
|
||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||
use crate::update::new::vector_document::VectorDocumentFromDb;
|
||||
use crate::vector::settings::EmbedderAction;
|
||||
use crate::{DocumentId, GlobalFieldsIdsMap, Index, InternalError, Result, UserError};
|
||||
use crate::{
|
||||
DocumentId, FieldIdMapWithMetadata, FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError,
|
||||
Result, UserError,
|
||||
};
|
||||
|
||||
/// A view into a document that can represent either the current version from the DB,
|
||||
/// the update data from payload or other means, or the merged updated version.
|
||||
|
@ -460,3 +468,127 @@ impl<'doc> Versions<'doc> {
|
|||
self.data.get(k)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DocumentIdentifiers<'doc> {
|
||||
docid: DocumentId,
|
||||
external_document_id: &'doc str,
|
||||
}
|
||||
|
||||
impl<'doc> DocumentIdentifiers<'doc> {
|
||||
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
|
||||
Self { docid, external_document_id }
|
||||
}
|
||||
|
||||
pub fn docid(&self) -> DocumentId {
|
||||
self.docid
|
||||
}
|
||||
|
||||
pub fn external_document_id(&self) -> &'doc str {
|
||||
self.external_document_id
|
||||
}
|
||||
|
||||
pub fn current<'a, Mapper: FieldIdMapper>(
|
||||
&self,
|
||||
rtxn: &'a RoTxn,
|
||||
index: &'a Index,
|
||||
mapper: &'a Mapper,
|
||||
) -> Result<DocumentFromDb<'a, Mapper>> {
|
||||
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
|
||||
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn current_vectors<'a, Mapper: FieldIdMapper>(
|
||||
&self,
|
||||
rtxn: &'a RoTxn,
|
||||
index: &'a Index,
|
||||
mapper: &'a Mapper,
|
||||
doc_alloc: &'a Bump,
|
||||
) -> Result<VectorDocumentFromDb<'a>> {
|
||||
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
|
||||
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DocumentContext<
|
||||
'doc, // covariant lifetime of a single `process` call
|
||||
'extractor: 'doc, // invariant lifetime of the extractor_allocs
|
||||
'fid: 'doc, // invariant lifetime of the new_fields_ids_map
|
||||
'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call
|
||||
T: MostlySend,
|
||||
> {
|
||||
/// The index we're indexing in
|
||||
pub index: &'indexer Index,
|
||||
/// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
|
||||
/// inside of the DB.
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
/// A transaction providing data from the DB before all indexing operations
|
||||
pub rtxn: RoTxn<'indexer, WithoutTls>,
|
||||
|
||||
/// Global field id map that is up to date with the current state of the indexing process.
|
||||
///
|
||||
/// - Inserting a field will take a lock
|
||||
/// - Retrieving a field may take a lock as well
|
||||
pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
|
||||
|
||||
/// Data allocated in this allocator is cleared between each call to `process`.
|
||||
pub doc_alloc: Bump,
|
||||
|
||||
/// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
|
||||
pub extractor_alloc: &'extractor Bump,
|
||||
|
||||
/// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
|
||||
pub doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
|
||||
/// Extractor-specific data
|
||||
pub data: &'doc T,
|
||||
}
|
||||
|
||||
impl<
|
||||
'doc, // covariant lifetime of a single `process` call
|
||||
'data: 'doc, // invariant on T lifetime of the datastore
|
||||
'extractor: 'doc, // invariant lifetime of extractor_allocs
|
||||
'fid: 'doc, // invariant lifetime of fields ids map
|
||||
'indexer: 'doc, // covariant lifetime of objects that survive a `process` call
|
||||
T: MostlySend,
|
||||
> DocumentContext<'doc, 'extractor, 'fid, 'indexer, T>
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new<F>(
|
||||
index: &'indexer Index,
|
||||
db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
|
||||
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
|
||||
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
datastore: &'data ThreadLocal<T>,
|
||||
fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
|
||||
init_data: F,
|
||||
) -> Result<Self>
|
||||
where
|
||||
F: FnOnce(&'extractor Bump) -> Result<T>,
|
||||
{
|
||||
let doc_alloc =
|
||||
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
|
||||
let doc_alloc = doc_alloc.0.take();
|
||||
let fields_ids_map = fields_ids_map_store
|
||||
.get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
|
||||
|
||||
let fields_ids_map = &fields_ids_map.0;
|
||||
let extractor_alloc = extractor_allocs.get_or_default();
|
||||
|
||||
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
|
||||
|
||||
let txn = index.read_txn()?;
|
||||
Ok(DocumentContext {
|
||||
index,
|
||||
rtxn: txn,
|
||||
db_fields_ids_map,
|
||||
new_fields_ids_map: fields_ids_map,
|
||||
doc_alloc,
|
||||
extractor_alloc: &extractor_alloc.0,
|
||||
data,
|
||||
doc_allocs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,11 +10,12 @@ use super::vector_document::{
|
|||
};
|
||||
use crate::attribute_patterns::PatternMatch;
|
||||
use crate::documents::FieldIdMapper;
|
||||
use crate::update::new::document::DocumentIdentifiers;
|
||||
use crate::vector::EmbeddingConfigs;
|
||||
use crate::{DocumentId, Index, InternalError, Result};
|
||||
|
||||
pub enum DocumentChange<'doc> {
|
||||
Deletion(DatabaseDocument<'doc>),
|
||||
Deletion(DocumentIdentifiers<'doc>),
|
||||
Update(Update<'doc>),
|
||||
Insertion(Insertion<'doc>),
|
||||
}
|
||||
|
@ -32,11 +33,6 @@ pub struct Insertion<'doc> {
|
|||
new: Versions<'doc>,
|
||||
}
|
||||
|
||||
pub struct DatabaseDocument<'doc> {
|
||||
docid: DocumentId,
|
||||
external_document_id: &'doc str,
|
||||
}
|
||||
|
||||
impl<'doc> DocumentChange<'doc> {
|
||||
pub fn docid(&self) -> DocumentId {
|
||||
match &self {
|
||||
|
@ -279,40 +275,3 @@ impl<'doc> Update<'doc> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'doc> DatabaseDocument<'doc> {
|
||||
pub fn create(docid: DocumentId, external_document_id: &'doc str) -> Self {
|
||||
Self { docid, external_document_id }
|
||||
}
|
||||
|
||||
pub fn docid(&self) -> DocumentId {
|
||||
self.docid
|
||||
}
|
||||
|
||||
pub fn external_document_id(&self) -> &'doc str {
|
||||
self.external_document_id
|
||||
}
|
||||
|
||||
pub fn current<'a, Mapper: FieldIdMapper>(
|
||||
&self,
|
||||
rtxn: &'a RoTxn,
|
||||
index: &'a Index,
|
||||
mapper: &'a Mapper,
|
||||
) -> Result<DocumentFromDb<'a, Mapper>> {
|
||||
Ok(DocumentFromDb::new(self.docid, rtxn, index, mapper)?.ok_or(
|
||||
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn current_vectors<'a, Mapper: FieldIdMapper>(
|
||||
&self,
|
||||
rtxn: &'a RoTxn,
|
||||
index: &'a Index,
|
||||
mapper: &'a Mapper,
|
||||
doc_alloc: &'a Bump,
|
||||
) -> Result<VectorDocumentFromDb<'a>> {
|
||||
Ok(VectorDocumentFromDb::new(self.docid, index, rtxn, mapper, doc_alloc)?.ok_or(
|
||||
crate::error::UserError::UnknownInternalDocumentId { document_id: self.docid },
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,10 +8,10 @@ use super::DelAddRoaringBitmap;
|
|||
use crate::constants::RESERVED_GEO_FIELD_NAME;
|
||||
use crate::update::new::channel::{DocumentsSender, ExtractorBbqueueSender};
|
||||
use crate::update::new::document::{write_to_obkv, Document};
|
||||
use crate::update::new::document_change::DatabaseDocument;
|
||||
use crate::update::new::indexer::document_changes::{DocumentContext, Extractor, IndexingContext};
|
||||
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
|
||||
use crate::update::new::indexer::document_changes::{Extractor, IndexingContext};
|
||||
use crate::update::new::indexer::settings_changes::{
|
||||
settings_change_extract, DatabaseDocuments, SettingsChangeExtractor,
|
||||
settings_change_extract, DocumentsIndentifiers, SettingsChangeExtractor,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::{FullySend, ThreadLocal};
|
||||
|
@ -194,7 +194,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
|
|||
|
||||
fn process<'doc>(
|
||||
&self,
|
||||
documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
|
||||
documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
|
||||
context: &DocumentContext<Self::Data>,
|
||||
) -> Result<()> {
|
||||
let mut document_buffer = bumpalo::collections::Vec::new_in(&context.doc_alloc);
|
||||
|
@ -242,7 +242,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeDocumentE
|
|||
/// and then updates the database.
|
||||
#[tracing::instrument(level = "trace", skip_all, target = "indexing::documents::extract")]
|
||||
pub fn update_database_documents<'indexer, 'extractor, MSP, SD>(
|
||||
documents: &'indexer DatabaseDocuments<'indexer>,
|
||||
documents: &'indexer DocumentsIndentifiers<'indexer>,
|
||||
indexing_context: IndexingContext<MSP>,
|
||||
extractor_sender: &ExtractorBbqueueSender,
|
||||
settings_delta: &SD,
|
||||
|
|
|
@ -15,9 +15,10 @@ use crate::filterable_attributes_rules::match_faceted_field;
|
|||
use crate::heed_codec::facet::OrderedF64Codec;
|
||||
use crate::update::del_add::DelAdd;
|
||||
use crate::update::new::channel::FieldIdDocidFacetSender;
|
||||
use crate::update::new::document::DocumentContext;
|
||||
use crate::update::new::extract::perm_json_p;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentContext, DocumentChanges, Extractor, IndexingContext,
|
||||
extract, DocumentChanges, Extractor, IndexingContext,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
|
|
|
@ -10,8 +10,8 @@ use serde_json::value::RawValue;
|
|||
use serde_json::Value;
|
||||
|
||||
use crate::error::GeoError;
|
||||
use crate::update::new::document::Document;
|
||||
use crate::update::new::indexer::document_changes::{DocumentContext, Extractor};
|
||||
use crate::update::new::document::{Document, DocumentContext};
|
||||
use crate::update::new::indexer::document_changes::Extractor;
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::DocumentChange;
|
||||
|
|
|
@ -8,10 +8,11 @@ use bumpalo::Bump;
|
|||
|
||||
use super::match_searchable_field;
|
||||
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
use crate::update::new::document::DocumentContext;
|
||||
use crate::update::new::extract::cache::BalancedCaches;
|
||||
use crate::update::new::extract::perm_json_p::contained_in;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentContext, DocumentChanges, Extractor, IndexingContext,
|
||||
extract, DocumentChanges, Extractor, IndexingContext,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
|
|
|
@ -7,10 +7,10 @@ use bumpalo::Bump;
|
|||
use super::match_searchable_field;
|
||||
use super::tokenize_document::{tokenizer_builder, DocumentTokenizer};
|
||||
use crate::proximity::{index_proximity, MAX_DISTANCE};
|
||||
use crate::update::new::document::Document;
|
||||
use crate::update::new::document::{Document, DocumentContext};
|
||||
use crate::update::new::extract::cache::BalancedCaches;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentContext, DocumentChanges, Extractor, IndexingContext,
|
||||
extract, DocumentChanges, Extractor, IndexingContext,
|
||||
};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
|
|
|
@ -10,8 +10,8 @@ use crate::error::FaultSource;
|
|||
use crate::progress::EmbedderStats;
|
||||
use crate::prompt::Prompt;
|
||||
use crate::update::new::channel::EmbeddingSender;
|
||||
use crate::update::new::document_change::DatabaseDocument;
|
||||
use crate::update::new::indexer::document_changes::{DocumentContext, Extractor};
|
||||
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
|
||||
use crate::update::new::indexer::document_changes::Extractor;
|
||||
use crate::update::new::indexer::settings_changes::SettingsChangeExtractor;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::vector_document::VectorDocument;
|
||||
|
@ -343,7 +343,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
|
|||
|
||||
fn process<'doc>(
|
||||
&'doc self,
|
||||
documents: impl Iterator<Item = crate::Result<DatabaseDocument<'doc>>>,
|
||||
documents: impl Iterator<Item = crate::Result<DocumentIdentifiers<'doc>>>,
|
||||
context: &'doc DocumentContext<Self::Data>,
|
||||
) -> crate::Result<()> {
|
||||
let embedders = self.embedders.inner_as_ref();
|
||||
|
|
|
@ -3,100 +3,18 @@ use std::sync::atomic::Ordering;
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use bumpalo::Bump;
|
||||
use heed::{RoTxn, WithoutTls};
|
||||
use rayon::iter::IndexedParallelIterator;
|
||||
|
||||
use super::super::document_change::DocumentChange;
|
||||
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
|
||||
use crate::progress::{AtomicDocumentStep, Progress};
|
||||
use crate::update::new::document::DocumentContext;
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||
use crate::update::GrenadParameters;
|
||||
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result};
|
||||
|
||||
pub struct DocumentContext<
|
||||
'doc, // covariant lifetime of a single `process` call
|
||||
'extractor: 'doc, // invariant lifetime of the extractor_allocs
|
||||
'fid: 'doc, // invariant lifetime of the new_fields_ids_map
|
||||
'indexer: 'doc, // covariant lifetime of objects that outlive a single `process` call
|
||||
T: MostlySend,
|
||||
> {
|
||||
/// The index we're indexing in
|
||||
pub index: &'indexer Index,
|
||||
/// The fields ids map as it was at the start of this indexing process. Contains at least all top-level fields from documents
|
||||
/// inside of the DB.
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
/// A transaction providing data from the DB before all indexing operations
|
||||
pub rtxn: RoTxn<'indexer, WithoutTls>,
|
||||
|
||||
/// Global field id map that is up to date with the current state of the indexing process.
|
||||
///
|
||||
/// - Inserting a field will take a lock
|
||||
/// - Retrieving a field may take a lock as well
|
||||
pub new_fields_ids_map: &'doc std::cell::RefCell<GlobalFieldsIdsMap<'fid>>,
|
||||
|
||||
/// Data allocated in this allocator is cleared between each call to `process`.
|
||||
pub doc_alloc: Bump,
|
||||
|
||||
/// Data allocated in this allocator is not cleared between each call to `process`, unless the data spills.
|
||||
pub extractor_alloc: &'extractor Bump,
|
||||
|
||||
/// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents
|
||||
pub doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
|
||||
/// Extractor-specific data
|
||||
pub data: &'doc T,
|
||||
}
|
||||
|
||||
impl<
|
||||
'doc, // covariant lifetime of a single `process` call
|
||||
'data: 'doc, // invariant on T lifetime of the datastore
|
||||
'extractor: 'doc, // invariant lifetime of extractor_allocs
|
||||
'fid: 'doc, // invariant lifetime of fields ids map
|
||||
'indexer: 'doc, // covariant lifetime of objects that survive a `process` call
|
||||
T: MostlySend,
|
||||
> DocumentContext<'doc, 'extractor, 'fid, 'indexer, T>
|
||||
{
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn new<F>(
|
||||
index: &'indexer Index,
|
||||
db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
new_fields_ids_map: &'fid RwLock<FieldIdMapWithMetadata>,
|
||||
extractor_allocs: &'extractor ThreadLocal<FullySend<Bump>>,
|
||||
doc_allocs: &'doc ThreadLocal<FullySend<Cell<Bump>>>,
|
||||
datastore: &'data ThreadLocal<T>,
|
||||
fields_ids_map_store: &'doc ThreadLocal<FullySend<RefCell<GlobalFieldsIdsMap<'fid>>>>,
|
||||
init_data: F,
|
||||
) -> Result<Self>
|
||||
where
|
||||
F: FnOnce(&'extractor Bump) -> Result<T>,
|
||||
{
|
||||
let doc_alloc =
|
||||
doc_allocs.get_or(|| FullySend(Cell::new(Bump::with_capacity(1024 * 1024))));
|
||||
let doc_alloc = doc_alloc.0.take();
|
||||
let fields_ids_map = fields_ids_map_store
|
||||
.get_or(|| RefCell::new(GlobalFieldsIdsMap::new(new_fields_ids_map)).into());
|
||||
|
||||
let fields_ids_map = &fields_ids_map.0;
|
||||
let extractor_alloc = extractor_allocs.get_or_default();
|
||||
|
||||
let data = datastore.get_or_try(move || init_data(&extractor_alloc.0))?;
|
||||
|
||||
let txn = index.read_txn()?;
|
||||
Ok(DocumentContext {
|
||||
index,
|
||||
rtxn: txn,
|
||||
db_fields_ids_map,
|
||||
new_fields_ids_map: fields_ids_map,
|
||||
doc_alloc,
|
||||
extractor_alloc: &extractor_alloc.0,
|
||||
data,
|
||||
doc_allocs,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s
|
||||
pub trait Extractor<'extractor>: Sync {
|
||||
type Data: MostlySend;
|
||||
|
|
|
@ -4,10 +4,11 @@ use rayon::iter::IndexedParallelIterator;
|
|||
use rayon::slice::ParallelSlice as _;
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use super::document_changes::{DocumentContext, DocumentChanges};
|
||||
use super::document_changes::DocumentChanges;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::update::new::document::DocumentContext;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DatabaseDocument, DocumentChange};
|
||||
use crate::update::new::{DocumentChange, DocumentIdentifiers};
|
||||
use crate::{DocumentId, Result};
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -74,7 +75,10 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
|
|||
|
||||
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
|
||||
|
||||
Ok(Some(DocumentChange::Deletion(DatabaseDocument::create(*docid, external_document_id))))
|
||||
Ok(Some(DocumentChange::Deletion(DocumentIdentifiers::create(
|
||||
*docid,
|
||||
external_document_id,
|
||||
))))
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
|
@ -93,9 +97,8 @@ mod test {
|
|||
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
|
||||
use crate::index::tests::TempIndex;
|
||||
use crate::progress::Progress;
|
||||
use crate::update::new::indexer::document_changes::{
|
||||
extract, DocumentContext, Extractor, IndexingContext,
|
||||
};
|
||||
use crate::update::new::document::DocumentContext;
|
||||
use crate::update::new::indexer::document_changes::{extract, Extractor, IndexingContext};
|
||||
use crate::update::new::indexer::DocumentDeletion;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
use crate::update::new::thread_local::{MostlySend, ThreadLocal};
|
||||
|
|
|
@ -12,14 +12,14 @@ use serde_json::value::RawValue;
|
|||
use serde_json::Deserializer;
|
||||
|
||||
use super::super::document_change::DocumentChange;
|
||||
use super::document_changes::{DocumentContext, DocumentChanges};
|
||||
use super::document_changes::DocumentChanges;
|
||||
use super::guess_primary_key::retrieve_or_guess_primary_key;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::progress::{AtomicPayloadStep, Progress};
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::document::{DocumentContext, Versions};
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DatabaseDocument, Insertion, Update};
|
||||
use crate::update::new::{DocumentIdentifiers, Insertion, Update};
|
||||
use crate::update::{AvailableIds, IndexDocumentsMethod};
|
||||
use crate::{DocumentId, Error, FieldsIdsMap, Index, InternalError, Result, UserError};
|
||||
|
||||
|
@ -577,7 +577,7 @@ impl<'pl> PayloadOperations<'pl> {
|
|||
if self.is_new {
|
||||
Ok(None)
|
||||
} else {
|
||||
let deletion = DatabaseDocument::create(self.docid, external_doc);
|
||||
let deletion = DocumentIdentifiers::create(self.docid, external_doc);
|
||||
Ok(Some(DocumentChange::Deletion(deletion)))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ use crate::progress::EmbedderStats;
|
|||
use crate::progress::MergingWordCache;
|
||||
use crate::proximity::ProximityPrecision;
|
||||
use crate::update::new::extract::EmbeddingExtractor;
|
||||
use crate::update::new::indexer::settings_changes::DatabaseDocuments;
|
||||
use crate::update::new::indexer::settings_changes::DocumentsIndentifiers;
|
||||
use crate::update::new::merger::merge_and_send_rtree;
|
||||
use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
|
||||
use crate::update::settings::SettingsDelta;
|
||||
|
@ -345,7 +345,7 @@ where
|
|||
indexing_context.index.documents_ids(&rtxn)?.into_iter().collect::<Vec<_>>();
|
||||
let primary_key =
|
||||
primary_key_from_db(indexing_context.index, &rtxn, &indexing_context.db_fields_ids_map)?;
|
||||
let documents = DatabaseDocuments::new(&all_document_ids, primary_key);
|
||||
let documents = DocumentsIndentifiers::new(&all_document_ids, primary_key);
|
||||
|
||||
let span =
|
||||
tracing::trace_span!(target: "indexing::documents", parent: &indexer_span, "extract");
|
||||
|
|
|
@ -5,10 +5,10 @@ use rayon::iter::IndexedParallelIterator;
|
|||
use rustc_hash::FxBuildHasher;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
use super::document_changes::{DocumentContext, DocumentChanges};
|
||||
use super::document_changes::DocumentChanges;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::update::concurrent_available_ids::ConcurrentAvailableIds;
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::document::{DocumentContext, Versions};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DocumentChange, Insertion};
|
||||
|
|
|
@ -8,8 +8,7 @@ use rayon::slice::ParallelSlice;
|
|||
use super::document_changes::IndexingContext;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::progress::AtomicDocumentStep;
|
||||
use crate::update::new::document_change::DatabaseDocument;
|
||||
use crate::update::new::indexer::document_changes::DocumentContext;
|
||||
use crate::update::new::document::{DocumentContext, DocumentIdentifiers};
|
||||
use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _;
|
||||
use crate::update::new::steps::IndexingStep;
|
||||
use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal};
|
||||
|
@ -23,16 +22,16 @@ pub trait SettingsChangeExtractor<'extractor>: Sync {
|
|||
|
||||
fn process<'doc>(
|
||||
&'doc self,
|
||||
documents: impl Iterator<Item = Result<DatabaseDocument<'doc>>>,
|
||||
documents: impl Iterator<Item = Result<DocumentIdentifiers<'doc>>>,
|
||||
context: &'doc DocumentContext<Self::Data>,
|
||||
) -> Result<()>;
|
||||
}
|
||||
pub struct DatabaseDocuments<'indexer> {
|
||||
pub struct DocumentsIndentifiers<'indexer> {
|
||||
documents: &'indexer [DocumentId],
|
||||
primary_key: PrimaryKey<'indexer>,
|
||||
}
|
||||
|
||||
impl<'indexer> DatabaseDocuments<'indexer> {
|
||||
impl<'indexer> DocumentsIndentifiers<'indexer> {
|
||||
pub fn new(documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>) -> Self {
|
||||
Self { documents, primary_key }
|
||||
}
|
||||
|
@ -48,7 +47,7 @@ impl<'indexer> DatabaseDocuments<'indexer> {
|
|||
&'doc self,
|
||||
context: &'doc DocumentContext<T>,
|
||||
docid: &'doc DocumentId,
|
||||
) -> Result<Option<DatabaseDocument<'doc>>> {
|
||||
) -> Result<Option<DocumentIdentifiers<'doc>>> {
|
||||
let current = context.index.document(&context.rtxn, *docid)?;
|
||||
|
||||
let external_document_id = self.primary_key.extract_docid_from_db(
|
||||
|
@ -59,7 +58,7 @@ impl<'indexer> DatabaseDocuments<'indexer> {
|
|||
|
||||
let external_document_id = external_document_id.to_bump(&context.doc_alloc);
|
||||
|
||||
Ok(Some(DatabaseDocument::create(*docid, external_document_id)))
|
||||
Ok(Some(DocumentIdentifiers::create(*docid, external_document_id)))
|
||||
}
|
||||
|
||||
fn len(&self) -> usize {
|
||||
|
@ -78,7 +77,7 @@ pub fn settings_change_extract<
|
|||
EX: SettingsChangeExtractor<'extractor>,
|
||||
MSP: Fn() -> bool + Sync,
|
||||
>(
|
||||
documents: &'indexer DatabaseDocuments<'indexer>,
|
||||
documents: &'indexer DocumentsIndentifiers<'indexer>,
|
||||
extractor: &EX,
|
||||
IndexingContext {
|
||||
index,
|
||||
|
|
|
@ -5,15 +5,14 @@ use rhai::{Dynamic, Engine, OptimizationLevel, Scope, AST};
|
|||
use roaring::RoaringBitmap;
|
||||
use rustc_hash::FxBuildHasher;
|
||||
|
||||
use super::document_changes::DocumentContext;
|
||||
use super::DocumentChanges;
|
||||
use crate::documents::Error::InvalidDocumentFormat;
|
||||
use crate::documents::PrimaryKey;
|
||||
use crate::error::{FieldIdMapMissingEntry, InternalError};
|
||||
use crate::update::new::document::Versions;
|
||||
use crate::update::new::document::{DocumentContext, Versions};
|
||||
use crate::update::new::ref_cell_ext::RefCellExt as _;
|
||||
use crate::update::new::thread_local::MostlySend;
|
||||
use crate::update::new::{DatabaseDocument, DocumentChange, KvReaderFieldId, Update};
|
||||
use crate::update::new::{DocumentChange, DocumentIdentifiers, KvReaderFieldId, Update};
|
||||
use crate::{all_obkv_to_json, Error, FieldsIdsMap, Object, Result, UserError};
|
||||
|
||||
pub struct UpdateByFunction {
|
||||
|
@ -129,7 +128,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
|
|||
match scope.remove::<Dynamic>("doc") {
|
||||
// If the "doc" variable has been set to (), we effectively delete the document.
|
||||
Some(doc) if doc.is_unit() => Ok(Some(DocumentChange::Deletion(
|
||||
DatabaseDocument::create(docid, doc_alloc.alloc_str(&document_id)),
|
||||
DocumentIdentifiers::create(docid, doc_alloc.alloc_str(&document_id)),
|
||||
))),
|
||||
None => unreachable!("missing doc variable from the Rhai scope"),
|
||||
Some(new_document) => match new_document.try_cast() {
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
pub use document_change::{DatabaseDocument, DocumentChange, Insertion, Update};
|
||||
pub use document::DocumentIdentifiers;
|
||||
pub use document_change::{DocumentChange, Insertion, Update};
|
||||
pub use indexer::ChannelCongestion;
|
||||
pub use merger::{
|
||||
merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases, FacetFieldIdsDelta,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue