make the number of document in the document tasks more incremental

This commit is contained in:
Tamo 2024-12-10 16:58:13 +01:00
parent df9b68f8ed
commit 6f4823fc97
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
2 changed files with 5 additions and 8 deletions

View File

@ -230,7 +230,7 @@ where
send_progress.update_progress(progress_step); send_progress.update_progress(progress_step);
let pi = document_changes.iter(CHUNK_SIZE); let pi = document_changes.iter(CHUNK_SIZE);
pi.enumerate().try_arc_for_each_try_init( pi.try_arc_for_each_try_init(
|| { || {
DocumentChangeContext::new( DocumentChangeContext::new(
index, index,
@ -243,13 +243,10 @@ where
move |index_alloc| extractor.init_data(index_alloc), move |index_alloc| extractor.init_data(index_alloc),
) )
}, },
|context, (finished_documents, items)| { |context, items| {
if (must_stop_processing)() { if (must_stop_processing)() {
return Err(Arc::new(InternalError::AbortedIndexation.into())); return Err(Arc::new(InternalError::AbortedIndexation.into()));
} }
let finished_documents = (finished_documents * CHUNK_SIZE) as u32;
step.store(finished_documents, Ordering::Relaxed);
// Clean up and reuse the document-specific allocator // Clean up and reuse the document-specific allocator
context.doc_alloc.reset(); context.doc_alloc.reset();
@ -260,6 +257,7 @@ where
}); });
let res = extractor.process(changes, context).map_err(Arc::new); let res = extractor.process(changes, context).map_err(Arc::new);
step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed);
// send back the doc_alloc in the pool // send back the doc_alloc in the pool
context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc));

View File

@ -15,7 +15,7 @@ use super::super::document_change::DocumentChange;
use super::document_changes::{DocumentChangeContext, DocumentChanges}; use super::document_changes::{DocumentChangeContext, DocumentChanges};
use super::retrieve_or_guess_primary_key; use super::retrieve_or_guess_primary_key;
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::progress::{AtomicSubStep, Progress}; use crate::progress::{AtomicDocumentStep, Progress};
use crate::update::new::document::Versions; use crate::update::new::document::Versions;
use crate::update::new::steps::IndexingStep; use crate::update::new::steps::IndexingStep;
use crate::update::new::thread_local::MostlySend; use crate::update::new::thread_local::MostlySend;
@ -71,8 +71,7 @@ impl<'pl> DocumentOperation<'pl> {
let mut primary_key = None; let mut primary_key = None;
let payload_count = operations.len(); let payload_count = operations.len();
let (step, progress_step) = let (step, progress_step) = AtomicDocumentStep::new(payload_count as u32);
AtomicSubStep::<crate::progress::Document>::new(payload_count as u32);
progress.update_progress(progress_step); progress.update_progress(progress_step);
for (payload_index, operation) in operations.into_iter().enumerate() { for (payload_index, operation) in operations.into_iter().enumerate() {