mirror of https://github.com/meilisearch/MeiliSearch, synced 2025-07-01 19:08:29 +02:00
Improve code quality
parent 59a1c5d9a7 · commit 4a179fb3c0
@@ -1,7 +1,7 @@
 use std::collections::BTreeSet;
 use std::fmt::Write;

-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchStats};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchStats};
 use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, RoTxn};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -1,7 +1,7 @@
 use std::collections::HashSet;
 use std::ops::{Bound, RangeBounds};

-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchId};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchId};
 use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
 use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@@ -92,10 +92,7 @@ impl BatchQueue {
     }

     pub(crate) fn get_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<Option<Batch>> {
-        println!("Got batch from db {batch_id:?}");
-        let r = Ok(self.all_batches.get(rtxn, &batch_id)?);
-        println!("Got batch from db => {:?}", r);
-        r
+        Ok(self.all_batches.get(rtxn, &batch_id)?)
     }

     /// Returns the whole set of batches that belongs to this index.
@@ -174,8 +171,6 @@ impl BatchQueue {
     pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
         let old_batch = self.all_batches.get(wtxn, &batch.uid)?;

-        println!("Saving batch: {:?}", batch.embedder_stats);
-
         self.all_batches.put(
             wtxn,
             &batch.uid,
@@ -437,10 +437,8 @@ impl IndexScheduler {
         #[cfg(test)]
         self.maybe_fail(crate::test_utils::FailureLocation::InsideCreateBatch)?;

-        println!("create next batch");
         let batch_id = self.queue.batches.next_batch_id(rtxn)?;
         let mut current_batch = ProcessingBatch::new(batch_id);
-        println!("over");

         let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
         let count_total_enqueued = enqueued.len();
@@ -456,7 +454,6 @@ impl IndexScheduler {
                 kind: Kind::TaskCancelation,
                 id: task_id,
             });
-            println!("task cancelled");
             return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
         }

@@ -527,7 +524,7 @@ impl IndexScheduler {
         }

         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
-        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { println!("return"); return Ok(None) };
+        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
         let mut task =
             self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

@@ -605,7 +602,6 @@ impl IndexScheduler {
             autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
         {
             current_batch.reason(autobatch_stop_reason.unwrap_or(stop_reason));
-            println!("autobatch");
             return Ok(self
                 .create_next_batch_index(
                     rtxn,
@@ -619,7 +615,6 @@ impl IndexScheduler {

         // If we found no tasks then we were notified for something that got autobatched
         // somehow and there is nothing to do.
-        println!("nothing to do");
         Ok(None)
     }
 }
@@ -5,7 +5,7 @@ use std::ops::Bound;
 use std::sync::Arc;
 use crate::milli::progress::EmbedderStats;

-use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchId, BatchStats};
+use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
 use meilisearch_types::milli::CboRoaringBitmapCodec;
 use meilisearch_types::task_view::DetailsView;
@@ -47,8 +47,6 @@ impl ProcessingBatch {
         let mut statuses = HashSet::default();
         statuses.insert(Status::Processing);

-        println!("Processing batch created: {}", uid);
-
         Self {
             uid,
             details: DetailsView::default(),
@@ -104,14 +102,11 @@ impl ProcessingBatch {
     }

     pub fn reason(&mut self, reason: BatchStopReason) {
-        println!("batch stopped: {:?}", reason);
         self.reason = reason;
     }

     /// Must be called once the batch has finished processing.
     pub fn finished(&mut self) {
-        println!("Batch finished: {}", self.uid);
-
         self.details = DetailsView::default();
         self.stats = BatchStats::default();
         self.finished_at = Some(OffsetDateTime::now_utc());
@@ -126,8 +121,6 @@ impl ProcessingBatch {

     /// Update the timestamp of the tasks and the inner structure of this structure.
     pub fn update(&mut self, task: &mut Task) {
-        println!("Updating task: {} in batch: {}", task.uid, self.uid);
-
         // We must re-set this value in case we're dealing with a task that has been added between
         // the `processing` and `finished` state
         // We must re-set this value in case we're dealing with a task that has been added between
@@ -152,7 +145,6 @@ impl ProcessingBatch {
     }

     pub fn to_batch(&self) -> Batch {
-        println!("Converting to batch: {:?} {:?}", self.uid, self.embedder_stats);
         Batch {
             uid: self.uid,
             progress: None,
@@ -3,7 +3,7 @@ use serde::Serialize;
 use time::{Duration, OffsetDateTime};
 use utoipa::ToSchema;

-use crate::batches::{Batch, BatchEmbeddingStats, BatchId, BatchStats};
+use crate::batches::{Batch, EmbedderStatsView, BatchId, BatchStats};
 use crate::task_view::DetailsView;
 use crate::tasks::serialize_duration;

@@ -31,8 +31,8 @@ pub struct BatchView {
 pub struct BatchStatsView {
     #[serde(flatten)]
     pub stats: BatchStats,
-    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
-    pub embedder: BatchEmbeddingStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder: EmbedderStatsView,
 }

 impl BatchView {
@@ -20,8 +20,8 @@ pub struct Batch {
     pub progress: Option<ProgressView>,
     pub details: DetailsView,
     pub stats: BatchStats,
-    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
-    pub embedder_stats: BatchEmbeddingStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder_stats: EmbedderStatsView,

     #[serde(with = "time::serde::rfc3339")]
     pub started_at: OffsetDateTime,
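For context on the attribute used in the two hunks above: serde's `skip_serializing_if` takes a path to a predicate that receives a reference to the field, so an inherent method such as `EmbedderStatsView::skip_serializing` can decide whether the whole stats object is omitted from the JSON payload. A minimal, self-contained sketch of that mechanism (the `Stats` and `Wrapper` names are illustrative stand-ins, not Meilisearch types):

use serde::Serialize;

#[derive(Default, Serialize)]
#[serde(rename_all = "camelCase")]
struct Stats {
    total_count: usize,
    error_count: usize,
    #[serde(skip_serializing_if = "Option::is_none")]
    last_error: Option<String>,
}

impl Stats {
    // Called by serde as `Stats::skip_serializing(&wrapper.embedder)`.
    fn skip_serializing(&self) -> bool {
        self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
    }
}

#[derive(Serialize)]
struct Wrapper {
    #[serde(skip_serializing_if = "Stats::skip_serializing")]
    embedder: Stats,
}

fn main() {
    let empty = Wrapper { embedder: Stats::default() };
    let busy = Wrapper {
        embedder: Stats { total_count: 3, error_count: 1, last_error: Some("timeout".into()) },
    };
    // `embedder` is omitted entirely when the predicate returns true.
    println!("{}", serde_json::to_string(&empty).unwrap()); // {}
    println!("{}", serde_json::to_string(&busy).unwrap());
    // {"embedder":{"totalCount":3,"errorCount":1,"lastError":"timeout"}}
}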
@@ -92,25 +92,25 @@ pub struct BatchStats {
 #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
 #[serde(rename_all = "camelCase")]
 #[schema(rename_all = "camelCase")]
-pub struct BatchEmbeddingStats {
+pub struct EmbedderStatsView {
     pub total_count: usize,
     pub error_count: usize,
-    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(skip_serializing_if = "Option::is_none", default)]
     pub last_error: Option<String>,
 }

-impl From<&EmbedderStats> for BatchEmbeddingStats {
+impl From<&EmbedderStats> for EmbedderStatsView {
     fn from(stats: &EmbedderStats) -> Self {
         let errors = stats.errors.read().unwrap();
         Self {
-            total_count: stats.total_requests.load(std::sync::atomic::Ordering::Relaxed),
+            total_count: stats.total_count.load(std::sync::atomic::Ordering::Relaxed),
             error_count: errors.1 as usize,
             last_error: errors.0.clone(),
         }
     }
 }

-impl BatchEmbeddingStats {
+impl EmbedderStatsView {
     pub fn skip_serializing(&self) -> bool {
         self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
     }
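A rough, self-contained sketch of how the conversion above is meant to be used: the view takes a point-in-time snapshot of the shared counters, where `errors.0` is the last error message and `errors.1` the error count. The types below only mirror the shapes shown in the diff; they are stand-ins, not imports from the crate:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

// Stand-ins mirroring the shapes shown in the diff above.
#[derive(Default)]
struct EmbedderStats {
    // (last error message, error count)
    errors: Arc<RwLock<(Option<String>, u32)>>,
    total_count: AtomicUsize,
}

#[derive(Debug, Default)]
struct EmbedderStatsView {
    total_count: usize,
    error_count: usize,
    last_error: Option<String>,
}

impl From<&EmbedderStats> for EmbedderStatsView {
    fn from(stats: &EmbedderStats) -> Self {
        let errors = stats.errors.read().unwrap();
        Self {
            // A relaxed load is enough here: the counter is a plain statistic.
            total_count: stats.total_count.load(Ordering::Relaxed),
            error_count: errors.1 as usize,
            last_error: errors.0.clone(),
        }
    }
}

fn main() {
    let stats = EmbedderStats::default();
    stats.total_count.fetch_add(3, Ordering::Relaxed);
    *stats.errors.write().unwrap() = (Some("embedding request failed".into()), 1);

    let view = EmbedderStatsView::from(&stats);
    println!("{view:?}");
    // EmbedderStatsView { total_count: 3, error_count: 1, last_error: Some("embedding request failed") }
}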
@@ -2170,7 +2170,7 @@ async fn searchable_reindex() {


 #[actix_rt::test]
-async fn observability() {
+async fn last_error_stats() {
     let (sender, mut receiver) = mpsc::channel(10);
     let (_mock, setting) = create_faulty_mock_raw(sender).await;
     let server = get_server_vector().await;
@@ -2187,7 +2187,7 @@ async fn observability() {
     let task = server.wait_task(response.uid()).await;
     snapshot!(task["status"], @r###""succeeded""###);
     let documents = json!([
-        {"id": 0, "name": "will_return_500"}, // Stuff that doesn't exist
+        {"id": 0, "name": "will_return_500"},
         {"id": 1, "name": "will_error"},
         {"id": 2, "name": "must_error"},
     ]);
@@ -2195,9 +2195,9 @@ async fn observability() {
     snapshot!(code, @"202 Accepted");

     // The task will eventually fail, so let's not wait for it.
-    // Let's just wait for the server to block
+    // Let's just wait for the server's signal
     receiver.recv().await;

-    let batches = index.filtered_batches(&[], &[], &[]).await;
-    snapshot!(task, @r###""###);
+    let (response, _code) = index.filtered_batches(&[], &[], &[]).await;
+    snapshot!(response["results"][0], @r###""###);
 }
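The test above synchronises on a channel rather than waiting for the (failing) task to finish. A minimal sketch of that hand-shake, assuming a tokio `mpsc` channel like the `mpsc::channel(10)` used in the test; `create_faulty_mock_raw` itself is not shown in this diff, so the mock side is only simulated here:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (sender, mut receiver) = mpsc::channel::<()>(10);

    // Stand-in for the faulty mock: after answering a request with an error,
    // it notifies the test through the channel.
    tokio::spawn(async move {
        // ... pretend an embedding request came in and was rejected ...
        let _ = sender.send(()).await;
    });

    // Test side: instead of waiting for the task (it will keep retrying and
    // eventually fail), just wait until the mock has signalled it was reached.
    receiver.recv().await;
    println!("mock was reached, safe to inspect the batch stats now");
}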
@@ -25,15 +25,15 @@ pub struct Progress {
 #[derive(Default)]
 pub struct EmbedderStats {
     pub errors: Arc<RwLock<(Option<String>, u32)>>,
-    pub total_requests: AtomicUsize
+    pub total_count: AtomicUsize
 }

 impl std::fmt::Debug for EmbedderStats {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let (error, count) = self.errors.read().unwrap().clone();
         f.debug_struct("EmbedderStats")
-            .field("errors", &error)
-            .field("total_requests", &self.total_requests.load(Ordering::Relaxed))
+            .field("last_error", &error)
+            .field("total_count", &self.total_count.load(Ordering::Relaxed))
             .field("error_count", &count)
             .finish()
     }
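The diff shows the read side of `EmbedderStats` (the `Debug` impl and the view conversion) but not how the error slot gets written. A self-contained sketch of one plausible writer, with a hypothetical `record_error` helper that is not part of the codebase:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

// Stand-in with the same shape as the `EmbedderStats` above.
#[derive(Default)]
struct EmbedderStats {
    errors: Arc<RwLock<(Option<String>, u32)>>,
    total_count: AtomicUsize,
}

impl EmbedderStats {
    // Hypothetical helper: one way to keep the (last_error, error_count)
    // pair consistent behind a single write lock.
    fn record_error(&self, msg: impl Into<String>) {
        let mut guard = self.errors.write().unwrap();
        guard.0 = Some(msg.into());
        guard.1 += 1;
    }
}

fn main() {
    let stats = Arc::new(EmbedderStats::default());

    // Several request workers bump the counter; one of them fails.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let stats = Arc::clone(&stats);
            thread::spawn(move || {
                stats.total_count.fetch_add(1, Ordering::Relaxed);
                if i == 2 {
                    stats.record_error("HTTP 500 from the embedder");
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    let (last_error, error_count) = stats.errors.read().unwrap().clone();
    println!(
        "total_count = {}, error_count = {}, last_error = {:?}",
        stats.total_count.load(Ordering::Relaxed),
        error_count,
        last_error
    );
}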
@@ -295,10 +295,6 @@ fn embed<S>(
 where
     S: Serialize,
 {
-    use std::backtrace::Backtrace;
-
-    println!("Embedder stats? {}", embedder_stats.is_some());
-
     let request = data.client.post(&data.url);
     let request = if let Some(bearer) = &data.bearer {
         request.set("Authorization", bearer)
@@ -314,9 +310,8 @@ where

     for attempt in 0..10 {
         if let Some(embedder_stats) = &embedder_stats {
-            embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
         }
-        // TODO: also catch 403 errors
         let response = request.clone().send_json(&body);
         let result = check_response(response, data.configuration_source).and_then(|response| {
             response_to_embedding(response, data, expected_count, expected_dimension)
@@ -358,7 +353,7 @@ where
     }

     if let Some(embedder_stats) = &embedder_stats {
-        embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     }
     let response = request.send_json(&body);
     let result = check_response(response, data.configuration_source).and_then(|response| {