Check for at least one valid task after setting their statuses

This commit is contained in:
Clément Renault 2024-11-14 16:13:38 +01:00
parent 575d029011
commit 692c59cdd3
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F

View File

@ -1229,9 +1229,7 @@ impl IndexScheduler {
const PRINT_SECS_DELTA: u64 = 1; const PRINT_SECS_DELTA: u64 = 1;
let processing_tasks = self.processing_tasks.clone(); let processing_tasks = self.processing_tasks.clone();
let must_stop_processing = self.must_stop_processing.clone(); let must_stop_processing = self.must_stop_processing.clone();
let send_progress = |progress| { let send_progress = |progress| {
let now = std::time::Instant::now(); let now = std::time::Instant::now();
let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed);
@ -1327,6 +1325,7 @@ impl IndexScheduler {
)? )?
.map_err(milli::Error::from)?; .map_err(milli::Error::from)?;
let indexer_config = self.index_mapper.indexer_config();
let mut content_files_iter = content_files.iter(); let mut content_files_iter = content_files.iter();
let mut indexer = indexer::DocumentOperation::new(method); let mut indexer = indexer::DocumentOperation::new(method);
let embedders = index.embedding_configs(index_wtxn)?; let embedders = index.embedding_configs(index_wtxn)?;
@ -1348,54 +1347,56 @@ impl IndexScheduler {
} }
} }
if tasks.iter().any(|res| res.error.is_none()) { let local_pool;
let local_pool; let pool = match &indexer_config.thread_pool {
let pool = match &self.index_mapper.indexer_config().thread_pool { Some(pool) => pool,
Some(pool) => pool, None => {
None => { local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); &local_pool
&local_pool }
} };
};
let (document_changes, operation_stats) = indexer.into_changes( let (document_changes, operation_stats) = indexer.into_changes(
&indexer_alloc, &indexer_alloc,
index, index,
&rtxn, &rtxn,
&primary_key, &primary_key,
&mut new_fields_ids_map, &mut new_fields_ids_map,
)?; )?;
for (stats, task) in operation_stats.into_iter().zip(&mut tasks) { let mut addition = 0;
match stats.error { for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
Some(error) => { addition += stats.document_count;
task.status = Status::Failed; match stats.error {
task.error = Some(milli::Error::UserError(error).into()); Some(error) => {
} task.status = Status::Failed;
None => task.status = Status::Succeeded, task.error = Some(milli::Error::UserError(error).into());
}
task.details = match task.details {
Some(Details::DocumentAdditionOrUpdate {
received_documents, ..
}) => Some(Details::DocumentAdditionOrUpdate {
received_documents,
indexed_documents: Some(stats.document_count),
}),
Some(Details::DocumentDeletion { provided_ids, .. }) => {
Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(stats.document_count),
})
}
_ => {
// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
// the details MUST be set to either addition or deletion
unreachable!();
}
} }
None => task.status = Status::Succeeded,
} }
task.details = match task.details {
Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) => {
Some(Details::DocumentAdditionOrUpdate {
received_documents,
indexed_documents: Some(stats.document_count),
})
}
Some(Details::DocumentDeletion { provided_ids, .. }) => {
Some(Details::DocumentDeletion {
provided_ids,
deleted_documents: Some(stats.document_count),
})
}
_ => {
// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
// the details MUST be set to either addition or deletion
unreachable!();
}
}
}
if tasks.iter().any(|res| res.error.is_none()) {
pool.install(|| { pool.install(|| {
indexer::index( indexer::index(
index_wtxn, index_wtxn,
@ -1411,7 +1412,7 @@ impl IndexScheduler {
}) })
.unwrap()?; .unwrap()?;
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
} }
// else if primary_key_has_been_set { // else if primary_key_has_been_set {
// // Everything failed but we've set a primary key. // // Everything failed but we've set a primary key.