From 2f82d945028fce52efaf704c2b0a4060093369fa Mon Sep 17 00:00:00 2001 From: Mubelotix Date: Mon, 23 Jun 2025 18:55:23 +0200 Subject: [PATCH] Fix the test and simplify types --- crates/dump/src/lib.rs | 2 +- crates/index-scheduler/src/queue/batches.rs | 4 +- .../src/scheduler/create_batch.rs | 7 +- .../src/scheduler/process_batch.rs | 13 +--- crates/index-scheduler/src/utils.rs | 26 +++---- crates/meilisearch-types/src/batch_view.rs | 4 +- crates/meilisearch-types/src/batches.rs | 10 ++- crates/meilisearch/tests/vector/rest.rs | 71 +++++++++++++++---- crates/milli/src/vector/rest.rs | 1 + 9 files changed, 87 insertions(+), 51 deletions(-) diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs index b7a35ad5c..a84ec4ba5 100644 --- a/crates/dump/src/lib.rs +++ b/crates/dump/src/lib.rs @@ -329,7 +329,7 @@ pub(crate) mod test { write_channel_congestion: None, internal_database_sizes: Default::default(), }, - embedder_stats: None, + embedder_stats: Default::default(), enqueued_at: Some(BatchEnqueuedAt { earliest: datetime!(2022-11-11 0:00 UTC), oldest: datetime!(2022-11-11 0:00 UTC), diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index b14601733..c82d5acd2 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -174,7 +174,7 @@ impl BatchQueue { pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> { let old_batch = self.all_batches.get(wtxn, &batch.uid)?; - println!("Saving batch: {}", batch.embedder_stats.is_some()); + println!("Saving batch: {:?}", batch.embedder_stats); self.all_batches.put( wtxn, @@ -184,7 +184,7 @@ impl BatchQueue { progress: None, details: batch.details, stats: batch.stats, - embedder_stats: batch.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())), + embedder_stats: batch.embedder_stats.as_ref().into(), started_at: batch.started_at, finished_at: batch.finished_at, enqueued_at: 
batch.enqueued_at, diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs index e3763881b..fc20b6fd5 100644 --- a/crates/index-scheduler/src/scheduler/create_batch.rs +++ b/crates/index-scheduler/src/scheduler/create_batch.rs @@ -437,8 +437,10 @@ impl IndexScheduler { #[cfg(test)] self.maybe_fail(crate::test_utils::FailureLocation::InsideCreateBatch)?; + println!("create next batch"); let batch_id = self.queue.batches.next_batch_id(rtxn)?; let mut current_batch = ProcessingBatch::new(batch_id); + println!("over"); let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?; let count_total_enqueued = enqueued.len(); @@ -454,6 +456,7 @@ impl IndexScheduler { kind: Kind::TaskCancelation, id: task_id, }); + println!("task cancelled"); return Ok(Some((Batch::TaskCancelation { task }, current_batch))); } @@ -524,7 +527,7 @@ impl IndexScheduler { } // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task. - let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) }; + let task_id = if let Some(task_id) = enqueued.min() { task_id } else { println!("return"); return Ok(None) }; let mut task = self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; @@ -602,6 +605,7 @@ impl IndexScheduler { autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref()) { current_batch.reason(autobatch_stop_reason.unwrap_or(stop_reason)); + println!("autobatch"); return Ok(self .create_next_batch_index( rtxn, @@ -615,6 +619,7 @@ impl IndexScheduler { // If we found no tasks then we were notified for something that got autobatched // somehow and there is nothing to do. 
+ println!("nothing to do"); Ok(None) } } diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index 4e36b65b6..c5305cf21 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -164,7 +164,7 @@ impl IndexScheduler { let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?; let (tasks, congestion) = - self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.clone_embedder_stats())?; + self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.embedder_stats.clone())?; { progress.update_progress(FinalizingIndexStep::Committing); @@ -240,20 +240,11 @@ impl IndexScheduler { builder.set_primary_key(primary_key); let must_stop_processing = self.scheduler.must_stop_processing.clone(); - let embedder_stats = match current_batch.embedder_stats { - Some(ref stats) => stats.clone(), - None => { - let embedder_stats: Arc = Default::default(); - current_batch.embedder_stats = Some(embedder_stats.clone()); - embedder_stats - }, - }; - builder .execute( |indexing_step| tracing::debug!(update = ?indexing_step), || must_stop_processing.get(), - embedder_stats, + current_batch.embedder_stats.clone(), ) .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; index_wtxn.commit()?; diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs index 22e319580..455b6a2e7 100644 --- a/crates/index-scheduler/src/utils.rs +++ b/crates/index-scheduler/src/utils.rs @@ -29,7 +29,7 @@ pub struct ProcessingBatch { pub uid: BatchId, pub details: DetailsView, pub stats: BatchStats, - pub embedder_stats: Option>, + pub embedder_stats: Arc, pub statuses: HashSet, pub kinds: HashSet, @@ -47,11 +47,13 @@ impl ProcessingBatch { let mut statuses = HashSet::default(); statuses.insert(Status::Processing); + println!("Processing batch created: {}", uid); + Self { uid, 
details: DetailsView::default(), stats: BatchStats::default(), - embedder_stats: None, + embedder_stats: Default::default(), statuses, kinds: HashSet::default(), @@ -64,17 +66,6 @@ impl ProcessingBatch { } } - pub fn clone_embedder_stats(&mut self) -> Arc { - match self.embedder_stats { - Some(ref stats) => stats.clone(), - None => { - let embedder_stats: Arc = Default::default(); - self.embedder_stats = Some(embedder_stats.clone()); - embedder_stats - }, - } - } - /// Update itself with the content of the task and update the batch id in the task. pub fn processing<'a>(&mut self, tasks: impl IntoIterator) { for task in tasks.into_iter() { @@ -113,11 +104,14 @@ impl ProcessingBatch { } pub fn reason(&mut self, reason: BatchStopReason) { + println!("batch stopped: {:?}", reason); self.reason = reason; } /// Must be called once the batch has finished processing. pub fn finished(&mut self) { + println!("Batch finished: {}", self.uid); + self.details = DetailsView::default(); self.stats = BatchStats::default(); self.finished_at = Some(OffsetDateTime::now_utc()); @@ -132,6 +126,8 @@ impl ProcessingBatch { /// Update the timestamp of the tasks and the inner structure of this structure. 
pub fn update(&mut self, task: &mut Task) { + println!("Updating task: {} in batch: {}", task.uid, self.uid); + // We must re-set this value in case we're dealing with a task that has been added between // the `processing` and `finished` state // We must re-set this value in case we're dealing with a task that has been added between @@ -156,13 +152,13 @@ impl ProcessingBatch { } pub fn to_batch(&self) -> Batch { - println!("Converting to batch: {:?}", self.embedder_stats); + println!("Converting to batch: {:?} {:?}", self.uid, self.embedder_stats); Batch { uid: self.uid, progress: None, details: self.details.clone(), stats: self.stats.clone(), - embedder_stats: self.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())), + embedder_stats: self.embedder_stats.as_ref().into(), started_at: self.started_at, finished_at: self.finished_at, enqueued_at: self.enqueued_at, diff --git a/crates/meilisearch-types/src/batch_view.rs b/crates/meilisearch-types/src/batch_view.rs index 0a9b80f4e..bd56f5b1a 100644 --- a/crates/meilisearch-types/src/batch_view.rs +++ b/crates/meilisearch-types/src/batch_view.rs @@ -31,8 +31,8 @@ pub struct BatchView { pub struct BatchStatsView { #[serde(flatten)] pub stats: BatchStats, - #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing")] - pub embedder: Option, + #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)] + pub embedder: BatchEmbeddingStats, } impl BatchView { diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs index 24be75d1c..e1c9411b6 100644 --- a/crates/meilisearch-types/src/batches.rs +++ b/crates/meilisearch-types/src/batches.rs @@ -20,7 +20,8 @@ pub struct Batch { pub progress: Option, pub details: DetailsView, pub stats: BatchStats, - pub embedder_stats: Option, + #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)] + pub embedder_stats: BatchEmbeddingStats, #[serde(with = "time::serde::rfc3339")] 
pub started_at: OffsetDateTime, @@ -110,10 +111,7 @@ impl From<&EmbedderStats> for BatchEmbeddingStats { } impl BatchEmbeddingStats { - pub fn skip_serializing(this: &Option) -> bool { - match this { - Some(stats) => stats.total_count == 0 && stats.error_count == 0 && stats.last_error.is_none(), - None => true, - } + pub fn skip_serializing(&self) -> bool { + self.total_count == 0 && self.error_count == 0 && self.last_error.is_none() } } diff --git a/crates/meilisearch/tests/vector/rest.rs b/crates/meilisearch/tests/vector/rest.rs index 1ff2dd9fe..156a2f07b 100644 --- a/crates/meilisearch/tests/vector/rest.rs +++ b/crates/meilisearch/tests/vector/rest.rs @@ -1,10 +1,12 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use meili_snap::{json_string, snapshot}; use reqwest::IntoUrl; +use tokio::spawn; +use tokio::sync::mpsc; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, Request, ResponseTemplate}; -use std::thread::sleep; +use tokio::time::sleep; use std::time::Duration; use crate::common::Value; @@ -307,7 +309,6 @@ async fn create_mock_raw() -> (MockServer, Value) { Mock::given(method("POST")) .and(path("/")) .respond_with(move |req: &Request| { - println!("Sent!"); let req: String = match req.body_json() { Ok(req) => req, Err(error) => { @@ -337,6 +338,50 @@ async fn create_mock_raw() -> (MockServer, Value) { (mock_server, embedder_settings) } +/// A mock embedder server that always responds with a 500 error, and sends one message through `sender` per request it receives +async fn create_faulty_mock_raw(mut sender: mpsc::Sender<()>) -> (MockServer, Value) { + let mock_server = MockServer::start().await; + + Mock::given(method("POST")) + .and(path("/")) + .respond_with(move |req: &Request| { + let req: String = match req.body_json() { + Ok(req) => req, + Err(error) => { + return ResponseTemplate::new(400).set_body_json(json!({ + "error": format!("Invalid request: {error}") + })); + } + }; + + let sender = sender.clone(); + spawn(async move { 
sender.send(()).await; + }); + + ResponseTemplate::new(500) + .set_delay(Duration::from_millis(500)) + .set_body_json(json!({ + "error": "Service Unavailable", + "text": req + })) + }) + .mount(&mock_server) + .await; + let url = mock_server.uri(); + + let embedder_settings = json!({ + "source": "rest", + "url": url, + "dimensions": 3, + "request": "{{text}}", + "response": "{{embedding}}", + "documentTemplate": "{{doc.name}}" + }); + + (mock_server, embedder_settings) +} + pub async fn post(url: T, text: &str) -> reqwest::Result { reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await } @@ -2118,7 +2163,8 @@ async fn searchable_reindex() { #[actix_rt::test] async fn observability() { - let (_mock, setting) = create_mock_raw().await; + let (sender, mut receiver) = mpsc::channel(10); + let (_mock, setting) = create_faulty_mock_raw(sender).await; let server = get_server_vector().await; let index = server.index("doggo"); @@ -2133,20 +2179,19 @@ async fn observability() { let task = server.wait_task(response.uid()).await; snapshot!(task["status"], @r###""succeeded""###); let documents = json!([ - {"id": 0, "name": "kefir"}, - {"id": 1, "name": "echo", "_vectors": { "rest": [1, 1, 1] }}, - {"id": 2, "name": "intel"}, - {"id": 3, "name": "missing"}, // Stuff that doesn't exist - {"id": 4, "name": "invalid"}, - {"id": 5, "name": "foobar"}, + {"id": 0, "name": "will_return_500"}, // Every embedding request gets a 500 from the faulty mock + {"id": 1, "name": "will_error"}, + {"id": 2, "name": "must_error"}, ]); let (value, code) = index.add_documents(documents, None).await; snapshot!(code, @"202 Accepted"); - let batches = index.filtered_batches(&[], &[], &[]).await; println!("Batches: {batches:?}"); + // The task will eventually fail, so let's not wait for it. + // Let's just wait for 5 errors from the mock server. 
+ for _errors in 0..5 { + receiver.recv().await; + } - let task = index.wait_task(value.uid()).await; let batches = index.filtered_batches(&[], &[], &[]).await; println!("Batches: {batches:?}"); diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index 9aeb73f42..706a411fb 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -316,6 +316,7 @@ where if let Some(embedder_stats) = &embedder_stats { embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed); } + // TODO: also catch 403 errors let response = request.clone().send_json(&body); let result = check_response(response, data.configuration_source).and_then(|response| { response_to_embedding(response, data, expected_count, expected_dimension)