Merge remote-tracking branch 'origin/main' into facet-levels-refactor

This commit is contained in:
Loïc Lecrenier 2022-10-26 14:03:23 +02:00
commit 2741756248
14 changed files with 414 additions and 136 deletions

View file

@ -32,7 +32,7 @@ pub use self::helpers::{
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::UserError;
use crate::error::{Error, InternalError, UserError};
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
self, IndexerConfig, PrefixWordPairsProximityDocids, UpdateIndexingStep, WordPrefixDocids,
@ -70,13 +70,14 @@ impl Default for IndexDocumentsMethod {
}
}
pub struct IndexDocuments<'t, 'u, 'i, 'a, F> {
pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
config: IndexDocumentsConfig,
indexer_config: &'a IndexerConfig,
transform: Option<Transform<'a, 'i>>,
progress: F,
progress: FP,
should_abort: FA,
added_documents: u64,
}
@ -90,17 +91,19 @@ pub struct IndexDocumentsConfig {
pub autogenerate_docids: bool,
}
impl<'t, 'u, 'i, 'a, F> IndexDocuments<'t, 'u, 'i, 'a, F>
impl<'t, 'u, 'i, 'a, FP, FA> IndexDocuments<'t, 'u, 'i, 'a, FP, FA>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
indexer_config: &'a IndexerConfig,
config: IndexDocumentsConfig,
progress: F,
) -> Result<IndexDocuments<'t, 'u, 'i, 'a, F>> {
progress: FP,
should_abort: FA,
) -> Result<IndexDocuments<'t, 'u, 'i, 'a, FP, FA>> {
let transform = Some(Transform::new(
wtxn,
index,
@ -114,6 +117,7 @@ where
config,
indexer_config,
progress,
should_abort,
wtxn,
index,
added_documents: 0,
@ -148,12 +152,13 @@ where
Err(user_error) => return Ok((self, Err(user_error))),
};
let indexed_documents = self
.transform
.as_mut()
.expect("Invalid document addition state")
.read_documents(enriched_documents_reader, self.wtxn, &self.progress)?
as u64;
let indexed_documents =
self.transform.as_mut().expect("Invalid document addition state").read_documents(
enriched_documents_reader,
self.wtxn,
&self.progress,
&self.should_abort,
)? as u64;
self.added_documents += indexed_documents;
@ -197,7 +202,8 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
let TransformOutput {
primary_key,
@ -346,6 +352,10 @@ where
});
for result in lmdb_writer_rx {
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let typed_chunk = match result? {
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
@ -422,17 +432,26 @@ where
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let previous_words_prefixes_fst =
self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?;
@ -446,6 +465,10 @@ where
}
builder.execute()?;
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We retrieve the common words between the previous and new prefix word fst.
@ -473,6 +496,10 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
if let Some(word_docids) = word_docids {
execute_word_prefix_docids(
self.wtxn,
@ -499,6 +526,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
@ -521,6 +552,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
@ -548,6 +583,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,