diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs
index 06ec01b5e..8e1fb1c2c 100644
--- a/crates/index-scheduler/src/insta_snapshot.rs
+++ b/crates/index-scheduler/src/insta_snapshot.rs
@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 use std::fmt::Write;

-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchStats};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchStats};
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, RoTxn};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs
index c82d5acd2..96a3940a5 100644
--- a/crates/index-scheduler/src/queue/batches.rs
+++ b/crates/index-scheduler/src/queue/batches.rs
@@ -1,7 +1,7 @@
 use std::collections::HashSet;
 use std::ops::{Bound, RangeBounds};

-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchId};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchId};
 use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -92,10 +92,7 @@ impl BatchQueue {
     }

     pub(crate) fn get_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<Option<Batch>> {
-        println!("Got batch from db {batch_id:?}");
-        let r = Ok(self.all_batches.get(rtxn, &batch_id)?);
-        println!("Got batch from db => {:?}", r);
-        r
+        Ok(self.all_batches.get(rtxn, &batch_id)?)
     }

     /// Returns the whole set of batches that belongs to this index.
@@ -174,8 +171,6 @@ impl BatchQueue {
     pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
         let old_batch = self.all_batches.get(wtxn, &batch.uid)?;

-        println!("Saving batch: {:?}", batch.embedder_stats);
-
         self.all_batches.put(
             wtxn,
             &batch.uid,
diff --git a/crates/index-scheduler/src/scheduler/create_batch.rs b/crates/index-scheduler/src/scheduler/create_batch.rs
index fc20b6fd5..e3763881b 100644
--- a/crates/index-scheduler/src/scheduler/create_batch.rs
+++ b/crates/index-scheduler/src/scheduler/create_batch.rs
@@ -437,10 +437,8 @@ impl IndexScheduler {
         #[cfg(test)]
         self.maybe_fail(crate::test_utils::FailureLocation::InsideCreateBatch)?;

-        println!("create next batch");
         let batch_id = self.queue.batches.next_batch_id(rtxn)?;
         let mut current_batch = ProcessingBatch::new(batch_id);
-        println!("over");

         let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
         let count_total_enqueued = enqueued.len();
@@ -456,7 +454,6 @@ impl IndexScheduler {
                 kind: Kind::TaskCancelation,
                 id: task_id,
             });
-            println!("task cancelled");

             return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
         }
@@ -527,7 +524,7 @@ impl IndexScheduler {
         }

         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
-        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { println!("return"); return Ok(None) };
+        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
         let mut task =
             self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

@@ -605,7 +602,6 @@ impl IndexScheduler {
             autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
         {
             current_batch.reason(autobatch_stop_reason.unwrap_or(stop_reason));
-            println!("autobatch");
             return Ok(self
                 .create_next_batch_index(
                     rtxn,
@@ -619,7 +615,6 @@ impl IndexScheduler {

         // If we found no tasks then we were notified for something that got autobatched
         // somehow and there is nothing to do.
-        println!("nothing to do");
         Ok(None)
     }
 }
diff --git a/crates/index-scheduler/src/utils.rs b/crates/index-scheduler/src/utils.rs
index 455b6a2e7..226ef9f06 100644
--- a/crates/index-scheduler/src/utils.rs
+++ b/crates/index-scheduler/src/utils.rs
@@ -5,7 +5,7 @@ use std::ops::Bound;
 use std::sync::Arc;

 use crate::milli::progress::EmbedderStats;
-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchId, BatchStats};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::CboRoaringBitmapCodec;
 use meilisearch_types::task_view::DetailsView;
@@ -47,8 +47,6 @@ impl ProcessingBatch {
         let mut statuses = HashSet::default();
         statuses.insert(Status::Processing);

-        println!("Processing batch created: {}", uid);
-
         Self {
             uid,
             details: DetailsView::default(),
@@ -104,14 +102,11 @@ impl ProcessingBatch {
     }

     pub fn reason(&mut self, reason: BatchStopReason) {
-        println!("batch stopped: {:?}", reason);
         self.reason = reason;
     }

     /// Must be called once the batch has finished processing.
     pub fn finished(&mut self) {
-        println!("Batch finished: {}", self.uid);
-
         self.details = DetailsView::default();
         self.stats = BatchStats::default();
         self.finished_at = Some(OffsetDateTime::now_utc());
@@ -126,8 +121,6 @@ impl ProcessingBatch {
     }
     /// Update the timestamp of the tasks and the inner structure of this structure.
     pub fn update(&mut self, task: &mut Task) {
-        println!("Updating task: {} in batch: {}", task.uid, self.uid);
-
         // We must re-set this value in case we're dealing with a task that has been added between
         // the `processing` and `finished` state
         // We must re-set this value in case we're dealing with a task that has been added between
@@ -152,7 +145,6 @@ impl ProcessingBatch {
     }

     pub fn to_batch(&self) -> Batch {
-        println!("Converting to batch: {:?} {:?}", self.uid, self.embedder_stats);
         Batch {
             uid: self.uid,
             progress: None,
diff --git a/crates/meilisearch-types/src/batch_view.rs b/crates/meilisearch-types/src/batch_view.rs
index bd56f5b1a..aced97d7a 100644
--- a/crates/meilisearch-types/src/batch_view.rs
+++ b/crates/meilisearch-types/src/batch_view.rs
@@ -3,7 +3,7 @@ use serde::Serialize;
 use time::{Duration, OffsetDateTime};
 use utoipa::ToSchema;

-use crate::batches::{Batch, BatchEmbeddingStats, BatchId, BatchStats};
+use crate::batches::{Batch, EmbedderStatsView, BatchId, BatchStats};
 use crate::task_view::DetailsView;
 use crate::tasks::serialize_duration;

@@ -31,8 +31,8 @@ pub struct BatchView {
 pub struct BatchStatsView {
     #[serde(flatten)]
     pub stats: BatchStats,
-    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
-    pub embedder: BatchEmbeddingStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder: EmbedderStatsView,
 }

 impl BatchView {
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index e1c9411b6..45cc2d9f4 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -20,8 +20,8 @@ pub struct Batch {
     pub progress: Option<ProgressView>,
     pub details: DetailsView,
     pub stats: BatchStats,
-    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
-    pub embedder_stats: BatchEmbeddingStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder_stats: EmbedderStatsView,

     #[serde(with = "time::serde::rfc3339")]
     pub started_at: OffsetDateTime,
@@ -92,25 +92,25 @@ pub struct BatchStats {
 #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
 #[serde(rename_all = "camelCase")]
 #[schema(rename_all = "camelCase")]
-pub struct BatchEmbeddingStats {
+pub struct EmbedderStatsView {
     pub total_count: usize,
     pub error_count: usize,
-    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(skip_serializing_if = "Option::is_none", default)]
     pub last_error: Option<String>,
 }

-impl From<&EmbedderStats> for BatchEmbeddingStats {
+impl From<&EmbedderStats> for EmbedderStatsView {
     fn from(stats: &EmbedderStats) -> Self {
         let errors = stats.errors.read().unwrap();
         Self {
-            total_count: stats.total_requests.load(std::sync::atomic::Ordering::Relaxed),
+            total_count: stats.total_count.load(std::sync::atomic::Ordering::Relaxed),
             error_count: errors.1 as usize,
             last_error: errors.0.clone(),
         }
     }
 }

-impl BatchEmbeddingStats {
+impl EmbedderStatsView {
     pub fn skip_serializing(&self) -> bool {
         self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
     }
diff --git a/crates/meilisearch/tests/vector/rest.rs b/crates/meilisearch/tests/vector/rest.rs
index 54ed52213..1fdd18d28 100644
--- a/crates/meilisearch/tests/vector/rest.rs
+++ b/crates/meilisearch/tests/vector/rest.rs
@@ -2170,7 +2170,7 @@ async fn searchable_reindex() {
 }

 #[actix_rt::test]
-async fn observability() {
+async fn last_error_stats() {
     let (sender, mut receiver) = mpsc::channel(10);
     let (_mock, setting) = create_faulty_mock_raw(sender).await;
     let server = get_server_vector().await;
@@ -2187,7 +2187,7 @@
     let task = server.wait_task(response.uid()).await;
     snapshot!(task["status"], @r###""succeeded""###);
     let documents = json!([
-        {"id": 0, "name": "will_return_500"}, // Stuff that doesn't exist
+        {"id": 0, "name": "will_return_500"},
         {"id": 1, "name": "will_error"},
         {"id": 2, "name": "must_error"},
     ]);
@@ -2195,9 +2195,9 @@
     snapshot!(code, @"202 Accepted");

     // The task will eventually fail, so let's not wait for it.
-    // Let's just wait for the server to block
+    // Let's just wait for the server's signal
     receiver.recv().await;

-    let batches = index.filtered_batches(&[], &[], &[]).await;
-    snapshot!(task, @r###""###);
+    let (response, _code) = index.filtered_batches(&[], &[], &[]).await;
+    snapshot!(response["results"][0], @r###""###);
 }
diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs
index 7026f0c11..8cd2c9336 100644
--- a/crates/milli/src/progress.rs
+++ b/crates/milli/src/progress.rs
@@ -25,15 +25,15 @@ pub struct Progress {
 #[derive(Default)]
 pub struct EmbedderStats {
     pub errors: Arc<RwLock<(Option<String>, u32)>>,
-    pub total_requests: AtomicUsize
+    pub total_count: AtomicUsize
 }

 impl std::fmt::Debug for EmbedderStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let (error, count) = self.errors.read().unwrap().clone();
         f.debug_struct("EmbedderStats")
-            .field("errors", &error)
-            .field("total_requests", &self.total_requests.load(Ordering::Relaxed))
+            .field("last_error", &error)
+            .field("total_count", &self.total_count.load(Ordering::Relaxed))
             .field("error_count", &count)
             .finish()
     }
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index 706a411fb..d8de89c6a 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -295,10 +295,6 @@ fn embed(
 where
     S: Serialize,
 {
-    use std::backtrace::Backtrace;
-
-    println!("Embedder stats? {}", embedder_stats.is_some());
-
     let request = data.client.post(&data.url);
     let request = if let Some(bearer) = &data.bearer {
         request.set("Authorization", bearer)
@@ -314,9 +310,8 @@ where

     for attempt in 0..10 {
         if let Some(embedder_stats) = &embedder_stats {
-            embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         }
-        // TODO: also catch 403 errors
         let response = request.clone().send_json(&body);
         let result = check_response(response, data.configuration_source).and_then(|response| {
             response_to_embedding(response, data, expected_count, expected_dimension)
@@ -358,7 +353,7 @@ where
     }

     if let Some(embedder_stats) = &embedder_stats {
-        embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     }
     let response = request.send_json(&body);
     let result = check_response(response, data.configuration_source).and_then(|response| {
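
Note (not part of the patch): a minimal, self-contained sketch of how the renamed pieces fit together. EmbedderStats is the thread-safe accumulator that the REST embedder bumps once per request, and EmbedderStatsView is the serializable snapshot stored on a batch and skipped when empty. Field types are taken from the hunks above; the real definitions live in crates/milli/src/progress.rs and crates/meilisearch-types/src/batches.rs and additionally carry serde/utoipa derives not reproduced here.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

// Mutable, shared accumulator: one atomic request counter plus the
// (last error message, error count) pair behind an RwLock.
#[derive(Default)]
pub struct EmbedderStats {
    pub errors: Arc<RwLock<(Option<String>, u32)>>,
    pub total_count: AtomicUsize,
}

// Read-only snapshot as it is persisted on a batch (serde derives omitted here).
#[derive(Debug, Default)]
pub struct EmbedderStatsView {
    pub total_count: usize,
    pub error_count: usize,
    pub last_error: Option<String>,
}

impl From<&EmbedderStats> for EmbedderStatsView {
    fn from(stats: &EmbedderStats) -> Self {
        let errors = stats.errors.read().unwrap();
        Self {
            total_count: stats.total_count.load(Ordering::Relaxed),
            error_count: errors.1 as usize,
            last_error: errors.0.clone(),
        }
    }
}

impl EmbedderStatsView {
    // Used by skip_serializing_if: omit the `embedder` object when nothing was recorded.
    pub fn skip_serializing(&self) -> bool {
        self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
    }
}

fn main() {
    let stats = EmbedderStats::default();
    // One embedding request, which failed: bump the counter and record the error.
    stats.total_count.fetch_add(1, Ordering::Relaxed);
    {
        let mut errors = stats.errors.write().unwrap();
        errors.0 = Some("500 Internal Server Error".to_string());
        errors.1 += 1;
    }
    let view = EmbedderStatsView::from(&stats);
    assert!(!view.skip_serializing());
    println!("{view:?}");
}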