diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index fb9cfbe6c..43c5e5df6 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -1229,9 +1229,7 @@ impl IndexScheduler { const PRINT_SECS_DELTA: u64 = 1; let processing_tasks = self.processing_tasks.clone(); - let must_stop_processing = self.must_stop_processing.clone(); - let send_progress = |progress| { let now = std::time::Instant::now(); let elapsed = secs_since_started_processing_at.load(atomic::Ordering::Relaxed); @@ -1327,6 +1325,7 @@ impl IndexScheduler { )? .map_err(milli::Error::from)?; + let indexer_config = self.index_mapper.indexer_config(); let mut content_files_iter = content_files.iter(); let mut indexer = indexer::DocumentOperation::new(method); let embedders = index.embedding_configs(index_wtxn)?; @@ -1348,54 +1347,56 @@ impl IndexScheduler { } } - if tasks.iter().any(|res| res.error.is_none()) { - let local_pool; - let pool = match &self.index_mapper.indexer_config().thread_pool { - Some(pool) => pool, - None => { - local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); - &local_pool - } - }; + let local_pool; + let pool = match &indexer_config.thread_pool { + Some(pool) => pool, + None => { + local_pool = ThreadPoolNoAbortBuilder::new().build().unwrap(); + &local_pool + } + }; - let (document_changes, operation_stats) = indexer.into_changes( - &indexer_alloc, - index, - &rtxn, - &primary_key, - &mut new_fields_ids_map, - )?; + let (document_changes, operation_stats) = indexer.into_changes( + &indexer_alloc, + index, + &rtxn, + &primary_key, + &mut new_fields_ids_map, + )?; - for (stats, task) in operation_stats.into_iter().zip(&mut tasks) { - match stats.error { - Some(error) => { - task.status = Status::Failed; - task.error = Some(milli::Error::UserError(error).into()); - } - None => task.status = Status::Succeeded, - } - - task.details = match task.details { - Some(Details::DocumentAdditionOrUpdate { - received_documents, .. - }) => Some(Details::DocumentAdditionOrUpdate { - received_documents, - indexed_documents: Some(stats.document_count), - }), - Some(Details::DocumentDeletion { provided_ids, .. }) => { - Some(Details::DocumentDeletion { - provided_ids, - deleted_documents: Some(stats.document_count), - }) - } - _ => { - // In the case of a `documentAdditionOrUpdate` or `DocumentDeletion` - // the details MUST be set to either addition or deletion - unreachable!(); - } + let mut addition = 0; + for (stats, task) in operation_stats.into_iter().zip(&mut tasks) { + addition += stats.document_count; + match stats.error { + Some(error) => { + task.status = Status::Failed; + task.error = Some(milli::Error::UserError(error).into()); } + None => task.status = Status::Succeeded, } + task.details = match task.details { + Some(Details::DocumentAdditionOrUpdate { received_documents, .. }) => { + Some(Details::DocumentAdditionOrUpdate { + received_documents, + indexed_documents: Some(stats.document_count), + }) + } + Some(Details::DocumentDeletion { provided_ids, .. }) => { + Some(Details::DocumentDeletion { + provided_ids, + deleted_documents: Some(stats.document_count), + }) + } + _ => { + // In the case of a `documentAdditionOrUpdate` or `DocumentDeletion` + // the details MUST be set to either addition or deletion + unreachable!(); + } + } + } + + if tasks.iter().any(|res| res.error.is_none()) { pool.install(|| { indexer::index( index_wtxn, @@ -1411,7 +1412,7 @@ impl IndexScheduler { }) .unwrap()?; - // tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); + tracing::info!(indexing_result = ?addition, processed_in = ?started_processing_at.elapsed(), "document indexing done"); } // else if primary_key_has_been_set { // // Everything failed but we've set a primary key.