From 8cacc021fbe5209b364e374b9da5e0de3d1d43cb Mon Sep 17 00:00:00 2001 From: ManyTheFish Date: Wed, 25 Jun 2025 14:42:07 +0200 Subject: [PATCH] Introduce extractor for setting changes --- .../update/new/indexer/document_changes.rs | 2 +- .../update/new/indexer/settings_changes.rs | 93 +++++++++++++++++++ 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index 5302c9d05..ca5bc8dc5 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -43,7 +43,7 @@ pub struct DocumentChangeContext< pub extractor_alloc: &'extractor Bump, /// Pool of doc allocators, used to retrieve the doc allocator we provided for the documents - doc_allocs: &'doc ThreadLocal>>, + pub doc_allocs: &'doc ThreadLocal>>, /// Extractor-specific data pub data: &'doc T, diff --git a/crates/milli/src/update/new/indexer/settings_changes.rs b/crates/milli/src/update/new/indexer/settings_changes.rs index 2e3d9c917..f92935399 100644 --- a/crates/milli/src/update/new/indexer/settings_changes.rs +++ b/crates/milli/src/update/new/indexer/settings_changes.rs @@ -14,6 +14,19 @@ use crate::update::new::parallel_iterator_ext::ParallelIteratorExt as _; use crate::update::new::steps::IndexingStep; use crate::update::new::thread_local::{FullySend, MostlySend, ThreadLocal}; use crate::{DocumentId, InternalError, Result}; + +/// An internal iterator (i.e. using `foreach`) of `DocumentChange`s +pub trait SettingsChangeExtractor<'extractor>: Sync { + type Data: MostlySend; + + fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result; + + fn process<'doc>( + &'doc self, + changes: impl Iterator>>, + context: &'doc DocumentChangeContext, + ) -> Result<()>; +} pub struct DatabaseDocuments<'indexer> { documents: &'indexer [DocumentId], primary_key: PrimaryKey<'indexer>, @@ -53,3 +66,83 @@ impl<'indexer> DatabaseDocuments<'indexer> { self.documents.len() } } + +const CHUNK_SIZE: usize = 100; + +pub fn settings_change_extract< + 'extractor, // invariant lifetime of extractor_alloc + 'fid, // invariant lifetime of fields ids map + 'indexer, // covariant lifetime of objects that are borrowed during the entire indexing + 'data, // invariant on EX::Data lifetime of datastore + 'index, // covariant lifetime of the index + EX: SettingsChangeExtractor<'extractor>, + MSP: Fn() -> bool + Sync, +>( + documents: &'indexer DatabaseDocuments<'indexer>, + extractor: &EX, + IndexingContext { + index, + db_fields_ids_map, + new_fields_ids_map, + doc_allocs, + fields_ids_map_store, + must_stop_processing, + progress, + grenad_parameters: _, + }: IndexingContext<'fid, 'indexer, 'index, MSP>, + extractor_allocs: &'extractor mut ThreadLocal>, + datastore: &'data ThreadLocal, + step: IndexingStep, +) -> Result<()> { + tracing::trace!("We are resetting the extractor allocators"); + progress.update_progress(step); + // Clean up and reuse the extractor allocs + for extractor_alloc in extractor_allocs.iter_mut() { + tracing::trace!("\tWith {} bytes reset", extractor_alloc.0.allocated_bytes()); + extractor_alloc.0.reset(); + } + + let total_documents = documents.len() as u32; + let (step, progress_step) = AtomicDocumentStep::new(total_documents); + progress.update_progress(progress_step); + + let pi = documents.iter(CHUNK_SIZE); + pi.try_arc_for_each_try_init( + || { + DocumentChangeContext::new( + index, + db_fields_ids_map, + new_fields_ids_map, + extractor_allocs, + doc_allocs, + datastore, + fields_ids_map_store, + move |index_alloc| extractor.init_data(index_alloc), + ) + }, + |context, items| { + if (must_stop_processing)() { + return Err(Arc::new(InternalError::AbortedIndexation.into())); + } + + // Clean up and reuse the document-specific allocator + context.doc_alloc.reset(); + + let items = items.as_ref(); + let changes = items + .iter() + .filter_map(|item| documents.item_to_database_document(context, item).transpose()); + + let res = extractor.process(changes, context).map_err(Arc::new); + step.fetch_add(items.as_ref().len() as u32, Ordering::Relaxed); + + // send back the doc_alloc in the pool + context.doc_allocs.get_or_default().0.set(std::mem::take(&mut context.doc_alloc)); + + res + }, + )?; + step.store(total_documents, Ordering::Relaxed); + + Ok(()) +}