Introduce an indexation abortion function when indexing documents

This commit is contained in:
Kerollmops 2022-10-05 17:41:07 +02:00
parent fad0de4581
commit 6603437cb1
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
14 changed files with 379 additions and 136 deletions

View file

@ -56,6 +56,8 @@ pub enum InternalError {
Store(#[from] MdbError),
#[error(transparent)]
Utf8(#[from] str::Utf8Error),
#[error("An indexation process was explicitly aborted.")]
AbortedIndexation,
}
#[derive(Error, Debug)]

View file

@ -1245,6 +1245,7 @@ pub(crate) mod tests {
&self.indexer_config,
self.index_documents_config.clone(),
|_| (),
|| false,
)
.unwrap();
let (builder, user_error) = builder.add_documents(documents).unwrap();
@ -1281,7 +1282,7 @@ pub(crate) mod tests {
) -> Result<(), crate::error::Error> {
let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
update(&mut builder);
builder.execute(drop)?;
builder.execute(drop, || false)?;
Ok(())
}
}

View file

@ -89,7 +89,7 @@ mod test {
let config = IndexerConfig::default();
let mut update = Settings::new(&mut txn, &index, &config);
update.set_distinct_field(distinct.to_string());
update.execute(|_| ()).unwrap();
update.execute(|_| (), || false).unwrap();
// add documents to the index
let config = IndexerConfig::default();
@ -98,7 +98,8 @@ mod test {
..Default::default()
};
let addition =
IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| ()).unwrap();
IndexDocuments::new(&mut txn, &index, &config, indexing_config, |_| (), || false)
.unwrap();
let reader =
crate::documents::DocumentsBatchReader::from_reader(Cursor::new(JSON.as_slice()))

View file

@ -33,7 +33,7 @@ pub use self::helpers::{
use self::helpers::{grenad_obkv_into_chunks, GrenadParameters};
pub use self::transform::{Transform, TransformOutput};
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::UserError;
use crate::error::{Error, InternalError, UserError};
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
self, Facets, IndexerConfig, UpdateIndexingStep, WordPrefixDocids,
@ -71,13 +71,14 @@ impl Default for IndexDocumentsMethod {
}
}
pub struct IndexDocuments<'t, 'u, 'i, 'a, F> {
pub struct IndexDocuments<'t, 'u, 'i, 'a, FP, FA> {
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
config: IndexDocumentsConfig,
indexer_config: &'a IndexerConfig,
transform: Option<Transform<'a, 'i>>,
progress: F,
progress: FP,
should_abort: FA,
added_documents: u64,
}
@ -93,17 +94,19 @@ pub struct IndexDocumentsConfig {
pub autogenerate_docids: bool,
}
impl<'t, 'u, 'i, 'a, F> IndexDocuments<'t, 'u, 'i, 'a, F>
impl<'t, 'u, 'i, 'a, FP, FA> IndexDocuments<'t, 'u, 'i, 'a, FP, FA>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
pub fn new(
wtxn: &'t mut heed::RwTxn<'i, 'u>,
index: &'i Index,
indexer_config: &'a IndexerConfig,
config: IndexDocumentsConfig,
progress: F,
) -> Result<IndexDocuments<'t, 'u, 'i, 'a, F>> {
progress: FP,
should_abort: FA,
) -> Result<IndexDocuments<'t, 'u, 'i, 'a, FP, FA>> {
let transform = Some(Transform::new(
wtxn,
&index,
@ -117,6 +120,7 @@ where
config,
indexer_config,
progress,
should_abort,
wtxn,
index,
added_documents: 0,
@ -151,12 +155,13 @@ where
Err(user_error) => return Ok((self, Err(user_error))),
};
let indexed_documents = self
.transform
.as_mut()
.expect("Invalid document addition state")
.read_documents(enriched_documents_reader, self.wtxn, &self.progress)?
as u64;
let indexed_documents =
self.transform.as_mut().expect("Invalid document addition state").read_documents(
enriched_documents_reader,
self.wtxn,
&self.progress,
&self.should_abort,
)? as u64;
self.added_documents += indexed_documents;
@ -200,7 +205,8 @@ where
#[logging_timer::time("IndexDocuments::{}")]
pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
let TransformOutput {
primary_key,
@ -355,6 +361,10 @@ where
});
for result in lmdb_writer_rx {
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let typed_chunk = match result? {
TypedChunk::WordDocids { word_docids_reader, exact_word_docids_reader } => {
let cloneable_chunk = unsafe { as_cloneable_grenad(&word_docids_reader)? };
@ -431,11 +441,16 @@ where
word_position_docids: Option<grenad::Reader<CursorClonableMmap>>,
) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
// Merged databases are already been indexed, we start from this count;
let mut databases_seen = MERGED_DATABASE_COUNT;
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// Run the facets update operation.
let mut builder = Facets::new(self.wtxn, self.index);
builder.chunk_compression_type = self.indexer_config.chunk_compression_type;
@ -454,6 +469,10 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let previous_words_prefixes_fst =
self.index.words_prefixes_fst(self.wtxn)?.map_data(|cow| cow.into_owned())?;
@ -467,6 +486,10 @@ where
}
builder.execute()?;
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
let current_prefix_fst = self.index.words_prefixes_fst(self.wtxn)?;
// We retrieve the common words between the previous and new prefix word fst.
@ -494,6 +517,10 @@ where
total_databases: TOTAL_POSTING_DATABASE_COUNT,
});
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
if let Some(word_docids) = word_docids {
execute_word_prefix_docids(
self.wtxn,
@ -520,6 +547,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
@ -541,6 +572,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,
@ -568,6 +603,10 @@ where
)?;
}
if (self.should_abort)() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
databases_seen += 1;
(self.progress)(UpdateIndexingStep::MergeDataIntoFinalDatabase {
databases_seen,

View file

@ -138,15 +138,17 @@ impl<'a, 'i> Transform<'a, 'i> {
})
}
pub fn read_documents<R, F>(
pub fn read_documents<R, FP, FA>(
&mut self,
reader: EnrichedDocumentsBatchReader<R>,
wtxn: &mut heed::RwTxn,
progress_callback: F,
progress_callback: FP,
should_abort: FA,
) -> Result<usize>
where
R: Read + Seek,
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
let (mut cursor, fields_index) = reader.into_cursor_and_fields_index();
@ -165,6 +167,10 @@ impl<'a, 'i> Transform<'a, 'i> {
while let Some(enriched_document) = cursor.next_enriched_document()? {
let EnrichedDocument { document, document_id } = enriched_document;
if should_abort() {
return Err(Error::InternalError(InternalError::AbortedIndexation));
}
// drop_and_reuse is called instead of .clear() to communicate to the compiler that field_buffer
// does not keep references from the cursor between loop iterations
let mut field_buffer_cache = drop_and_reuse(field_buffer);

View file

@ -266,9 +266,15 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
self.pagination_max_total_hits = Setting::Reset;
}
fn reindex<F>(&mut self, cb: &F, old_fields_ids_map: FieldsIdsMap) -> Result<()>
fn reindex<FP, FA>(
&mut self,
progress_callback: &FP,
should_abort: &FA,
old_fields_ids_map: FieldsIdsMap,
) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
let fields_ids_map = self.index.fields_ids_map(self.wtxn)?;
// if the settings are set before any document update, we don't need to do anything, and
@ -305,7 +311,8 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
self.index,
&self.indexer_config,
IndexDocumentsConfig::default(),
&cb,
&progress_callback,
&should_abort,
)?;
indexing_builder.execute_raw(output)?;
@ -660,9 +667,10 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
Ok(())
}
pub fn execute<F>(mut self, progress_callback: F) -> Result<()>
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA) -> Result<()>
where
F: Fn(UpdateIndexingStep) + Sync,
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
{
self.index.set_updated_at(self.wtxn, &OffsetDateTime::now_utc())?;
@ -698,7 +706,7 @@ impl<'a, 't, 'u, 'i> Settings<'a, 't, 'u, 'i> {
|| searchable_updated
|| exact_attributes_updated
{
self.reindex(&progress_callback, old_fields_ids_map)?;
self.reindex(&progress_callback, &should_abort, old_fields_ids_map)?;
}
Ok(())