From 29f6eeff8fc82b55799e1c958249cb53349e603e Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Thu, 26 Jun 2025 12:07:48 +0200 Subject: [PATCH] Remove lots of Arcs --- crates/benchmarks/benches/indexing.rs | 62 +++++++++---------- crates/benchmarks/benches/utils.rs | 2 +- crates/fuzzers/src/bin/fuzz-indexing.rs | 2 +- .../src/scheduler/process_index_operation.rs | 10 +-- .../milli/src/search/new/tests/integration.rs | 2 +- crates/milli/src/test_index.rs | 6 +- .../extract/extract_vector_points.rs | 8 +-- .../src/update/index_documents/extract/mod.rs | 6 +- .../milli/src/update/index_documents/mod.rs | 30 ++++----- .../src/update/new/extract/vectors/mod.rs | 18 +++--- .../milli/src/update/new/indexer/extract.rs | 3 +- crates/milli/src/update/new/indexer/mod.rs | 3 +- crates/milli/src/update/settings.rs | 4 +- crates/milli/src/vector/composite.rs | 9 ++- crates/milli/src/vector/mod.rs | 4 +- crates/milli/src/vector/ollama.rs | 15 +++-- crates/milli/src/vector/openai.rs | 15 +++-- crates/milli/src/vector/rest.rs | 23 ++++--- .../milli/tests/search/facet_distribution.rs | 2 +- crates/milli/tests/search/mod.rs | 2 +- crates/milli/tests/search/query_criteria.rs | 2 +- crates/milli/tests/search/typo_tolerance.rs | 2 +- 22 files changed, 112 insertions(+), 118 deletions(-) diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 8241da9d2..3afad8ee5 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -169,7 +169,7 @@ fn indexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -236,7 +236,7 @@ fn reindexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -281,7 +281,7 @@ fn reindexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -350,7 +350,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -427,7 +427,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -472,7 +472,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -513,7 +513,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -581,7 +581,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -648,7 +648,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -715,7 +715,7 @@ fn indexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -781,7 +781,7 @@ fn reindexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -826,7 +826,7 @@ fn reindexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -894,7 +894,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -971,7 +971,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1017,7 +1017,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1059,7 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1126,7 +1126,7 @@ fn indexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1192,7 +1192,7 @@ fn reindexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1237,7 +1237,7 @@ fn reindexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1305,7 +1305,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); @@ -1354,7 +1354,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec Index { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index 23c4cb9c2..0632b7846 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -144,7 +144,7 @@ fn main() { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index b5338e511..14b07aea0 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -35,7 +35,7 @@ impl IndexScheduler { index: &'i Index, operation: IndexOperation, progress: &Progress, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result<(Vec, Option)> { let indexer_alloc = Bump::new(); let started_processing_at = std::time::Instant::now(); @@ -180,7 +180,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, - embedder_stats, + &embedder_stats, ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?, ); @@ -292,7 +292,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, - embedder_stats, + &embedder_stats, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, ); @@ -441,7 +441,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, - embedder_stats, + &embedder_stats, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, ); @@ -478,7 +478,7 @@ impl IndexScheduler { .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - embedder_stats, + embedder_stats.clone(), ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index c4e521a88..36917c10e 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -95,7 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs index 3546660b0..d218bb3a6 100644 --- a/crates/milli/src/test_index.rs +++ b/crates/milli/src/test_index.rs @@ -103,7 +103,7 @@ impl TempIndex { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) }) .unwrap()?; @@ -186,7 +186,7 @@ impl TempIndex { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) }) .unwrap()?; @@ -261,7 +261,7 @@ fn aborting_indexation() { embedders, &|| should_abort.load(Relaxed), &Progress::default(), - Default::default(), + &Default::default(), ) }) .unwrap() diff --git a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs index e6d874a69..e1981a615 100644 --- a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -684,7 +684,7 @@ pub fn extract_embeddings( embedder: Arc, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { @@ -727,7 +727,7 @@ pub fn extract_embeddings( std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)), embedder_name, possible_embedding_mistakes, - embedder_stats.clone(), + embedder_stats, unused_vectors_distribution, request_threads, )?; @@ -750,7 +750,7 @@ pub fn extract_embeddings( std::mem::take(&mut chunks), embedder_name, possible_embedding_mistakes, - embedder_stats.clone(), + embedder_stats, unused_vectors_distribution, request_threads, )?; @@ -789,7 +789,7 @@ fn embed_chunks( text_chunks: Vec>, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs index 1eeddcccb..3af665c67 100644 --- a/crates/milli/src/update/index_documents/extract/mod.rs +++ b/crates/milli/src/update/index_documents/extract/mod.rs @@ -50,7 +50,7 @@ pub(crate) fn data_from_obkv_documents( settings_diff: Arc, max_positions_per_attributes: Option, possible_embedding_mistakes: Arc, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result<()> { let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join( || { @@ -234,7 +234,7 @@ fn send_original_documents_data( embedders_configs: Arc>, settings_diff: Arc, possible_embedding_mistakes: Arc, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result<()> { let original_documents_chunk = original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -274,7 +274,7 @@ fn send_original_documents_data( embedder.clone(), &embedder_name, &possible_embedding_mistakes, - embedder_stats.clone(), + &embedder_stats, &unused_vectors_distribution, request_threads(), ) { diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index f2e1783e4..2bddf1b17 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -81,7 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> { added_documents: u64, deleted_documents: u64, embedders: EmbeddingConfigs, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change } #[derive(Default, Debug, Clone)] @@ -104,7 +104,7 @@ where config: IndexDocumentsConfig, progress: FP, should_abort: FA, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result> { let transform = Some(Transform::new( wtxn, @@ -2030,7 +2030,7 @@ mod tests { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2118,7 +2118,7 @@ mod tests { EmbeddingConfigs::default(), &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2304,7 +2304,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2367,7 +2367,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2421,7 +2421,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2474,7 +2474,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2529,7 +2529,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2589,7 +2589,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2642,7 +2642,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2695,7 +2695,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2894,7 +2894,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2954,7 +2954,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -3011,7 +3011,7 @@ mod tests { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index c21dabf74..85398aa99 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -1,4 +1,4 @@ -use std::{cell::RefCell, sync::Arc}; +use std::cell::RefCell; use bumpalo::collections::Vec as BVec; use bumpalo::Bump; @@ -23,7 +23,7 @@ pub struct EmbeddingExtractor<'a, 'b> { embedders: &'a EmbeddingConfigs, sender: EmbeddingSender<'a, 'b>, possible_embedding_mistakes: PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, } @@ -32,7 +32,7 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> { embedders: &'a EmbeddingConfigs, sender: EmbeddingSender<'a, 'b>, field_distribution: &'a FieldDistribution, - embedder_stats: Arc, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, ) -> Self { let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); @@ -78,7 +78,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> { prompt, context.data, &self.possible_embedding_mistakes, - self.embedder_stats.clone(), + self.embedder_stats, self.threads, self.sender, &context.doc_alloc, @@ -311,7 +311,7 @@ struct Chunks<'a, 'b, 'extractor> { dimensions: usize, prompt: &'a Prompt, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &'a EmbedderStats, user_provided: &'a RefCell>, threads: &'a ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, @@ -327,7 +327,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { prompt: &'a Prompt, user_provided: &'a RefCell>, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, doc_alloc: &'a Bump, @@ -378,7 +378,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { self.embedder_id, self.embedder_name, self.possible_embedding_mistakes, - self.embedder_stats.clone(), + self.embedder_stats, unused_vectors_distribution, self.threads, self.sender, @@ -397,7 +397,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { self.embedder_id, self.embedder_name, self.possible_embedding_mistakes, - self.embedder_stats.clone(), + self.embedder_stats, unused_vectors_distribution, self.threads, self.sender, @@ -416,7 +416,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { embedder_id: u8, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistributionBump, threads: &ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index c721a2563..97ffc8624 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -1,6 +1,5 @@ use std::collections::BTreeMap; use std::sync::atomic::AtomicBool; -use std::sync::Arc; use std::sync::OnceLock; use bumpalo::Bump; @@ -36,7 +35,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( mut index_embeddings: Vec, document_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result<(FacetFieldIdsDelta, Vec)> where DC: DocumentChanges<'pl>, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 33774f892..bb6ba0102 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -1,5 +1,4 @@ use std::sync::atomic::AtomicBool; -use std::sync::Arc; use std::sync::{Once, RwLock}; use std::thread::{self, Builder}; @@ -56,7 +55,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>( embedders: EmbeddingConfigs, must_stop_processing: &'indexer MSP, progress: &'indexer Progress, - embedder_stats: Arc, + embedder_stats: &'indexer EmbedderStats, ) -> Result where DC: DocumentChanges<'pl>, diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index b3f70d1b6..71cedf456 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -475,7 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { progress_callback: &FP, should_abort: &FA, settings_diff: InnerIndexSettingsDiff, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, @@ -1362,7 +1362,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { mut self, progress_callback: FP, should_abort: FA, - embedder_stats: Arc, + embedder_stats: Arc, // Cant change ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs index 87f05d4fe..8314b8649 100644 --- a/crates/milli/src/vector/composite.rs +++ b/crates/milli/src/vector/composite.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Instant; use arroy::Distance; @@ -154,7 +153,7 @@ impl SubEmbedder { &self, texts: Vec, deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed(texts), @@ -169,7 +168,7 @@ impl SubEmbedder { &self, text: &str, deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> std::result::Result { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text), @@ -196,7 +195,7 @@ impl SubEmbedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> std::result::Result>, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), @@ -218,7 +217,7 @@ impl SubEmbedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 481eb6c99..065beb5fb 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -749,7 +749,7 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> std::result::Result>, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), @@ -772,7 +772,7 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> std::result::Result, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index 045b65b72..d4329a2de 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -1,4 +1,3 @@ -use std::sync::Arc; use std::time::Instant; use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; @@ -106,7 +105,7 @@ impl Embedder { &self, texts: &[S], deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), @@ -121,21 +120,21 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks .into_iter() - .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) .collect() } else { threads .install(move || { text_chunks .into_par_iter() - .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) .collect() }) .map_err(|error| EmbedError { @@ -149,14 +148,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -166,7 +165,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index b64e3d467..0159d5c76 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -1,5 +1,4 @@ use std::fmt; -use std::sync::Arc; use std::time::Instant; use ordered_float::OrderedFloat; @@ -217,7 +216,7 @@ impl Embedder { &self, texts: &[S], deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), @@ -262,21 +261,21 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks .into_iter() - .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) .collect() } else { threads .install(move || { text_chunks .into_par_iter() - .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) .collect() }) .map_err(|error| EmbedError { @@ -290,14 +289,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; Ok(embeddings.into_iter().flatten().collect()) @@ -306,7 +305,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index dd08c6a5e..fbe3c1129 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -1,5 +1,4 @@ use std::collections::BTreeMap; -use std::sync::Arc; use std::time::Instant; use deserr::Deserr; @@ -170,7 +169,7 @@ impl Embedder { &self, texts: Vec, deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { embed( &self.data, @@ -186,7 +185,7 @@ impl Embedder { &self, texts: &[S], deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> where S: AsRef + Serialize, @@ -208,21 +207,21 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks .into_iter() - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect() } else { threads .install(move || { text_chunks .into_par_iter() - .map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect() }) .map_err(|error| EmbedError { @@ -236,14 +235,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, - embedder_stats: Arc, + embedder_stats: &EmbedderStats, ) -> Result, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -253,7 +252,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats.clone()))) + .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -303,7 +302,7 @@ fn embed( expected_count: usize, expected_dimension: Option, deadline: Option, - embedder_stats: Option>, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> where S: Serialize, @@ -323,7 +322,7 @@ where for attempt in 0..10 { if let Some(embedder_stats) = &embedder_stats { - embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } let response = request.clone().send_json(&body); let result = check_response(response, data.configuration_source).and_then(|response| { @@ -367,7 +366,7 @@ where } if let Some(embedder_stats) = &embedder_stats { - embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } let response = request.send_json(&body); let result = check_response(response, data.configuration_source).and_then(|response| { diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 5ed223400..8548f0d01 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -74,7 +74,7 @@ fn test_facet_distribution_with_no_facet_values() { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index beee4ac54..4098af736 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -114,7 +114,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 04b8374de..b72978330 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -344,7 +344,7 @@ fn criteria_ascdesc() { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index e2cdab550..9aacbf82a 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -153,7 +153,7 @@ fn test_typo_disabled_on_word() { embedders, &|| false, &Progress::default(), - Default::default(), + &Default::default(), ) .unwrap();