From 4cadc8113b2d93be6b59333e5ba49e0e6f4d906a Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Fri, 20 Jun 2025 12:42:22 +0200 Subject: [PATCH] Add embedder stats in batches --- crates/benchmarks/benches/indexing.rs | 2 +- crates/benchmarks/benches/utils.rs | 2 +- crates/dump/src/lib.rs | 1 + .../src/scheduler/process_batch.rs | 1 + .../src/scheduler/process_index_operation.rs | 3 ++ crates/meilisearch-types/src/batches.rs | 10 ++++ crates/meilisearch/src/lib.rs | 3 +- crates/milli/src/progress.rs | 34 +++++++++++- .../milli/src/search/new/tests/integration.rs | 2 +- crates/milli/src/test_index.rs | 2 +- .../extract/extract_vector_points.rs | 8 ++- .../src/update/index_documents/extract/mod.rs | 5 ++ .../milli/src/update/index_documents/mod.rs | 9 +++- .../src/update/new/extract/vectors/mod.rs | 2 +- crates/milli/src/update/settings.rs | 9 ++-- crates/milli/src/vector/composite.rs | 32 ++++++----- crates/milli/src/vector/mod.rs | 28 +++++----- crates/milli/src/vector/ollama.rs | 15 ++++-- crates/milli/src/vector/openai.rs | 17 +++--- crates/milli/src/vector/rest.rs | 54 ++++++++++++++----- crates/milli/tests/search/distinct.rs | 2 +- .../milli/tests/search/facet_distribution.rs | 2 +- crates/milli/tests/search/mod.rs | 2 +- crates/milli/tests/search/phrase_search.rs | 2 +- crates/milli/tests/search/query_criteria.rs | 6 +-- crates/milli/tests/search/typo_tolerance.rs | 8 +-- 26 files changed, 188 insertions(+), 73 deletions(-) diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 9199c3877..b882b598d 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -65,7 +65,7 @@ fn setup_settings<'t>( let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect(); builder.set_sortable_fields(sortable_fields); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); } fn setup_index_with_settings( diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs index aaa2d50a0..913807b45 100644 --- a/crates/benchmarks/benches/utils.rs +++ b/crates/benchmarks/benches/utils.rs @@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index { (conf.configure)(&mut builder); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); let config = IndexerConfig::default(); diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index 285818a87..c48c68f62 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -328,6 +328,7 @@ pub(crate) mod test { progress_trace: Default::default(), write_channel_congestion: None, internal_database_sizes: Default::default(), + embeddings: Default::default(), }, enqueued_at: Some(BatchEnqueuedAt { earliest: datetime!(2022-11-11 0:00 UTC), diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index c349f90ad..71e423a58 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -242,6 +242,7 @@ impl IndexScheduler { .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), + Some(progress.embedder_stats), ) .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; index_wtxn.commit()?; diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 093c6209d..92d13e7e7 100644 
--- a/crates/index-scheduler/src/scheduler/process_index_operation.rs
+++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
 use meilisearch_types::heed::RwTxn;
@@ -472,6 +474,7 @@ impl IndexScheduler {
                     .execute(
                         |indexing_step| tracing::debug!(update = ?indexing_step),
                         || must_stop_processing.get(),
+                        Some(Arc::clone(&progress.embedder_stats))
                     )
                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
 
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index 4d40189db..2ef373eac 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -82,4 +82,14 @@ pub struct BatchStats {
     pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
     #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
     pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
+    pub embeddings: BatchEmbeddingStats
 }
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+#[schema(rename_all = "camelCase")]
+pub struct BatchEmbeddingStats {
+    pub total_count: usize,
+    pub error_count: usize,
+    pub last_error: Option<String>,
+}
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index 1e0c205d0..782d6172f 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -543,7 +543,7 @@ fn import_dump(
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);
         builder
-            .execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false)?;
+            .execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false, None)?;
 
         // 4.3 Import the documents.
         // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
@@ -574,6 +574,7 @@ fn import_dump(
             },
             |indexing_step| tracing::trace!("update: {:?}", indexing_step),
             || false,
+            None,
         )?;
 
         let builder = builder.with_embedders(embedders);
diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs
index fa651e17f..ff795b220 100644
--- a/crates/milli/src/progress.rs
+++ b/crates/milli/src/progress.rs
@@ -1,7 +1,7 @@
 use std::any::TypeId;
 use std::borrow::Cow;
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
 
@@ -20,6 +20,13 @@ pub trait Step: 'static + Send + Sync {
 #[derive(Clone, Default)]
 pub struct Progress {
     steps: Arc<RwLock<InnerProgress>>,
+    pub embedder_stats: Arc<EmbedderStats>,
+}
+
+#[derive(Default)]
+pub struct EmbedderStats {
+    pub errors: Arc<RwLock<(Option<String>, u32)>>,
+    pub total_requests: AtomicUsize
 }
 
 #[derive(Default)]
@@ -65,7 +72,19 @@ impl Progress {
             });
         }
 
-        ProgressView { steps: step_view, percentage: percentage * 100.0 }
+        let embedder_view = {
+            let (last_error, error_count) = match self.embedder_stats.errors.read() {
+                Ok(guard) => (guard.0.clone(), guard.1),
+                Err(_) => (None, 0),
+            };
+            EmbedderStatsView {
+                last_error,
+                request_count: self.embedder_stats.total_requests.load(Ordering::Relaxed) as u32,
+                error_count,
+            }
+        };
+
+        ProgressView { steps: step_view, percentage: percentage * 100.0, embedder: embedder_view }
     }
 
     pub fn accumulated_durations(&self) -> IndexMap<String, String> {
@@ -209,6 +228,7 @@ make_enum_progress! {
 pub struct ProgressView {
     pub steps: Vec<ProgressStepView>,
     pub percentage: f32,
+    pub embedder: EmbedderStatsView,
 }
 
 #[derive(Debug, Serialize, Clone, ToSchema)]
@@ -220,6 +240,16 @@ pub struct ProgressStepView {
     pub total: u32,
 }
 
+#[derive(Debug, Serialize, Clone, ToSchema)]
+#[serde(rename_all = "camelCase")]
+#[schema(rename_all = "camelCase")]
+pub struct EmbedderStatsView {
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub last_error: Option<String>,
+    pub request_count: u32,
+    pub error_count: u32,
+}
+
 /// Used when the name can change but it's still the same step.
 /// To avoid conflicts on the `TypeId`, create a unique type every time you use this step:
 /// ```text
diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs
index 4a6cc9b90..e7634a4eb 100644
--- a/crates/milli/src/search/new/tests/integration.rs
+++ b/crates/milli/src/search/new/tests/integration.rs
@@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, None).unwrap();
     wtxn.commit().unwrap();
 
     // index documents
diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs
index dfd570b96..634d45195 100644
--- a/crates/milli/src/test_index.rs
+++ b/crates/milli/src/test_index.rs
@@ -134,7 +134,7 @@ impl TempIndex {
     ) -> Result<(), crate::error::Error> {
         let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
         update(&mut builder);
-        builder.execute(drop, || false)?;
+        builder.execute(drop, || false, None)?;
         Ok(())
     }
 
diff --git a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs
index cb8c121ce..5e6bde53d 100644
--- a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs
+++ b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs
@@ -17,6 +17,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME;
 use crate::error::FaultSource;
 use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::index::IndexEmbeddingConfig;
+use crate::progress::EmbedderStats;
 use crate::prompt::Prompt;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::settings::InnerIndexSettingsDiff;
@@ -682,6 +683,7 @@ pub fn extract_embeddings(
     embedder: Arc<Embedder>,
     embedder_name: &str,
     possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    embedder_stats: Option<Arc<EmbedderStats>>,
     unused_vectors_distribution: &UnusedVectorsDistribution,
     request_threads: &ThreadPoolNoAbort,
 ) -> Result<grenad::Reader<BufReader<File>>> {
@@ -724,6 +726,7 @@ pub fn extract_embeddings(
                 std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
                 embedder_name,
                 possible_embedding_mistakes,
+                embedder_stats.clone(),
                 unused_vectors_distribution,
                 request_threads,
             )?;
@@ -746,6 +749,7 @@ pub fn extract_embeddings(
             std::mem::take(&mut chunks),
             embedder_name,
             possible_embedding_mistakes,
+            embedder_stats.clone(),
             unused_vectors_distribution,
             request_threads,
         )?;
@@ -764,6 +768,7 @@ pub fn extract_embeddings(
             vec![std::mem::take(&mut current_chunk)],
             embedder_name,
             possible_embedding_mistakes,
+            embedder_stats,
             unused_vectors_distribution,
             request_threads,
         )?;
@@ -783,10 +788,11 @@ fn embed_chunks(
     text_chunks: Vec<Vec<String>>,
     embedder_name: &str,
     possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    embedder_stats: Option<Arc<EmbedderStats>>,
unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { - match embedder.embed_index(text_chunks, request_threads) { + match embedder.embed_index(text_chunks, request_threads, embedder_stats) { Ok(chunks) => Ok(chunks), Err(error) => { if let FaultSource::Bug = error.fault { diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs index 8cd664a2f..020b48f2c 100644 --- a/crates/milli/src/update/index_documents/extract/mod.rs +++ b/crates/milli/src/update/index_documents/extract/mod.rs @@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::{helpers, TypedChunk}; use crate::index::IndexEmbeddingConfig; +use crate::progress::EmbedderStats; use crate::update::settings::InnerIndexSettingsDiff; use crate::vector::error::PossibleEmbeddingMistakes; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; @@ -49,6 +50,7 @@ pub(crate) fn data_from_obkv_documents( settings_diff: Arc, max_positions_per_attributes: Option, possible_embedding_mistakes: Arc, + embedder_stats: Option>, ) -> Result<()> { let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join( || { @@ -62,6 +64,7 @@ pub(crate) fn data_from_obkv_documents( embedders_configs.clone(), settings_diff.clone(), possible_embedding_mistakes.clone(), + embedder_stats.clone(), ) }) .collect::>() @@ -231,6 +234,7 @@ fn send_original_documents_data( embedders_configs: Arc>, settings_diff: Arc, possible_embedding_mistakes: Arc, + embedder_stats: Option>, ) -> Result<()> { let original_documents_chunk = original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -270,6 +274,7 @@ fn send_original_documents_data( embedder.clone(), &embedder_name, &possible_embedding_mistakes, + embedder_stats.clone(), &unused_vectors_distribution, request_threads(), ) { diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index f547c68d4..fad43bd30 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -32,7 +32,7 @@ use crate::database_stats::DatabaseStats; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::index::{PrefixSearch, PrefixSettings}; -use crate::progress::Progress; +use crate::progress::{EmbedderStats, Progress}; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, @@ -81,6 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> { added_documents: u64, deleted_documents: u64, embedders: EmbeddingConfigs, + embedder_stats: Option>, } #[derive(Default, Debug, Clone)] @@ -103,6 +104,7 @@ where config: IndexDocumentsConfig, progress: FP, should_abort: FA, + embedder_stats: Option>, ) -> Result> { let transform = Some(Transform::new( wtxn, @@ -123,6 +125,7 @@ where added_documents: 0, deleted_documents: 0, embedders: Default::default(), + embedder_stats, }) } @@ -292,6 +295,7 @@ where // Run extraction pipeline in parallel. 
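// Editorial note on the `IndexDocuments` hunks just above: the new trailing
// `embedder_stats: Option<Arc<EmbedderStats>>` argument is only stored on the
// builder here and cloned into the extraction pipeline below. A hedged sketch
// of the new calling convention (earlier arguments elided; `progress` is
// assumed to be the `milli::progress::Progress` seen in
// process_index_operation.rs above):
//
//     let builder = IndexDocuments::new(
//         /* wtxn, index, indexer config, ... */
//         IndexDocumentsConfig::default(),
//         |step| tracing::debug!(update = ?step),
//         || false,
//         Some(Arc::clone(&progress.embedder_stats)), // or `None` where no batch is tracked
//     )?;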
let mut modified_docids = RoaringBitmap::new(); + let embedder_stats = self.embedder_stats.clone(); pool.install(|| { let settings_diff_cloned = settings_diff.clone(); rayon::spawn(move || { @@ -326,7 +330,8 @@ where embedders_configs.clone(), settings_diff_cloned, max_positions_per_attributes, - Arc::new(possible_embedding_mistakes) + Arc::new(possible_embedding_mistakes), + embedder_stats.clone() ) }); diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 43647e786..5b6559d74 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -450,7 +450,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg))); } - let res = match embedder.embed_index_ref(texts.as_slice(), threads) { + let res = match embedder.embed_index_ref(texts.as_slice(), threads, None) { Ok(embeddings) => { for (docid, embedding) in ids.into_iter().zip(embeddings) { sender.set_vector(*docid, embedder_id, embedding).unwrap(); diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index f396cd079..7c5a70aa3 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -27,6 +27,7 @@ use crate::index::{ DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS, }; use crate::order_by_map::OrderByMap; +use crate::progress::EmbedderStats; use crate::prompt::{default_max_bytes, default_template_text, PromptData}; use crate::proximity::ProximityPrecision; use crate::update::index_documents::IndexDocumentsMethod; @@ -466,7 +467,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { #[tracing::instrument( level = "trace" - skip(self, progress_callback, should_abort, settings_diff), + skip(self, progress_callback, should_abort, settings_diff, embedder_stats), target = "indexing::documents" )] fn reindex( @@ -474,6 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { progress_callback: &FP, should_abort: &FA, settings_diff: InnerIndexSettingsDiff, + embedder_stats: Option>, ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, @@ -505,6 +507,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { IndexDocumentsConfig::default(), &progress_callback, &should_abort, + embedder_stats, )?; indexing_builder.execute_raw(output)?; @@ -1355,7 +1358,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { } } - pub fn execute(mut self, progress_callback: FP, should_abort: FA) -> Result<()> + pub fn execute(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Option>) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, FA: Fn() -> bool + Sync, @@ -1413,7 +1416,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { ); if inner_settings_diff.any_reindexing_needed() { - self.reindex(&progress_callback, &should_abort, inner_settings_diff)?; + self.reindex(&progress_callback, &should_abort, inner_settings_diff, embedder_stats)?; } Ok(()) diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs index 9c5992bd3..daec50e4b 100644 --- a/crates/milli/src/vector/composite.rs +++ b/crates/milli/src/vector/composite.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Instant; use arroy::Distance; @@ -7,6 +8,7 @@ use super::{ hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, }; +use crate::progress::EmbedderStats; use crate::ThreadPoolNoAbort; #[derive(Debug)] @@ -81,6 +83,7 @@ impl Embedder { "This is a 
sample text. It is meant to compare similarity.".into(), ], None, + None, ) .map_err(|error| NewEmbedderError::composite_test_embedding_failed(error, "search"))?; @@ -92,6 +95,7 @@ impl Embedder { "This is a sample text. It is meant to compare similarity.".into(), ], None, + None, ) .map_err(|error| { NewEmbedderError::composite_test_embedding_failed(error, "indexing") @@ -150,13 +154,14 @@ impl SubEmbedder { &self, texts: Vec, deadline: Option, + embedder_stats: Option>, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed(texts), - SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline), - SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline), + SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline, embedder_stats), + SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline, embedder_stats), SubEmbedder::UserProvided(embedder) => embedder.embed(&texts), - SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline), + SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline, embedder_stats), } } @@ -164,18 +169,19 @@ impl SubEmbedder { &self, text: &str, deadline: Option, + embedder_stats: Option>, ) -> std::result::Result { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text), SubEmbedder::OpenAi(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) + embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding) } SubEmbedder::Ollama(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) + embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding) } SubEmbedder::UserProvided(embedder) => embedder.embed_one(text), SubEmbedder::Rest(embedder) => embedder - .embed_ref(&[text], deadline)? + .embed_ref(&[text], deadline, embedder_stats)? 
.pop() .ok_or_else(EmbedError::missing_embedding), } @@ -188,13 +194,14 @@ impl SubEmbedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> std::result::Result>, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), - SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads), - SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads), + SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), + SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks), - SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads), + SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), } } @@ -203,13 +210,14 @@ impl SubEmbedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), - SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads), - SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads), + SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts), - SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads), + SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), } } diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index c2978f5db..124e17cff 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use self::error::{EmbedError, NewEmbedderError}; -use crate::progress::Progress; +use crate::progress::{EmbedderStats, Progress}; use crate::prompt::{Prompt, PromptData}; use crate::ThreadPoolNoAbort; @@ -720,17 +720,17 @@ impl Embedder { let embedding = match self { Embedder::HuggingFace(embedder) => embedder.embed_one(text), Embedder::OpenAi(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) + embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding) } Embedder::Ollama(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) + embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding) } Embedder::UserProvided(embedder) => embedder.embed_one(text), Embedder::Rest(embedder) => embedder - .embed_ref(&[text], deadline)? + .embed_ref(&[text], deadline, None)? 
.pop() .ok_or_else(EmbedError::missing_embedding), - Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline), + Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None), }?; if let Some(cache) = self.cache() { @@ -747,14 +747,15 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> std::result::Result>, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), - Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads), - Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads), + Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), + Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks), - Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads), - Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads), + Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), + Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads, embedder_stats), } } @@ -763,14 +764,15 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> std::result::Result, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), - Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads), - Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads), + Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts), - Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads), - Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads), + Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads, embedder_stats), } } diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index 8beae6205..b3ee925e6 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Instant; use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _}; @@ -7,6 +8,7 @@ use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErro use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -104,8 +106,9 @@ impl Embedder { &self, texts: &[S], deadline: Option, + embedder_stats: Option> ) -> Result, EmbedError> { - match self.rest_embedder.embed_ref(texts, deadline) { + match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), Err(EmbedError { kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _ }) => { Err(EmbedError::ollama_model_not_found(error)) @@ -118,15 +121,16 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> Result>, EmbedError> { // This condition helps reduce the 
number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -139,13 +143,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: Option> ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; @@ -155,7 +160,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index df29f6916..384abe880 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -1,4 +1,5 @@ use std::fmt; +use std::sync::Arc; use std::time::Instant; use ordered_float::OrderedFloat; @@ -9,6 +10,7 @@ use super::error::{EmbedError, NewEmbedderError}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::vector::error::EmbedErrorKind; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -215,8 +217,9 @@ impl Embedder { &self, texts: &[S], deadline: Option, + embedder_stats: Option>, ) -> Result, EmbedError> { - match self.rest_embedder.embed_ref(texts, deadline) { + match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), Err(EmbedError { kind: EmbedErrorKind::RestBadRequest(error, _), fault: _ }) => { tracing::warn!(error=?error, "OpenAI: received `BAD_REQUEST`. Input was maybe too long, retrying on tokenized version. For best performance, limit the size of your document template."); @@ -238,7 +241,7 @@ impl Embedder { let encoded = self.tokenizer.encode_ordinary(text); let len = encoded.len(); if len < max_token_count { - all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline)?); + all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline, None)?); continue; } @@ -255,15 +258,16 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. 
if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -276,13 +280,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; Ok(embeddings.into_iter().flatten().collect()) @@ -291,7 +296,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index b87ac9f77..fc0ff308b 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::sync::Arc; use std::time::Instant; use deserr::Deserr; @@ -14,6 +15,7 @@ use super::{ }; use crate::error::FaultSource; use crate::ThreadPoolNoAbort; +use crate::progress::EmbedderStats; // retrying in case of failure pub struct Retry { @@ -168,19 +170,21 @@ impl Embedder { &self, texts: Vec, deadline: Option, + embedder_stats: Option>, ) -> Result, EmbedError> { - embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline) + embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline, embedder_stats) } pub fn embed_ref( &self, texts: &[S], deadline: Option, + embedder_stats: Option>, ) -> Result, EmbedError> where S: AsRef + Serialize, { - embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline) + embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline, embedder_stats) } pub fn embed_tokens( @@ -188,7 +192,7 @@ impl Embedder { tokens: &[u32], deadline: Option, ) -> Result { - let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline)?; + let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline, None)?; // unwrap: guaranteed that embeddings.len() == 1, otherwise the previous line terminated in error Ok(embeddings.pop().unwrap()) } @@ -197,15 +201,16 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: Option>, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. 
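// Editorial note: the rest.rs hunks that follow thread `embedder_stats` down
// to the single place where HTTP requests are issued. The recording pattern
// added to `embed()` can be read in isolation as the following sketch (not
// part of the patch; the `attempt` closure stands in for
// `request.clone().send_json(&body)` plus response handling, and `Ordering`
// is `std::sync::atomic::Ordering`):
//
//     fn record_attempt<T, E: std::fmt::Display>(
//         stats: Option<&Arc<EmbedderStats>>,
//         attempt: impl FnOnce() -> Result<T, E>,
//     ) -> Result<T, E> {
//         if let Some(stats) = stats {
//             // every attempt is counted, success or failure
//             stats.total_requests.fetch_add(1, Ordering::Relaxed);
//         }
//         let result = attempt();
//         if let (Err(error), Some(stats)) = (&result, stats) {
//             if let Ok(mut errors) = stats.errors.write() {
//                 errors.0 = Some(error.to_string()); // last error message
//                 errors.1 += 1;                      // error count
//             }
//         }
//         result
//     }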
if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect() + text_chunks.into_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() + text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -218,13 +223,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: Option> ) -> Result, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) + .map(move |chunk| self.embed_ref(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; @@ -234,7 +240,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) + .map(move |chunk| self.embed_ref(chunk, None, embedder_stats.clone())) .collect(); let embeddings = embeddings?; @@ -272,7 +278,7 @@ impl Embedder { } fn infer_dimensions(data: &EmbedderData) -> Result { - let v = embed(data, ["test"].as_slice(), 1, None, None) + let v = embed(data, ["test"].as_slice(), 1, None, None, None) .map_err(NewEmbedderError::could_not_determine_dimension)?; // unwrap: guaranteed that v.len() == 1, otherwise the previous line terminated in error Ok(v.first().unwrap().len()) @@ -284,6 +290,7 @@ fn embed( expected_count: usize, expected_dimension: Option, deadline: Option, + embedder_stats: Option>, ) -> Result, EmbedError> where S: Serialize, @@ -302,6 +309,9 @@ where let body = data.request.inject_texts(inputs); for attempt in 0..10 { + if let Some(embedder_stats) = &embedder_stats { + embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } let response = request.clone().send_json(&body); let result = check_response(response, data.configuration_source).and_then(|response| { response_to_embedding(response, data, expected_count, expected_dimension) @@ -311,6 +321,12 @@ where Ok(response) => return Ok(response), Err(retry) => { tracing::warn!("Failed: {}", retry.error); + if let Some(embedder_stats) = &embedder_stats { + if let Ok(mut errors) = embedder_stats.errors.write() { + errors.0 = Some(retry.error.to_string()); + errors.1 += 1; + } + } if let Some(deadline) = deadline { let now = std::time::Instant::now(); if now > deadline { @@ -336,12 +352,26 @@ where std::thread::sleep(retry_duration); } + if let Some(embedder_stats) = &embedder_stats { + embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } let response = request.send_json(&body); - let result = check_response(response, data.configuration_source); - result.map_err(Retry::into_error).and_then(|response| { + let result = check_response(response, data.configuration_source).and_then(|response| { response_to_embedding(response, data, expected_count, expected_dimension) - .map_err(Retry::into_error) - }) + }); + + match result { + Ok(response) => Ok(response), + Err(retry) => { + if let Some(embedder_stats) = &embedder_stats { + if let Ok(mut 
errors) = embedder_stats.errors.write() { + errors.0 = Some(retry.error.to_string()); + errors.1 += 1; + } + } + Err(retry.into_error()) + } + } } fn check_response( diff --git a/crates/milli/tests/search/distinct.rs b/crates/milli/tests/search/distinct.rs index fc890dfe8..55e43c8fa 100644 --- a/crates/milli/tests/search/distinct.rs +++ b/crates/milli/tests/search/distinct.rs @@ -19,7 +19,7 @@ macro_rules! test_distinct { let config = milli::update::IndexerConfig::default(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_distinct_field(S(stringify!($distinct))); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 8934cbea4..588662735 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() { FilterableAttributesRule::Field(S("genres")), FilterableAttributesRule::Field(S("tags")), ]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); // index documents diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 906956716..1e0c24608 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { S("america") => vec![S("the united states")], }); builder.set_searchable_fields(vec![S("title"), S("description")]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); // index documents diff --git a/crates/milli/tests/search/phrase_search.rs b/crates/milli/tests/search/phrase_search.rs index b7f792bfc..c5a95f7cd 100644 --- a/crates/milli/tests/search/phrase_search.rs +++ b/crates/milli/tests/search/phrase_search.rs @@ -10,7 +10,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) { let mut builder = Settings::new(&mut wtxn, index, &config); let stop_words = stop_words.iter().map(|s| s.to_string()).collect(); builder.set_stop_words(stop_words); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); } diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 1acc89484..b7614c215 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -236,7 +236,7 @@ fn criteria_mixup() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(criteria.clone()); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -276,7 +276,7 @@ fn criteria_ascdesc() { S("name"), S("age"), }); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); wtxn.commit().unwrap(); let mut wtxn = index.write_txn().unwrap(); @@ -358,7 +358,7 @@ fn criteria_ascdesc() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(vec![criterion.clone()]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, 
None).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 3c0717063..bf9a730c9 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_one_typo(4); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_two_typos(7); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -180,7 +180,7 @@ fn test_typo_disabled_on_word() { // `zealand` doesn't allow typos anymore exact_words.insert("zealand".to_string()); builder.set_exact_words(exact_words); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); let mut search = Search::new(&txn, &index); search.query("zealand"); @@ -218,7 +218,7 @@ fn test_disable_typo_on_attribute() { let mut builder = Settings::new(&mut txn, &index, &config); // disable typos on `description` builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect()); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, None).unwrap(); let mut search = Search::new(&txn, &index); search.query("antebelum");
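
For reviewers who want to poke at the new counters outside the indexing pipeline: the stats travel as the Arc<EmbedderStats> hanging off Progress, the REST embedder bumps them on every request, and the progress/batch views read them back out. A minimal sketch of that round trip, assuming the milli crate from this patch as a dependency (the error string is made up and the retry loop is elided):

    use std::sync::atomic::Ordering;
    use std::sync::Arc;

    use milli::progress::{EmbedderStats, Progress};

    fn main() {
        // The scheduler owns a Progress; the indexing pipeline receives a clone
        // of its embedder_stats handle (see process_batch.rs above).
        let progress = Progress::default();
        let stats: Arc<EmbedderStats> = Arc::clone(&progress.embedder_stats);

        // What vector/rest.rs now does on every outgoing request...
        stats.total_requests.fetch_add(1, Ordering::Relaxed);

        // ...and when an attempt fails.
        if let Ok(mut errors) = stats.errors.write() {
            errors.0 = Some("403: invalid api key".to_string());
            errors.1 += 1;
        }

        // The progress view reads the counters back out, mirroring the
        // embedder_view block added to `impl Progress`.
        let (last_error, error_count) = match stats.errors.read() {
            Ok(guard) => (guard.0.clone(), guard.1),
            Err(_) => (None, 0),
        };
        let request_count = stats.total_requests.load(Ordering::Relaxed) as u32;
        println!("requests: {request_count}, errors: {error_count}, last: {last_error:?}");
    }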