Expose intermediate errors when processing batches

Clément Renault 2024-11-14 16:00:11 +01:00 committed by Louis Dureuil
parent 4ff2b3c2ee
commit 83865d2ebd
3 changed files with 409 additions and 216 deletions
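
The change replaces the per-operation bookkeeping (each `DocumentOperation` used to write its task's status and details inline) with a two-step flow: the indexer's `into_changes` now also returns per-operation stats, and a separate loop maps each stats entry back onto its task, marking it `Succeeded` or `Failed` and recording the document count. Below is a minimal, self-contained sketch of that pattern, using simplified stand-in types rather than the actual `meilisearch_types`/`milli` definitions:

// Hypothetical, simplified stand-ins for the scheduler's task and the
// indexer's per-operation stats; only the shape of the pattern matters here.

#[derive(Debug)]
enum Status {
    Succeeded,
    Failed,
}

#[derive(Debug)]
struct Task {
    status: Option<Status>,
    error: Option<String>,
    indexed_documents: Option<u64>,
}

/// One stats entry per document operation, carrying an optional error so a
/// single failing operation no longer aborts the whole batch.
struct OperationStats {
    document_count: u64,
    error: Option<String>,
}

/// Zip the per-operation stats back onto their originating tasks.
fn report_stats(stats: Vec<OperationStats>, tasks: &mut [Task]) {
    for (stat, task) in stats.into_iter().zip(tasks.iter_mut()) {
        match stat.error {
            Some(error) => {
                task.status = Some(Status::Failed);
                task.error = Some(error);
            }
            None => task.status = Some(Status::Succeeded),
        }
        task.indexed_documents = Some(stat.document_count);
    }
}

fn main() {
    let mut tasks = vec![
        Task { status: None, error: None, indexed_documents: None },
        Task { status: None, error: None, indexed_documents: None },
    ];
    let stats = vec![
        OperationStats { document_count: 12, error: None },
        OperationStats { document_count: 0, error: Some("invalid document id".into()) },
    ];
    report_stats(stats, &mut tasks);
    println!("{tasks:?}");
}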


@@ -39,7 +39,7 @@ use meilisearch_types::milli::update::{IndexDocumentsMethod, Settings as MilliSettings};
use meilisearch_types::milli::vector::parsed_vectors::{
ExplicitVectors, VectorOrArrayOfVectors, RESERVED_VECTORS_FIELD_NAME,
};
-use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
+use meilisearch_types::milli::{self, Filter, ThreadPoolNoAbortBuilder};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -1331,55 +1331,19 @@ impl IndexScheduler {
let mut indexer = indexer::DocumentOperation::new(method);
let embedders = index.embedding_configs(index_wtxn)?;
let embedders = self.embedders(embedders)?;
-for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
+for operation in operations {
match operation {
DocumentOperation::Add(_content_uuid) => {
let mmap = content_files_iter.next().unwrap();
-let stats = indexer.add_documents(mmap)?;
+indexer.add_documents(mmap)?;
// builder = builder.with_embedders(embedders.clone());
-let received_documents =
-if let Some(Details::DocumentAdditionOrUpdate {
-received_documents,
-..
-}) = task.details
-{
-received_documents
-} else {
-// In the case of a `documentAdditionOrUpdate` the details MUST be set
-unreachable!();
-};
-task.status = Status::Succeeded;
-task.details = Some(Details::DocumentAdditionOrUpdate {
-received_documents,
-indexed_documents: Some(stats.document_count as u64),
-})
}
DocumentOperation::Delete(document_ids) => {
-let count = document_ids.len();
let document_ids: bumpalo::collections::vec::Vec<_> = document_ids
.iter()
.map(|s| &*indexer_alloc.alloc_str(s))
.collect_in(&indexer_alloc);
indexer.delete_documents(document_ids.into_bump_slice());
-// Uses Invariant: remove documents actually always returns Ok for the inner result
-// let count = user_result.unwrap();
-let provided_ids =
-if let Some(Details::DocumentDeletion { provided_ids, .. }) =
-task.details
-{
-provided_ids
-} else {
-// In the case of a `documentAdditionOrUpdate` the details MUST be set
-unreachable!();
-};
-task.status = Status::Succeeded;
-task.details = Some(Details::DocumentDeletion {
-provided_ids,
-deleted_documents: Some(count as u64),
-});
}
}
}
@@ -1394,8 +1358,7 @@ impl IndexScheduler {
}
};
// TODO we want to multithread this
-let document_changes = indexer.into_changes(
+let (document_changes, operation_stats) = indexer.into_changes(
&indexer_alloc,
index,
&rtxn,
@@ -1403,6 +1366,36 @@ impl IndexScheduler {
&mut new_fields_ids_map,
)?;
+for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
+match stats.error {
+Some(error) => {
+task.status = Status::Failed;
+task.error = Some(milli::Error::UserError(error).into());
+}
+None => task.status = Status::Succeeded,
+}
+task.details = match task.details {
+Some(Details::DocumentAdditionOrUpdate {
+received_documents, ..
+}) => Some(Details::DocumentAdditionOrUpdate {
+received_documents,
+indexed_documents: Some(stats.document_count),
+}),
+Some(Details::DocumentDeletion { provided_ids, .. }) => {
+Some(Details::DocumentDeletion {
+provided_ids,
+deleted_documents: Some(stats.document_count),
+})
+}
+_ => {
+// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
+// the details MUST be set to either addition or deletion
+unreachable!();
+}
+}
+}
pool.install(|| {
indexer::index(
index_wtxn,