mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 13:24:27 +01:00
Check for at least one valid task after setting their statuses
This commit is contained in:
parent
83865d2ebd
commit
bd31ea2174
@ -1229,9 +1229,7 @@ impl IndexScheduler {
|
|||||||
const PRINT_SECS_DELTA: u64 = 1;
|
const PRINT_SECS_DELTA: u64 = 1;
|
||||||
|
|
||||||
let processing_tasks = self.processing_tasks.clone();
|
let processing_tasks = self.processing_tasks.clone();
|
||||||
|
|
||||||
let must_stop_processing = self.must_stop_processing.clone();
|
let must_stop_processing = self.must_stop_processing.clone();
|
||||||
|
|
||||||
let send_progress = |progress| {
|
let send_progress = |progress| {
|
||||||
let now = std::time::Instant::now();
|
let now = std::time::Instant::now();
|
||||||
let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed);
|
let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed);
|
||||||
@ -1327,6 +1325,7 @@ impl IndexScheduler {
|
|||||||
)?
|
)?
|
||||||
.map_err(milli::Error::from)?;
|
.map_err(milli::Error::from)?;
|
||||||
|
|
||||||
|
let indexer_config = self.index_mapper.indexer_config();
|
||||||
let mut content_files_iter = content_files.iter();
|
let mut content_files_iter = content_files.iter();
|
||||||
let mut indexer = indexer::DocumentOperation::new(method);
|
let mut indexer = indexer::DocumentOperation::new(method);
|
||||||
let embedders = index.embedding_configs(index_wtxn)?;
|
let embedders = index.embedding_configs(index_wtxn)?;
|
||||||
@ -1348,54 +1347,56 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if tasks.iter().any(|res| res.error.is_none()) {
|
let local_pool;
|
||||||
let local_pool;
|
let pool = match &indexer_config.thread_pool {
|
||||||
let pool = match &self.index_mapper.indexer_config().thread_pool {
|
Some(pool) => pool,
|
||||||
Some(pool) => pool,
|
None => {
|
||||||
None => {
|
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
||||||
local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap();
|
&local_pool
|
||||||
&local_pool
|
}
|
||||||
}
|
};
|
||||||
};
|
|
||||||
|
|
||||||
let (document_changes, operation_stats) = indexer.into_changes(
|
let (document_changes, operation_stats) = indexer.into_changes(
|
||||||
&indexer_alloc,
|
&indexer_alloc,
|
||||||
index,
|
index,
|
||||||
&rtxn,
|
&rtxn,
|
||||||
&primary_key,
|
&primary_key,
|
||||||
&mut new_fields_ids_map,
|
&mut new_fields_ids_map,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
|
let mut addition = 0;
|
||||||
match stats.error {
|
for (stats, task) in operation_stats.into_iter().zip(&mut tasks) {
|
||||||
Some(error) => {
|
addition += stats.document_count;
|
||||||
task.status = Status::Failed;
|
match stats.error {
|
||||||
task.error = Some(milli::Error::UserError(error).into());
|
Some(error) => {
|
||||||
}
|
task.status = Status::Failed;
|
||||||
None => task.status = Status::Succeeded,
|
task.error = Some(milli::Error::UserError(error).into());
|
||||||
}
|
|
||||||
|
|
||||||
task.details = match task.details {
|
|
||||||
Some(Details::DocumentAdditionOrUpdate {
|
|
||||||
received_documents, ..
|
|
||||||
}) => Some(Details::DocumentAdditionOrUpdate {
|
|
||||||
received_documents,
|
|
||||||
indexed_documents: Some(stats.document_count),
|
|
||||||
}),
|
|
||||||
Some(Details::DocumentDeletion { provided_ids, .. }) => {
|
|
||||||
Some(Details::DocumentDeletion {
|
|
||||||
provided_ids,
|
|
||||||
deleted_documents: Some(stats.document_count),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
|
|
||||||
// the details MUST be set to either addition or deletion
|
|
||||||
unreachable!();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
None => task.status = Status::Succeeded,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
task.details = match task.details {
|
||||||
|
Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) => {
|
||||||
|
Some(Details::DocumentAdditionOrUpdate {
|
||||||
|
received_documents,
|
||||||
|
indexed_documents: Some(stats.document_count),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
Some(Details::DocumentDeletion { provided_ids, .. }) => {
|
||||||
|
Some(Details::DocumentDeletion {
|
||||||
|
provided_ids,
|
||||||
|
deleted_documents: Some(stats.document_count),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// In the case of a `documentAdditionOrUpdate` or `DocumentDeletion`
|
||||||
|
// the details MUST be set to either addition or deletion
|
||||||
|
unreachable!();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if tasks.iter().any(|res| res.error.is_none()) {
|
||||||
pool.install(|| {
|
pool.install(|| {
|
||||||
indexer::index(
|
indexer::index(
|
||||||
index_wtxn,
|
index_wtxn,
|
||||||
@ -1411,7 +1412,7 @@ impl IndexScheduler {
|
|||||||
})
|
})
|
||||||
.unwrap()?;
|
.unwrap()?;
|
||||||
|
|
||||||
// tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done");
|
||||||
}
|
}
|
||||||
// else if primary_key_has_been_set {
|
// else if primary_key_has_been_set {
|
||||||
// // Everything failed but we've set a primary key.
|
// // Everything failed but we've set a primary key.
|
||||||
|
Loading…
Reference in New Issue
Block a user