diff --git a/.gitignore b/.gitignore index 07453a58f..fc24b8306 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,8 @@ ## ... unreviewed *.snap.new +# Database snapshot +crates/meilisearch/db.snapshot + # Fuzzcheck data for the facet indexing fuzz test crates/milli/fuzz/update::facet::incremental::fuzz::fuzz/ diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 9199c3877..3afad8ee5 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -65,7 +65,7 @@ fn setup_settings<'t>( let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect(); builder.set_sortable_fields(sortable_fields); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); } fn setup_index_with_settings( @@ -169,6 +169,7 @@ fn indexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -235,6 +236,7 @@ fn reindexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -279,6 +281,7 @@ fn reindexing_songs_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -347,6 +350,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -423,6 +427,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -467,6 +472,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -507,6 +513,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -574,6 +581,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -640,6 +648,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -706,6 +715,7 @@ fn indexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -771,6 +781,7 @@ fn reindexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -815,6 +826,7 @@ fn reindexing_wiki(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -882,6 +894,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -958,6 +971,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1003,6 +1017,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1044,6 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1110,6 +1126,7 @@ fn indexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1175,6 +1192,7 @@ fn reindexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1219,6 +1237,7 @@ fn reindexing_movies_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1286,6 +1305,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -1334,6 +1354,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec Index { (conf.configure)(&mut builder); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); let config = IndexerConfig::default(); @@ -128,6 +128,7 @@ pub fn base_setup(conf: &Conf) -> Index { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index 285818a87..a84ec4ba5 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -329,6 +329,7 @@ pub(crate) mod test { write_channel_congestion: None, internal_database_sizes: Default::default(), }, + embedder_stats: Default::default(), enqueued_at: Some(BatchEnqueuedAt { earliest: datetime!(2022-11-11 0:00 UTC), oldest: datetime!(2022-11-11 0:00 UTC), diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index 4df989b51..0632b7846 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -144,6 +144,7 @@ fn main() { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs index d01548319..a5bb1ea56 100644 --- a/crates/index-scheduler/src/insta_snapshot.rs +++ b/crates/index-scheduler/src/insta_snapshot.rs @@ -343,6 +343,7 @@ pub fn snapshot_batch(batch: &Batch) -> String { uid, details, stats, + embedder_stats, started_at, finished_at, progress: _, @@ -366,6 +367,12 @@ pub fn snapshot_batch(batch: &Batch) -> String { snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap())); + if !embedder_stats.skip_serializing() { + snap.push_str(&format!( + "embedder stats: {}, ", + serde_json::to_string(&embedder_stats).unwrap() + )); + } snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap())); snap.push('}'); snap diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index b5b63e1d7..b96f65836 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -179,6 +179,7 @@ impl BatchQueue { progress: None, details: batch.details, stats: batch.stats, + embedder_stats: batch.embedder_stats.as_ref().into(), started_at: batch.started_at, finished_at: batch.finished_at, enqueued_at: batch.enqueued_at, diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index c349f90ad..5261692b6 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -162,8 +162,13 @@ impl IndexScheduler { .set_currently_updating_index(Some((index_uid.clone(), index.clone()))); let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?; - let (tasks, congestion) = - self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?; + let (tasks, congestion) = self.apply_index_operation( + &mut index_wtxn, + &index, + op, + &progress, + current_batch.embedder_stats.clone(), + )?; { progress.update_progress(FinalizingIndexStep::Committing); @@ -238,10 +243,12 @@ impl IndexScheduler { ); builder.set_primary_key(primary_key); let must_stop_processing = self.scheduler.must_stop_processing.clone(); + builder .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), + current_batch.embedder_stats.clone(), ) .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; index_wtxn.commit()?; diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs index 093c6209d..4c0db9ce4 100644 --- a/crates/index-scheduler/src/scheduler/process_index_operation.rs +++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs @@ -1,8 +1,10 @@ +use std::sync::Arc; + use bumpalo::collections::CollectIn; use bumpalo::Bump; use meilisearch_types::heed::RwTxn; use meilisearch_types::milli::documents::PrimaryKey; -use meilisearch_types::milli::progress::Progress; +use meilisearch_types::milli::progress::{EmbedderStats, Progress}; use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction}; use meilisearch_types::milli::update::DocumentAdditionResult; use meilisearch_types::milli::{self, ChannelCongestion, Filter}; @@ -24,7 +26,7 @@ impl IndexScheduler { /// The list of processed tasks. #[tracing::instrument( level = "trace", - skip(self, index_wtxn, index, progress), + skip(self, index_wtxn, index, progress, embedder_stats), target = "indexing::scheduler" )] pub(crate) fn apply_index_operation<'i>( @@ -33,6 +35,7 @@ impl IndexScheduler { index: &'i Index, operation: IndexOperation, progress: &Progress, + embedder_stats: Arc, ) -> Result<(Vec, Option)> { let indexer_alloc = Bump::new(); let started_processing_at = std::time::Instant::now(); @@ -177,6 +180,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, + &embedder_stats, ) .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?, ); @@ -288,6 +292,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, + &embedder_stats, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, ); @@ -436,6 +441,7 @@ impl IndexScheduler { embedders, &|| must_stop_processing.get(), progress, + &embedder_stats, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?, ); @@ -472,6 +478,7 @@ impl IndexScheduler { .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), + embedder_stats, ) .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; @@ -491,6 +498,7 @@ impl IndexScheduler { tasks: cleared_tasks, }, progress, + embedder_stats.clone(), )?; let (settings_tasks, _congestion) = self.apply_index_operation( @@ -498,6 +506,7 @@ impl IndexScheduler { index, IndexOperation::Settings { index_uid, settings, tasks: settings_tasks }, progress, + embedder_stats, )?; let mut tasks = settings_tasks; diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 67e8fc090..ca37065ec 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -1,7 +1,9 @@ //! Utility functions on the DBs. Mainly getter and setters. +use crate::milli::progress::EmbedderStats; use std::collections::{BTreeSet, HashSet}; use std::ops::Bound; +use std::sync::Arc; use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats}; use meilisearch_types::heed::{Database, RoTxn, RwTxn}; @@ -27,6 +29,7 @@ pub struct ProcessingBatch { pub uid: BatchId, pub details: DetailsView, pub stats: BatchStats, + pub embedder_stats: Arc, pub statuses: HashSet, pub kinds: HashSet, @@ -48,6 +51,7 @@ impl ProcessingBatch { uid, details: DetailsView::default(), stats: BatchStats::default(), + embedder_stats: Default::default(), statuses, kinds: HashSet::default(), @@ -146,6 +150,7 @@ impl ProcessingBatch { progress: None, details: self.details.clone(), stats: self.stats.clone(), + embedder_stats: self.embedder_stats.as_ref().into(), started_at: self.started_at, finished_at: self.finished_at, enqueued_at: self.enqueued_at, diff --git a/crates/meilisearch-types/src/batch_view.rs b/crates/meilisearch-types/src/batch_view.rs index f0a5f364b..297b10ba1 100644 --- a/crates/meilisearch-types/src/batch_view.rs +++ b/crates/meilisearch-types/src/batch_view.rs @@ -3,7 +3,7 @@ use serde::Serialize; use time::{Duration, OffsetDateTime}; use utoipa::ToSchema; -use crate::batches::{Batch, BatchId, BatchStats}; +use crate::batches::{Batch, BatchId, BatchStats, EmbedderStatsView}; use crate::task_view::DetailsView; use crate::tasks::serialize_duration; @@ -14,7 +14,7 @@ pub struct BatchView { pub uid: BatchId, pub progress: Option, pub details: DetailsView, - pub stats: BatchStats, + pub stats: BatchStatsView, #[serde(serialize_with = "serialize_duration", default)] pub duration: Option, #[serde(with = "time::serde::rfc3339", default)] @@ -25,13 +25,26 @@ pub struct BatchView { pub batch_strategy: String, } +#[derive(Debug, Clone, Serialize, ToSchema)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +pub struct BatchStatsView { + #[serde(flatten)] + pub stats: BatchStats, + #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)] + pub embedder_requests: EmbedderStatsView, +} + impl BatchView { pub fn from_batch(batch: &Batch) -> Self { Self { uid: batch.uid, progress: batch.progress.clone(), details: batch.details.clone(), - stats: batch.stats.clone(), + stats: BatchStatsView { + stats: batch.stats.clone(), + embedder_requests: batch.embedder_stats.clone(), + }, duration: batch.finished_at.map(|finished_at| finished_at - batch.started_at), started_at: batch.started_at, finished_at: batch.finished_at, diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 4d40189db..e1cc2b7c7 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use milli::progress::ProgressView; +use milli::progress::{EmbedderStats, ProgressView}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use utoipa::ToSchema; @@ -19,6 +19,8 @@ pub struct Batch { pub progress: Option, pub details: DetailsView, pub stats: BatchStats, + #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)] + pub embedder_stats: EmbedderStatsView, #[serde(with = "time::serde::rfc3339")] pub started_at: OffsetDateTime, @@ -43,6 +45,7 @@ impl PartialEq for Batch { progress, details, stats, + embedder_stats, started_at, finished_at, enqueued_at, @@ -53,6 +56,7 @@ impl PartialEq for Batch { && progress.is_none() == other.progress.is_none() && details == &other.details && stats == &other.stats + && embedder_stats == &other.embedder_stats && started_at == &other.started_at && finished_at == &other.finished_at && enqueued_at == &other.enqueued_at @@ -83,3 +87,30 @@ pub struct BatchStats { #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")] pub internal_database_sizes: serde_json::Map, } + +#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "camelCase")] +#[schema(rename_all = "camelCase")] +pub struct EmbedderStatsView { + pub total: usize, + pub failed: usize, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub last_error: Option, +} + +impl From<&EmbedderStats> for EmbedderStatsView { + fn from(stats: &EmbedderStats) -> Self { + let errors = stats.errors.read().unwrap_or_else(|p| p.into_inner()); + Self { + total: stats.total_count.load(std::sync::atomic::Ordering::Relaxed), + failed: errors.1 as usize, + last_error: errors.0.clone(), + } + } +} + +impl EmbedderStatsView { + pub fn skip_serializing(&self) -> bool { + self.total == 0 && self.failed == 0 && self.last_error.is_none() + } +} diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 1e0c205d0..c902f4e60 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -37,6 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; +use meilisearch_types::milli::progress::EmbedderStats; use meilisearch_types::milli::update::{ default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, }; @@ -542,8 +543,12 @@ fn import_dump( tracing::info!("Importing the settings."); let settings = index_reader.settings()?; apply_settings_to_builder(&settings, &mut builder); - builder - .execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false)?; + let embedder_stats: Arc = Default::default(); + builder.execute( + |indexing_step| tracing::debug!("update: {:?}", indexing_step), + || false, + embedder_stats.clone(), + )?; // 4.3 Import the documents. // 4.3.1 We need to recreate the grenad+obkv format accepted by the index. @@ -574,6 +579,7 @@ fn import_dump( }, |indexing_step| tracing::trace!("update: {:?}", indexing_step), || false, + &embedder_stats, )?; let builder = builder.with_embedders(embedders); diff --git a/crates/meilisearch/tests/vector/rest.rs b/crates/meilisearch/tests/vector/rest.rs index 82fc71b26..6e781e525 100644 --- a/crates/meilisearch/tests/vector/rest.rs +++ b/crates/meilisearch/tests/vector/rest.rs @@ -1,7 +1,10 @@ use std::collections::BTreeMap; +use std::sync::atomic::AtomicUsize; use meili_snap::{json_string, snapshot}; use reqwest::IntoUrl; +use std::time::Duration; +use tokio::sync::mpsc; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, Request, ResponseTemplate}; @@ -334,6 +337,41 @@ async fn create_mock_raw() -> (MockServer, Value) { (mock_server, embedder_settings) } +async fn create_faulty_mock_raw(sender: mpsc::Sender<()>) -> (MockServer, Value) { + let mock_server = MockServer::start().await; + let count = AtomicUsize::new(0); + + Mock::given(method("POST")) + .and(path("/")) + .respond_with(move |_req: &Request| { + let count = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + if count >= 5 { + let _ = sender.try_send(()); + ResponseTemplate::new(500) + .set_delay(Duration::from_secs(u64::MAX)) // Make the response hang forever + .set_body_string("Service Unavailable") + } else { + ResponseTemplate::new(500).set_body_string("Service Unavailable") + } + }) + .mount(&mock_server) + .await; + + let url = mock_server.uri(); + + let embedder_settings = json!({ + "source": "rest", + "url": url, + "dimensions": 3, + "request": "{{text}}", + "response": "{{embedding}}", + "documentTemplate": "{{doc.name}}" + }); + + (mock_server, embedder_settings) +} + pub async fn post(url: T, text: &str) -> reqwest::Result { reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await } @@ -2111,3 +2149,70 @@ async fn searchable_reindex() { } "###); } + +#[actix_rt::test] +async fn last_error_stats() { + let (sender, mut receiver) = mpsc::channel(10); + let (_mock, setting) = create_faulty_mock_raw(sender).await; + let server = get_server_vector().await; + let index = server.index("doggo"); + + let (response, code) = index + .update_settings(json!({ + "embedders": { + "rest": setting, + }, + })) + .await; + snapshot!(code, @"202 Accepted"); + let task = server.wait_task(response.uid()).await; + snapshot!(task["status"], @r###""succeeded""###); + let documents = json!([ + {"id": 0, "name": "will_return_500"}, + {"id": 1, "name": "will_error"}, + {"id": 2, "name": "must_error"}, + ]); + let (_value, code) = index.add_documents(documents, None).await; + snapshot!(code, @"202 Accepted"); + + // The task will eventually fail, so let's not wait for it. + // Let's just wait for the server's signal + receiver.recv().await; + + let (response, _code) = index.filtered_batches(&[], &[], &[]).await; + snapshot!(json_string!(response["results"][0], { + ".progress" => "[ignored]", + ".stats.embedderRequests.total" => "[ignored]", + ".startedAt" => "[ignored]" + }), @r#" + { + "uid": 1, + "progress": "[ignored]", + "details": { + "receivedDocuments": 3, + "indexedDocuments": null + }, + "stats": { + "totalNbTasks": 1, + "status": { + "processing": 1 + }, + "types": { + "documentAdditionOrUpdate": 1 + }, + "indexUids": { + "doggo": 1 + }, + "embedderRequests": { + "total": "[ignored]", + "failed": 5, + "lastError": "runtime error: received internal error HTTP 500 from embedding server\n - server replied with `Service Unavailable`" + } + }, + "duration": null, + "startedAt": "[ignored]", + "finishedAt": null, + "batchStrategy": "batched all enqueued tasks" + } + "#); +} diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index fa651e17f..61c61cd49 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -1,7 +1,7 @@ use std::any::TypeId; use std::borrow::Cow; use std::marker::PhantomData; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -22,6 +22,25 @@ pub struct Progress { steps: Arc>, } +#[derive(Default)] +pub struct EmbedderStats { + pub errors: Arc, u32)>>, + pub total_count: AtomicUsize, +} + +impl std::fmt::Debug for EmbedderStats { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let guard = self.errors.read().unwrap_or_else(|p| p.into_inner()); + let (error, count) = (guard.0.clone(), guard.1); + std::mem::drop(guard); + f.debug_struct("EmbedderStats") + .field("last_error", &error) + .field("total_count", &self.total_count.load(Ordering::Relaxed)) + .field("error_count", &count) + .finish() + } +} + #[derive(Default)] struct InnerProgress { /// The hierarchy of steps. diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index 4a6cc9b90..36917c10e 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { S("america") => vec![S("the united states")], }); builder.set_searchable_fields(vec![S("title"), S("description")]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); // index documents @@ -95,6 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/src/test_index.rs b/crates/milli/src/test_index.rs index dfd570b96..d218bb3a6 100644 --- a/crates/milli/src/test_index.rs +++ b/crates/milli/src/test_index.rs @@ -103,6 +103,7 @@ impl TempIndex { embedders, &|| false, &Progress::default(), + &Default::default(), ) }) .unwrap()?; @@ -134,7 +135,7 @@ impl TempIndex { ) -> Result<(), crate::error::Error> { let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config); update(&mut builder); - builder.execute(drop, || false)?; + builder.execute(drop, || false, Default::default())?; Ok(()) } @@ -185,6 +186,7 @@ impl TempIndex { embedders, &|| false, &Progress::default(), + &Default::default(), ) }) .unwrap()?; @@ -259,6 +261,7 @@ fn aborting_indexation() { embedders, &|| should_abort.load(Relaxed), &Progress::default(), + &Default::default(), ) }) .unwrap() diff --git a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs index cb8c121ce..e1981a615 100644 --- a/crates/milli/src/update/index_documents/extract/extract_vector_points.rs +++ b/crates/milli/src/update/index_documents/extract/extract_vector_points.rs @@ -17,6 +17,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME; use crate::error::FaultSource; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::index::IndexEmbeddingConfig; +use crate::progress::EmbedderStats; use crate::prompt::Prompt; use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd}; use crate::update::settings::InnerIndexSettingsDiff; @@ -674,6 +675,7 @@ fn compare_vectors(a: &[f32], b: &[f32]) -> Ordering { a.iter().copied().map(OrderedFloat).cmp(b.iter().copied().map(OrderedFloat)) } +#[allow(clippy::too_many_arguments)] #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")] pub fn extract_embeddings( // docid, prompt @@ -682,6 +684,7 @@ pub fn extract_embeddings( embedder: Arc, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { @@ -724,6 +727,7 @@ pub fn extract_embeddings( std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)), embedder_name, possible_embedding_mistakes, + embedder_stats, unused_vectors_distribution, request_threads, )?; @@ -746,6 +750,7 @@ pub fn extract_embeddings( std::mem::take(&mut chunks), embedder_name, possible_embedding_mistakes, + embedder_stats, unused_vectors_distribution, request_threads, )?; @@ -764,6 +769,7 @@ pub fn extract_embeddings( vec![std::mem::take(&mut current_chunk)], embedder_name, possible_embedding_mistakes, + embedder_stats, unused_vectors_distribution, request_threads, )?; @@ -783,10 +789,11 @@ fn embed_chunks( text_chunks: Vec>, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistribution, request_threads: &ThreadPoolNoAbort, ) -> Result>> { - match embedder.embed_index(text_chunks, request_threads) { + match embedder.embed_index(text_chunks, request_threads, embedder_stats) { Ok(chunks) => Ok(chunks), Err(error) => { if let FaultSource::Bug = error.fault { diff --git a/crates/milli/src/update/index_documents/extract/mod.rs b/crates/milli/src/update/index_documents/extract/mod.rs index 8cd664a2f..9c1971356 100644 --- a/crates/milli/src/update/index_documents/extract/mod.rs +++ b/crates/milli/src/update/index_documents/extract/mod.rs @@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids; use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters}; use super::{helpers, TypedChunk}; use crate::index::IndexEmbeddingConfig; +use crate::progress::EmbedderStats; use crate::update::settings::InnerIndexSettingsDiff; use crate::vector::error::PossibleEmbeddingMistakes; use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder}; @@ -49,6 +50,7 @@ pub(crate) fn data_from_obkv_documents( settings_diff: Arc, max_positions_per_attributes: Option, possible_embedding_mistakes: Arc, + embedder_stats: &Arc, ) -> Result<()> { let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join( || { @@ -62,6 +64,7 @@ pub(crate) fn data_from_obkv_documents( embedders_configs.clone(), settings_diff.clone(), possible_embedding_mistakes.clone(), + embedder_stats.clone(), ) }) .collect::>() @@ -231,6 +234,7 @@ fn send_original_documents_data( embedders_configs: Arc>, settings_diff: Arc, possible_embedding_mistakes: Arc, + embedder_stats: Arc, ) -> Result<()> { let original_documents_chunk = original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?; @@ -270,6 +274,7 @@ fn send_original_documents_data( embedder.clone(), &embedder_name, &possible_embedding_mistakes, + &embedder_stats, &unused_vectors_distribution, request_threads(), ) { diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index f547c68d4..6e56ad155 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -32,7 +32,7 @@ use crate::database_stats::DatabaseStats; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::index::{PrefixSearch, PrefixSettings}; -use crate::progress::Progress; +use crate::progress::{EmbedderStats, Progress}; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst, @@ -81,6 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> { added_documents: u64, deleted_documents: u64, embedders: EmbeddingConfigs, + embedder_stats: &'t Arc, } #[derive(Default, Debug, Clone)] @@ -103,6 +104,7 @@ where config: IndexDocumentsConfig, progress: FP, should_abort: FA, + embedder_stats: &'t Arc, ) -> Result> { let transform = Some(Transform::new( wtxn, @@ -123,6 +125,7 @@ where added_documents: 0, deleted_documents: 0, embedders: Default::default(), + embedder_stats, }) } @@ -292,6 +295,7 @@ where // Run extraction pipeline in parallel. let mut modified_docids = RoaringBitmap::new(); + let embedder_stats = self.embedder_stats.clone(); pool.install(|| { let settings_diff_cloned = settings_diff.clone(); rayon::spawn(move || { @@ -326,7 +330,8 @@ where embedders_configs.clone(), settings_diff_cloned, max_positions_per_attributes, - Arc::new(possible_embedding_mistakes) + Arc::new(possible_embedding_mistakes), + &embedder_stats ) }); @@ -2025,6 +2030,7 @@ mod tests { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2112,6 +2118,7 @@ mod tests { EmbeddingConfigs::default(), &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2297,6 +2304,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2359,6 +2367,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2412,6 +2421,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2464,6 +2474,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2518,6 +2529,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2577,6 +2589,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2629,6 +2642,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2681,6 +2695,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2879,6 +2894,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2938,6 +2954,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); @@ -2994,6 +3011,7 @@ mod tests { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); wtxn.commit().unwrap(); diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs index 43647e786..85398aa99 100644 --- a/crates/milli/src/update/new/extract/vectors/mod.rs +++ b/crates/milli/src/update/new/extract/vectors/mod.rs @@ -6,6 +6,7 @@ use hashbrown::{DefaultHashBuilder, HashMap}; use super::cache::DelAddRoaringBitmap; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::prompt::Prompt; use crate::update::new::channel::EmbeddingSender; use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor}; @@ -22,6 +23,7 @@ pub struct EmbeddingExtractor<'a, 'b> { embedders: &'a EmbeddingConfigs, sender: EmbeddingSender<'a, 'b>, possible_embedding_mistakes: PossibleEmbeddingMistakes, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, } @@ -30,10 +32,11 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> { embedders: &'a EmbeddingConfigs, sender: EmbeddingSender<'a, 'b>, field_distribution: &'a FieldDistribution, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, ) -> Self { let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution); - Self { embedders, sender, threads, possible_embedding_mistakes } + Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats } } } @@ -75,6 +78,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> { prompt, context.data, &self.possible_embedding_mistakes, + self.embedder_stats, self.threads, self.sender, &context.doc_alloc, @@ -307,6 +311,7 @@ struct Chunks<'a, 'b, 'extractor> { dimensions: usize, prompt: &'a Prompt, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, + embedder_stats: &'a EmbedderStats, user_provided: &'a RefCell>, threads: &'a ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, @@ -322,6 +327,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { prompt: &'a Prompt, user_provided: &'a RefCell>, possible_embedding_mistakes: &'a PossibleEmbeddingMistakes, + embedder_stats: &'a EmbedderStats, threads: &'a ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, doc_alloc: &'a Bump, @@ -336,6 +342,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { embedder, prompt, possible_embedding_mistakes, + embedder_stats, threads, sender, embedder_id, @@ -371,6 +378,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { self.embedder_id, self.embedder_name, self.possible_embedding_mistakes, + self.embedder_stats, unused_vectors_distribution, self.threads, self.sender, @@ -389,6 +397,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { self.embedder_id, self.embedder_name, self.possible_embedding_mistakes, + self.embedder_stats, unused_vectors_distribution, self.threads, self.sender, @@ -407,6 +416,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { embedder_id: u8, embedder_name: &str, possible_embedding_mistakes: &PossibleEmbeddingMistakes, + embedder_stats: &EmbedderStats, unused_vectors_distribution: &UnusedVectorsDistributionBump, threads: &ThreadPoolNoAbort, sender: EmbeddingSender<'a, 'b>, @@ -450,7 +460,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> { return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg))); } - let res = match embedder.embed_index_ref(texts.as_slice(), threads) { + let res = match embedder.embed_index_ref(texts.as_slice(), threads, embedder_stats) { Ok(embeddings) => { for (docid, embedding) in ids.into_iter().zip(embeddings) { sender.set_vector(*docid, embedder_id, embedding).unwrap(); diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs index bb36ddc37..97ffc8624 100644 --- a/crates/milli/src/update/new/indexer/extract.rs +++ b/crates/milli/src/update/new/indexer/extract.rs @@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal}; use super::super::FacetFieldIdsDelta; use super::document_changes::{extract, DocumentChanges, IndexingContext}; use crate::index::IndexEmbeddingConfig; +use crate::progress::EmbedderStats; use crate::progress::MergingWordCache; use crate::proximity::ProximityPrecision; use crate::update::new::extract::EmbeddingExtractor; @@ -34,6 +35,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>( mut index_embeddings: Vec, document_ids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap, + embedder_stats: &EmbedderStats, ) -> Result<(FacetFieldIdsDelta, Vec)> where DC: DocumentChanges<'pl>, @@ -245,6 +247,7 @@ where embedders, embedding_sender, field_distribution, + embedder_stats, request_threads(), ); let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads()); diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 2ea3c787e..bb6ba0102 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -19,7 +19,7 @@ use super::steps::IndexingStep; use super::thread_local::ThreadLocal; use crate::documents::PrimaryKey; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; -use crate::progress::Progress; +use crate::progress::{EmbedderStats, Progress}; use crate::update::GrenadParameters; use crate::vector::{ArroyWrapper, EmbeddingConfigs}; use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort}; @@ -55,6 +55,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>( embedders: EmbeddingConfigs, must_stop_processing: &'indexer MSP, progress: &'indexer Progress, + embedder_stats: &'indexer EmbedderStats, ) -> Result where DC: DocumentChanges<'pl>, @@ -158,6 +159,7 @@ where index_embeddings, document_ids, modified_docids, + embedder_stats, ) }) .unwrap() diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs index f396cd079..05dbb4784 100644 --- a/crates/milli/src/update/settings.rs +++ b/crates/milli/src/update/settings.rs @@ -27,6 +27,7 @@ use crate::index::{ DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS, }; use crate::order_by_map::OrderByMap; +use crate::progress::EmbedderStats; use crate::prompt::{default_max_bytes, default_template_text, PromptData}; use crate::proximity::ProximityPrecision; use crate::update::index_documents::IndexDocumentsMethod; @@ -466,7 +467,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { #[tracing::instrument( level = "trace" - skip(self, progress_callback, should_abort, settings_diff), + skip(self, progress_callback, should_abort, settings_diff, embedder_stats), target = "indexing::documents" )] fn reindex( @@ -474,6 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { progress_callback: &FP, should_abort: &FA, settings_diff: InnerIndexSettingsDiff, + embedder_stats: &Arc, ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, @@ -505,6 +507,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { IndexDocumentsConfig::default(), &progress_callback, &should_abort, + embedder_stats, )?; indexing_builder.execute_raw(output)?; @@ -1355,7 +1358,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { } } - pub fn execute(mut self, progress_callback: FP, should_abort: FA) -> Result<()> + pub fn execute( + mut self, + progress_callback: FP, + should_abort: FA, + embedder_stats: Arc, + ) -> Result<()> where FP: Fn(UpdateIndexingStep) + Sync, FA: Fn() -> bool + Sync, @@ -1413,7 +1421,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> { ); if inner_settings_diff.any_reindexing_needed() { - self.reindex(&progress_callback, &should_abort, inner_settings_diff)?; + self.reindex(&progress_callback, &should_abort, inner_settings_diff, &embedder_stats)?; } Ok(()) diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs index 9c5992bd3..8314b8649 100644 --- a/crates/milli/src/vector/composite.rs +++ b/crates/milli/src/vector/composite.rs @@ -7,6 +7,7 @@ use super::{ hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, }; +use crate::progress::EmbedderStats; use crate::ThreadPoolNoAbort; #[derive(Debug)] @@ -81,6 +82,7 @@ impl Embedder { "This is a sample text. It is meant to compare similarity.".into(), ], None, + None, ) .map_err(|error| NewEmbedderError::composite_test_embedding_failed(error, "search"))?; @@ -92,6 +94,7 @@ impl Embedder { "This is a sample text. It is meant to compare similarity.".into(), ], None, + None, ) .map_err(|error| { NewEmbedderError::composite_test_embedding_failed(error, "indexing") @@ -150,13 +153,14 @@ impl SubEmbedder { &self, texts: Vec, deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed(texts), - SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline), - SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline), + SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline, embedder_stats), + SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline, embedder_stats), SubEmbedder::UserProvided(embedder) => embedder.embed(&texts), - SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline), + SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline, embedder_stats), } } @@ -164,18 +168,21 @@ impl SubEmbedder { &self, text: &str, deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> std::result::Result { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text), - SubEmbedder::OpenAi(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) - } - SubEmbedder::Ollama(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) - } + SubEmbedder::OpenAi(embedder) => embedder + .embed(&[text], deadline, embedder_stats)? + .pop() + .ok_or_else(EmbedError::missing_embedding), + SubEmbedder::Ollama(embedder) => embedder + .embed(&[text], deadline, embedder_stats)? + .pop() + .ok_or_else(EmbedError::missing_embedding), SubEmbedder::UserProvided(embedder) => embedder.embed_one(text), SubEmbedder::Rest(embedder) => embedder - .embed_ref(&[text], deadline)? + .embed_ref(&[text], deadline, embedder_stats)? .pop() .ok_or_else(EmbedError::missing_embedding), } @@ -188,13 +195,20 @@ impl SubEmbedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> std::result::Result>, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), - SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads), - SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads), + SubEmbedder::OpenAi(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } + SubEmbedder::Ollama(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks), - SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads), + SubEmbedder::Rest(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } } } @@ -203,13 +217,18 @@ impl SubEmbedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> std::result::Result, EmbedError> { match self { SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), - SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads), - SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads), + SubEmbedder::OpenAi(embedder) => { + embedder.embed_index_ref(texts, threads, embedder_stats) + } + SubEmbedder::Ollama(embedder) => { + embedder.embed_index_ref(texts, threads, embedder_stats) + } SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts), - SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads), + SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), } } diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index c2978f5db..065beb5fb 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use self::error::{EmbedError, NewEmbedderError}; -use crate::progress::Progress; +use crate::progress::{EmbedderStats, Progress}; use crate::prompt::{Prompt, PromptData}; use crate::ThreadPoolNoAbort; @@ -719,18 +719,20 @@ impl Embedder { } let embedding = match self { Embedder::HuggingFace(embedder) => embedder.embed_one(text), - Embedder::OpenAi(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) - } - Embedder::Ollama(embedder) => { - embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding) - } - Embedder::UserProvided(embedder) => embedder.embed_one(text), - Embedder::Rest(embedder) => embedder - .embed_ref(&[text], deadline)? + Embedder::OpenAi(embedder) => embedder + .embed(&[text], deadline, None)? .pop() .ok_or_else(EmbedError::missing_embedding), - Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline), + Embedder::Ollama(embedder) => embedder + .embed(&[text], deadline, None)? + .pop() + .ok_or_else(EmbedError::missing_embedding), + Embedder::UserProvided(embedder) => embedder.embed_one(text), + Embedder::Rest(embedder) => embedder + .embed_ref(&[text], deadline, None)? + .pop() + .ok_or_else(EmbedError::missing_embedding), + Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None), }?; if let Some(cache) = self.cache() { @@ -747,14 +749,21 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> std::result::Result>, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), - Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads), - Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads), + Embedder::OpenAi(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } + Embedder::Ollama(embedder) => { + embedder.embed_index(text_chunks, threads, embedder_stats) + } Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks), - Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads), - Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads), + Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), + Embedder::Composite(embedder) => { + embedder.index.embed_index(text_chunks, threads, embedder_stats) + } } } @@ -763,14 +772,17 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> std::result::Result, EmbedError> { match self { Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), - Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads), - Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads), + Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts), - Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads), - Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads), + Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), + Embedder::Composite(embedder) => { + embedder.index.embed_index_ref(texts, threads, embedder_stats) + } } } diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index 8beae6205..d4329a2de 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -7,6 +7,7 @@ use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErro use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -104,8 +105,9 @@ impl Embedder { &self, texts: &[S], deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { - match self.rest_embedder.embed_ref(texts, deadline) { + match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), Err(EmbedError { kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _ }) => { Err(EmbedError::ollama_model_not_found(error)) @@ -118,15 +120,22 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks + .into_iter() + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) + .collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks + .into_par_iter() + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) + .collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -139,13 +148,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -155,7 +165,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index df29f6916..0159d5c76 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -9,6 +9,7 @@ use super::error::{EmbedError, NewEmbedderError}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM}; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::vector::error::EmbedErrorKind; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -215,8 +216,9 @@ impl Embedder { &self, texts: &[S], deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { - match self.rest_embedder.embed_ref(texts, deadline) { + match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { Ok(embeddings) => Ok(embeddings), Err(EmbedError { kind: EmbedErrorKind::RestBadRequest(error, _), fault: _ }) => { tracing::warn!(error=?error, "OpenAI: received `BAD_REQUEST`. Input was maybe too long, retrying on tokenized version. For best performance, limit the size of your document template."); @@ -238,7 +240,11 @@ impl Embedder { let encoded = self.tokenizer.encode_ordinary(text); let len = encoded.len(); if len < max_token_count { - all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline)?); + all_embeddings.append(&mut self.rest_embedder.embed_ref( + &[text], + deadline, + None, + )?); continue; } @@ -255,15 +261,22 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks + .into_iter() + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) + .collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + text_chunks + .into_par_iter() + .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats))) + .collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -276,13 +289,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; Ok(embeddings.into_iter().flatten().collect()) @@ -291,7 +305,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index b87ac9f77..fbe3c1129 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -13,6 +13,7 @@ use super::{ DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM, }; use crate::error::FaultSource; +use crate::progress::EmbedderStats; use crate::ThreadPoolNoAbort; // retrying in case of failure @@ -168,19 +169,28 @@ impl Embedder { &self, texts: Vec, deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> { - embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline) + embed( + &self.data, + texts.as_slice(), + texts.len(), + Some(self.dimensions), + deadline, + embedder_stats, + ) } pub fn embed_ref( &self, texts: &[S], deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> where S: AsRef + Serialize, { - embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline) + embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline, embedder_stats) } pub fn embed_tokens( @@ -188,7 +198,7 @@ impl Embedder { tokens: &[u32], deadline: Option, ) -> Result { - let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline)?; + let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline, None)?; // unwrap: guaranteed that embeddings.len() == 1, otherwise the previous line terminated in error Ok(embeddings.pop().unwrap()) } @@ -197,15 +207,22 @@ impl Embedder { &self, text_chunks: Vec>, threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result>, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { - text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect() + text_chunks + .into_iter() + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) + .collect() } else { threads .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() + text_chunks + .into_par_iter() + .map(move |chunk| self.embed(chunk, None, Some(embedder_stats))) + .collect() }) .map_err(|error| EmbedError { kind: EmbedErrorKind::PanicInThreadPool(error), @@ -218,13 +235,14 @@ impl Embedder { &self, texts: &[&str], threads: &ThreadPoolNoAbort, + embedder_stats: &EmbedderStats, ) -> Result, EmbedError> { // This condition helps reduce the number of active rayon jobs // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) + .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -234,7 +252,7 @@ impl Embedder { .install(move || { let embeddings: Result>, _> = texts .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) + .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats))) .collect(); let embeddings = embeddings?; @@ -272,7 +290,7 @@ impl Embedder { } fn infer_dimensions(data: &EmbedderData) -> Result { - let v = embed(data, ["test"].as_slice(), 1, None, None) + let v = embed(data, ["test"].as_slice(), 1, None, None, None) .map_err(NewEmbedderError::could_not_determine_dimension)?; // unwrap: guaranteed that v.len() == 1, otherwise the previous line terminated in error Ok(v.first().unwrap().len()) @@ -284,6 +302,7 @@ fn embed( expected_count: usize, expected_dimension: Option, deadline: Option, + embedder_stats: Option<&EmbedderStats>, ) -> Result, EmbedError> where S: Serialize, @@ -302,6 +321,9 @@ where let body = data.request.inject_texts(inputs); for attempt in 0..10 { + if let Some(embedder_stats) = &embedder_stats { + embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } let response = request.clone().send_json(&body); let result = check_response(response, data.configuration_source).and_then(|response| { response_to_embedding(response, data, expected_count, expected_dimension) @@ -311,6 +333,13 @@ where Ok(response) => return Ok(response), Err(retry) => { tracing::warn!("Failed: {}", retry.error); + if let Some(embedder_stats) = &embedder_stats { + let stringified_error = retry.error.to_string(); + let mut errors = + embedder_stats.errors.write().unwrap_or_else(|p| p.into_inner()); + errors.0 = Some(stringified_error); + errors.1 += 1; + } if let Some(deadline) = deadline { let now = std::time::Instant::now(); if now > deadline { @@ -336,12 +365,26 @@ where std::thread::sleep(retry_duration); } + if let Some(embedder_stats) = &embedder_stats { + embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } let response = request.send_json(&body); - let result = check_response(response, data.configuration_source); - result.map_err(Retry::into_error).and_then(|response| { + let result = check_response(response, data.configuration_source).and_then(|response| { response_to_embedding(response, data, expected_count, expected_dimension) - .map_err(Retry::into_error) - }) + }); + + match result { + Ok(response) => Ok(response), + Err(retry) => { + if let Some(embedder_stats) = &embedder_stats { + let stringified_error = retry.error.to_string(); + let mut errors = embedder_stats.errors.write().unwrap_or_else(|p| p.into_inner()); + errors.0 = Some(stringified_error); + errors.1 += 1; + }; + Err(retry.into_error()) + } + } } fn check_response( diff --git a/crates/milli/tests/search/distinct.rs b/crates/milli/tests/search/distinct.rs index fc890dfe8..15fcf70a2 100644 --- a/crates/milli/tests/search/distinct.rs +++ b/crates/milli/tests/search/distinct.rs @@ -19,7 +19,7 @@ macro_rules! test_distinct { let config = milli::update::IndexerConfig::default(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_distinct_field(S(stringify!($distinct))); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index 8934cbea4..8548f0d01 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() { FilterableAttributesRule::Field(S("genres")), FilterableAttributesRule::Field(S("tags")), ]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); // index documents @@ -74,6 +74,7 @@ fn test_facet_distribution_with_no_facet_values() { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 906956716..4098af736 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { S("america") => vec![S("the united states")], }); builder.set_searchable_fields(vec![S("title"), S("description")]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); // index documents @@ -114,6 +114,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); diff --git a/crates/milli/tests/search/phrase_search.rs b/crates/milli/tests/search/phrase_search.rs index b7f792bfc..180fcd176 100644 --- a/crates/milli/tests/search/phrase_search.rs +++ b/crates/milli/tests/search/phrase_search.rs @@ -10,7 +10,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) { let mut builder = Settings::new(&mut wtxn, index, &config); let stop_words = stop_words.iter().map(|s| s.to_string()).collect(); builder.set_stop_words(stop_words); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); } diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 1acc89484..b72978330 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -236,7 +236,7 @@ fn criteria_mixup() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(criteria.clone()); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); @@ -276,7 +276,7 @@ fn criteria_ascdesc() { S("name"), S("age"), }); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); let mut wtxn = index.write_txn().unwrap(); @@ -344,6 +344,7 @@ fn criteria_ascdesc() { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -358,7 +359,7 @@ fn criteria_ascdesc() { let mut wtxn = index.write_txn().unwrap(); let mut builder = Settings::new(&mut wtxn, &index, &config); builder.set_criteria(vec![criterion.clone()]); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); wtxn.commit().unwrap(); let rtxn = index.read_txn().unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 3c0717063..9aacbf82a 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_one_typo(4); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() { let config = IndexerConfig::default(); let mut builder = Settings::new(&mut txn, &index, &config); builder.set_min_word_len_two_typos(7); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); // typo is now supported for 4 letters words let mut search = Search::new(&txn, &index); @@ -153,6 +153,7 @@ fn test_typo_disabled_on_word() { embedders, &|| false, &Progress::default(), + &Default::default(), ) .unwrap(); @@ -180,7 +181,7 @@ fn test_typo_disabled_on_word() { // `zealand` doesn't allow typos anymore exact_words.insert("zealand".to_string()); builder.set_exact_words(exact_words); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); let mut search = Search::new(&txn, &index); search.query("zealand"); @@ -218,7 +219,7 @@ fn test_disable_typo_on_attribute() { let mut builder = Settings::new(&mut txn, &index, &config); // disable typos on `description` builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect()); - builder.execute(|_| (), || false).unwrap(); + builder.execute(|_| (), || false, Default::default()).unwrap(); let mut search = Search::new(&txn, &index); search.query("antebelum");