diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs
index 8e1fb1c2c..d4504c246 100644
--- a/crates/index-scheduler/src/insta_snapshot.rs
+++ b/crates/index-scheduler/src/insta_snapshot.rs
@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 use std::fmt::Write;
 
-use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchStats};
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, RoTxn};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -367,7 +367,10 @@ pub fn snapshot_batch(batch: &Batch) -> String {
     snap.push_str(&format!("uid: {uid}, "));
     snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
     snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap()));
-    snap.push_str(&format!("embedder_stats: {}, ", serde_json::to_string(&embedder_stats).unwrap()));
+    snap.push_str(&format!(
+        "embedder_stats: {}, ",
+        serde_json::to_string(&embedder_stats).unwrap()
+    ));
     snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap()));
     snap.push('}');
     snap
diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs
index 96a3940a5..b96f65836 100644
--- a/crates/index-scheduler/src/queue/batches.rs
+++ b/crates/index-scheduler/src/queue/batches.rs
@@ -1,7 +1,7 @@
 use std::collections::HashSet;
 use std::ops::{Bound, RangeBounds};
 
-use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchId};
+use meilisearch_types::batches::{Batch, BatchId};
 use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs
index c5305cf21..5261692b6 100644
--- a/crates/index-scheduler/src/scheduler/process_batch.rs
+++ b/crates/index-scheduler/src/scheduler/process_batch.rs
@@ -1,11 +1,10 @@
 use std::collections::{BTreeSet, HashMap, HashSet};
 use std::panic::{catch_unwind, AssertUnwindSafe};
 use std::sync::atomic::Ordering;
-use std::sync::Arc;
 
 use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
 use meilisearch_types::heed::{RoTxn, RwTxn};
-use meilisearch_types::milli::progress::{EmbedderStats, Progress, VariableNameStep};
+use meilisearch_types::milli::progress::{Progress, VariableNameStep};
 use meilisearch_types::milli::{self, ChannelCongestion};
 use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
 use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
@@ -163,8 +162,13 @@ impl IndexScheduler {
                     .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
 
                 let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
-                let (tasks, congestion) =
-                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.embedder_stats.clone())?;
+                let (tasks, congestion) = self.apply_index_operation(
+                    &mut index_wtxn,
+                    &index,
+                    op,
+                    &progress,
+                    current_batch.embedder_stats.clone(),
+                )?;
 
                 {
                     progress.update_progress(FinalizingIndexStep::Committing);
diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs
index 226ef9f06..ca37065ec 100644
--- a/crates/index-scheduler/src/utils.rs
+++ b/crates/index-scheduler/src/utils.rs
@@ -1,11 +1,11 @@
 //! Utility functions on the DBs. Mainly getter and setters.
 
+use crate::milli::progress::EmbedderStats;
 use std::collections::{BTreeSet, HashSet};
 use std::ops::Bound;
 use std::sync::Arc;
 
-use crate::milli::progress::EmbedderStats;
-use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchId, BatchStats};
+use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::CboRoaringBitmapCodec;
 use meilisearch_types::task_view::DetailsView;
diff --git a/crates/meilisearch-types/src/batch_view.rs b/crates/meilisearch-types/src/batch_view.rs
index aced97d7a..ea027b74e 100644
--- a/crates/meilisearch-types/src/batch_view.rs
+++ b/crates/meilisearch-types/src/batch_view.rs
@@ -3,7 +3,7 @@
 use serde::Serialize;
 use time::{Duration, OffsetDateTime};
 use utoipa::ToSchema;
 
-use crate::batches::{Batch, EmbedderStatsView, BatchId, BatchStats};
+use crate::batches::{Batch, BatchId, BatchStats, EmbedderStatsView};
 use crate::task_view::DetailsView;
 use crate::tasks::serialize_duration;
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index 45cc2d9f4..cec74fb75 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -1,5 +1,4 @@
 use std::collections::BTreeMap;
-use std::sync::Arc;
 
 use milli::progress::{EmbedderStats, ProgressView};
 use serde::{Deserialize, Serialize};
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index 72be6aec9..cdecd520c 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -544,8 +544,11 @@ fn import_dump(
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);
         let embedder_stats: Arc<EmbedderStats> = Default::default(); // FIXME: this isn't linked to anything
-        builder
-            .execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false, embedder_stats.clone())?;
+        builder.execute(
+            |indexing_step| tracing::debug!("update: {:?}", indexing_step),
+            || false,
+            embedder_stats.clone(),
+        )?;
 
         // 4.3 Import the documents.
         // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
diff --git a/crates/meilisearch/tests/vector/rest.rs b/crates/meilisearch/tests/vector/rest.rs
index 1fdd18d28..363931a86 100644
--- a/crates/meilisearch/tests/vector/rest.rs
+++ b/crates/meilisearch/tests/vector/rest.rs
@@ -1,14 +1,12 @@
-use std::collections::{BTreeMap, BTreeSet};
+use std::collections::BTreeMap;
 use std::sync::atomic::AtomicUsize;
 
 use meili_snap::{json_string, snapshot};
 use reqwest::IntoUrl;
-use tokio::spawn;
+use std::time::Duration;
 use tokio::sync::mpsc;
 use wiremock::matchers::{method, path};
 use wiremock::{Mock, MockServer, Request, ResponseTemplate};
-use tokio::time::sleep;
-use std::time::Duration;
 
 use crate::common::Value;
 use crate::json;
@@ -342,7 +340,7 @@ async fn create_mock_raw() -> (MockServer, Value) {
 async fn create_faulty_mock_raw(sender: mpsc::Sender<()>) -> (MockServer, Value) {
     let mock_server = MockServer::start().await;
     let count = AtomicUsize::new(0);
-    
+
     Mock::given(method("POST"))
         .and(path("/"))
        .respond_with(move |req: &Request| {
@@ -359,14 +357,13 @@
 
            if count >= 5 {
                let _ = sender.try_send(());
-                ResponseTemplate::new(500)
-                    .set_delay(Duration::from_secs(u64::MAX))
-                    .set_body_json(json!({
+                ResponseTemplate::new(500).set_delay(Duration::from_secs(u64::MAX)).set_body_json(
+                    json!({
                        "error": "Service Unavailable",
                        "text": req_body
-                    }))
+                    }),
+                )
            } else {
-
                ResponseTemplate::new(500).set_body_json(json!({
                    "error": "Service Unavailable",
                    "text": req_body
@@ -2168,7 +2165,6 @@ async fn searchable_reindex() {
     "###);
 }
 
-
 #[actix_rt::test]
 async fn last_error_stats() {
     let (sender, mut receiver) = mpsc::channel(10);
@@ -2191,7 +2187,7 @@
         {"id": 1, "name": "will_error"},
         {"id": 2, "name": "must_error"},
     ]);
-    let (value, code) = index.add_documents(documents, None).await;
+    let (_value, code) = index.add_documents(documents, None).await;
     snapshot!(code, @"202 Accepted");
 
     // The task will eventually fail, so let's not wait for it.
diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs
index 8cd2c9336..7ecfcc095 100644
--- a/crates/milli/src/progress.rs
+++ b/crates/milli/src/progress.rs
@@ -25,7 +25,7 @@ pub struct Progress {
 #[derive(Default)]
 pub struct EmbedderStats {
     pub errors: Arc<RwLock<(Option<String>, u32)>>,
-    pub total_count: AtomicUsize
+    pub total_count: AtomicUsize,
 }
 
 impl std::fmt::Debug for EmbedderStats {
diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs
index 0b7e1a292..c4e521a88 100644
--- a/crates/milli/src/search/new/tests/integration.rs
+++ b/crates/milli/src/search/new/tests/integration.rs
@@ -95,7 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
             embedders,
             &|| false,
             &Progress::default(),
-                Default::default(),
+            Default::default(),
         )
         .unwrap();
 
diff --git a/crates/milli/src/update/new/indexer/extract.rs b/crates/milli/src/update/new/indexer/extract.rs
index 72c63b605..040886236 100644
--- a/crates/milli/src/update/new/indexer/extract.rs
+++ b/crates/milli/src/update/new/indexer/extract.rs
@@ -1,7 +1,7 @@
 use std::collections::BTreeMap;
 use std::sync::atomic::AtomicBool;
-use std::sync::OnceLock;
 use std::sync::Arc;
+use std::sync::OnceLock;
 
 use bumpalo::Bump;
 use roaring::RoaringBitmap;
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 52fd6cd0b..33774f892 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -1,7 +1,7 @@
 use std::sync::atomic::AtomicBool;
+use std::sync::Arc;
 use std::sync::{Once, RwLock};
 use std::thread::{self, Builder};
-use std::sync::Arc;
 
 use big_s::S;
 use document_changes::{DocumentChanges, IndexingContext};
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 98ee86978..b3f70d1b6 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -1358,7 +1358,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         }
     }
 
-    pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Arc<EmbedderStats>) -> Result<()>
+    pub fn execute<FP, FA>(
+        mut self,
+        progress_callback: FP,
+        should_abort: FA,
+        embedder_stats: Arc<EmbedderStats>,
+    ) -> Result<()>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
         FA: Fn() -> bool + Sync,
diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs
index daec50e4b..7d9497165 100644
--- a/crates/milli/src/vector/composite.rs
+++ b/crates/milli/src/vector/composite.rs
@@ -173,12 +173,14 @@ impl SubEmbedder {
     ) -> std::result::Result<Embedding, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text),
-            SubEmbedder::OpenAi(embedder) => {
-                embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
-            SubEmbedder::Ollama(embedder) => {
-                embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
+            SubEmbedder::OpenAi(embedder) => embedder
+                .embed(&[text], deadline, embedder_stats)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            SubEmbedder::Ollama(embedder) => embedder
+                .embed(&[text], deadline, embedder_stats)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
             SubEmbedder::UserProvided(embedder) => embedder.embed_one(text),
             SubEmbedder::Rest(embedder) => embedder
                 .embed_ref(&[text], deadline, embedder_stats)?
@@ -198,10 +200,16 @@ impl SubEmbedder {
     ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
-            SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
-            SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
+            SubEmbedder::OpenAi(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
+            SubEmbedder::Ollama(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
             SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
-            SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
+            SubEmbedder::Rest(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
         }
     }
 
@@ -214,8 +222,12 @@ impl SubEmbedder {
     ) -> std::result::Result<Vec<Embedding>, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
-            SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
-            SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
+            SubEmbedder::OpenAi(embedder) => {
+                embedder.embed_index_ref(texts, threads, embedder_stats)
+            }
+            SubEmbedder::Ollama(embedder) => {
+                embedder.embed_index_ref(texts, threads, embedder_stats)
+            }
             SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
             SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
         }
diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index 124e17cff..efa981694 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -719,12 +719,14 @@ impl Embedder {
         }
         let embedding = match self {
             Embedder::HuggingFace(embedder) => embedder.embed_one(text),
-            Embedder::OpenAi(embedder) => {
-                embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
-            Embedder::Ollama(embedder) => {
-                embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
+            Embedder::OpenAi(embedder) => embedder
+                .embed(&[text], deadline, None)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            Embedder::Ollama(embedder) => embedder
+                .embed(&[text], deadline, None)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
             Embedder::UserProvided(embedder) => embedder.embed_one(text),
             Embedder::Rest(embedder) => embedder
                 .embed_ref(&[text], deadline, None)?
@@ -751,11 +753,17 @@ impl Embedder {
     ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
         match self {
             Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
-            Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
-            Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
+            Embedder::OpenAi(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
+            Embedder::Ollama(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
             Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
             Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
-            Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads, embedder_stats),
+            Embedder::Composite(embedder) => {
+                embedder.index.embed_index(text_chunks, threads, embedder_stats)
+            }
         }
     }
 
@@ -772,7 +780,9 @@ impl Embedder {
             Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
             Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
             Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
-            Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads, embedder_stats),
+            Embedder::Composite(embedder) => {
+                embedder.index.embed_index_ref(texts, threads, embedder_stats)
+            }
         }
     }
 
diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index b3ee925e6..e26b7e1ea 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -106,7 +106,7 @@ impl Embedder {
         &self,
         texts: &[S],
         deadline: Option<Instant>,
-        embedder_stats: Option<Arc<EmbedderStats>>
+        embedder_stats: Option<Arc<EmbedderStats>>,
     ) -> Result<Vec<Embedding>, EmbedError> {
         match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
             Ok(embeddings) => Ok(embeddings),
@@ -126,11 +126,17 @@ impl Embedder {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -143,7 +149,7 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
-        embedder_stats: Option<Arc<EmbedderStats>>
+        embedder_stats: Option<Arc<EmbedderStats>>,
     ) -> Result<Vec<Embedding>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index 384abe880..ca072d6e5 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -241,7 +241,11 @@ impl Embedder {
             let encoded = self.tokenizer.encode_ordinary(text);
             let len = encoded.len();
             if len < max_token_count {
-                all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline, None)?);
+                all_embeddings.append(&mut self.rest_embedder.embed_ref(
+                    &[text],
+                    deadline,
+                    None,
+                )?);
                 continue;
             }
 
@@ -263,11 +267,17 @@ impl Embedder {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index d8de89c6a..294b0ceda 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -14,8 +14,8 @@ use super::{
     DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM,
 };
 use crate::error::FaultSource;
-use crate::ThreadPoolNoAbort;
 use crate::progress::EmbedderStats;
+use crate::ThreadPoolNoAbort;
 
 // retrying in case of failure
 pub struct Retry {
@@ -172,7 +172,14 @@ impl Embedder {
         deadline: Option<Instant>,
         embedder_stats: Option<Arc<EmbedderStats>>,
     ) -> Result<Vec<Embedding>, EmbedError> {
-        embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline, embedder_stats)
+        embed(
+            &self.data,
+            texts.as_slice(),
+            texts.len(),
+            Some(self.dimensions),
+            deadline,
+            embedder_stats,
+        )
     }
 
     pub fn embed_ref(
@@ -206,11 +213,17 @@ impl Embedder {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(chunk, None, embedder_stats.clone()))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(chunk, None, embedder_stats.clone()))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -223,7 +236,7 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
-        embedder_stats: Option<Arc<EmbedderStats>>
+        embedder_stats: Option<Arc<EmbedderStats>>,
     ) -> Result<Vec<Embedding>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.