Mirror of https://github.com/meilisearch/MeiliSearch, synced 2025-07-01 19:08:29 +02:00
Fix the test and simplify types

commit 2f82d94502 (parent 4925b30196)
@@ -329,7 +329,7 @@ pub(crate) mod test {
                write_channel_congestion: None,
                internal_database_sizes: Default::default(),
            },
-           embedder_stats: None,
+           embedder_stats: Default::default(),
            enqueued_at: Some(BatchEnqueuedAt {
                earliest: datetime!(2022-11-11 0:00 UTC),
                oldest: datetime!(2022-11-11 0:00 UTC),
@@ -174,7 +174,7 @@ impl BatchQueue {
     pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
         let old_batch = self.all_batches.get(wtxn, &batch.uid)?;

-        println!("Saving batch: {}", batch.embedder_stats.is_some());
+        println!("Saving batch: {:?}", batch.embedder_stats);

         self.all_batches.put(
             wtxn,
@@ -184,7 +184,7 @@ impl BatchQueue {
                 progress: None,
                 details: batch.details,
                 stats: batch.stats,
-                embedder_stats: batch.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())),
+                embedder_stats: batch.embedder_stats.as_ref().into(),
                 started_at: batch.started_at,
                 finished_at: batch.finished_at,
                 enqueued_at: batch.enqueued_at,
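
The new `embedder_stats: batch.embedder_stats.as_ref().into()` line leans on the `From<&EmbedderStats> for BatchEmbeddingStats` impl that a later hunk's header shows already exists. A sketch of what such a conversion could look like; the `EmbedderStats` internals here (an atomic request counter plus a mutex-guarded last error) are illustrative assumptions, not taken from this diff:

    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Mutex;

    // Assumed shape: only `total_requests` (incremented in the last hunk of
    // this diff) is confirmed; the other fields are hypothetical.
    #[derive(Default)]
    pub struct EmbedderStats {
        pub total_requests: AtomicU32,
        pub failed_requests: AtomicU32,
        pub last_error: Mutex<Option<String>>,
    }

    #[derive(Debug, Default)]
    pub struct BatchEmbeddingStats {
        pub total_count: u32,
        pub error_count: u32,
        pub last_error: Option<String>,
    }

    impl From<&EmbedderStats> for BatchEmbeddingStats {
        fn from(stats: &EmbedderStats) -> Self {
            BatchEmbeddingStats {
                // Relaxed loads are fine: these are point-in-time statistics.
                total_count: stats.total_requests.load(Ordering::Relaxed),
                error_count: stats.failed_requests.load(Ordering::Relaxed),
                last_error: stats.last_error.lock().unwrap().clone(),
            }
        }
    }

With such an impl in scope, `Arc::as_ref()` yields the `&EmbedderStats` that `.into()` needs, so the old `.map(|s| BatchEmbeddingStats::from(s.as_ref()))` collapses once the `Option` wrapper is gone.
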
@@ -437,8 +437,10 @@ impl IndexScheduler {
         #[cfg(test)]
         self.maybe_fail(crate::test_utils::FailureLocation::InsideCreateBatch)?;

+        println!("create next batch");
         let batch_id = self.queue.batches.next_batch_id(rtxn)?;
         let mut current_batch = ProcessingBatch::new(batch_id);
+        println!("over");

         let enqueued = &self.queue.tasks.get_status(rtxn, Status::Enqueued)?;
         let count_total_enqueued = enqueued.len();
@@ -454,6 +456,7 @@ impl IndexScheduler {
                 kind: Kind::TaskCancelation,
                 id: task_id,
             });
+            println!("task cancelled");
             return Ok(Some((Batch::TaskCancelation { task }, current_batch)));
         }

@@ -524,7 +527,7 @@ impl IndexScheduler {
         }

         // 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
-        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
+        let task_id = if let Some(task_id) = enqueued.min() { task_id } else { println!("return"); return Ok(None) };
         let mut task =
             self.queue.tasks.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;

@@ -602,6 +605,7 @@ impl IndexScheduler {
             autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
         {
             current_batch.reason(autobatch_stop_reason.unwrap_or(stop_reason));
+            println!("autobatch");
             return Ok(self
                 .create_next_batch_index(
                     rtxn,
@@ -615,6 +619,7 @@ impl IndexScheduler {

         // If we found no tasks then we were notified for something that got autobatched
         // somehow and there is nothing to do.
+        println!("nothing to do");
         Ok(None)
     }
 }
@@ -164,7 +164,7 @@ impl IndexScheduler {

                 let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
                 let (tasks, congestion) =
-                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.clone_embedder_stats())?;
+                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.embedder_stats.clone())?;

                 {
                     progress.update_progress(FinalizingIndexStep::Committing);
@@ -240,20 +240,11 @@ impl IndexScheduler {
                 builder.set_primary_key(primary_key);
                 let must_stop_processing = self.scheduler.must_stop_processing.clone();

-                let embedder_stats = match current_batch.embedder_stats {
-                    Some(ref stats) => stats.clone(),
-                    None => {
-                        let embedder_stats: Arc<EmbedderStats> = Default::default();
-                        current_batch.embedder_stats = Some(embedder_stats.clone());
-                        embedder_stats
-                    },
-                };
-
                 builder
                     .execute(
                         |indexing_step| tracing::debug!(update = ?indexing_step),
                         || must_stop_processing.get(),
-                        embedder_stats,
+                        current_batch.embedder_stats.clone(),
                     )
                     .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
                 index_wtxn.commit()?;
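
The deleted `match` is the get-or-initialize dance that `Option<Arc<EmbedderStats>>` forced on every consumer (the same logic reappears as the removed `clone_embedder_stats` helper further down). Storing a plain `Arc<EmbedderStats>` and relying on `Default` turns each call site into a cheap handle clone. A minimal before/after sketch with a placeholder stats type:

    use std::sync::Arc;

    #[derive(Default)]
    struct EmbedderStats; // placeholder for the real struct

    // Before: Option<Arc<_>> meant lazily materializing the stats everywhere.
    fn stats_before(slot: &mut Option<Arc<EmbedderStats>>) -> Arc<EmbedderStats> {
        // Equivalent to the deleted match: clone if present, create-and-store if not.
        slot.get_or_insert_with(Default::default).clone()
    }

    // After: the stats always exist, so sharing them is a single Arc clone.
    fn stats_after(slot: &Arc<EmbedderStats>) -> Arc<EmbedderStats> {
        Arc::clone(slot)
    }

Since `Arc<T>` implements `Default` whenever `T: Default`, `ProcessingBatch::new` can fill the field with `Default::default()` and the `None` state disappears from the type entirely.
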
@@ -29,7 +29,7 @@ pub struct ProcessingBatch {
     pub uid: BatchId,
     pub details: DetailsView,
     pub stats: BatchStats,
-    pub embedder_stats: Option<Arc<EmbedderStats>>,
+    pub embedder_stats: Arc<EmbedderStats>,

     pub statuses: HashSet<Status>,
     pub kinds: HashSet<Kind>,
@@ -47,11 +47,13 @@ impl ProcessingBatch {
         let mut statuses = HashSet::default();
         statuses.insert(Status::Processing);

+        println!("Processing batch created: {}", uid);
+
         Self {
             uid,
             details: DetailsView::default(),
             stats: BatchStats::default(),
-            embedder_stats: None,
+            embedder_stats: Default::default(),

             statuses,
             kinds: HashSet::default(),
@@ -64,17 +66,6 @@ impl ProcessingBatch {
         }
     }

-    pub fn clone_embedder_stats(&mut self) -> Arc<EmbedderStats> {
-        match self.embedder_stats {
-            Some(ref stats) => stats.clone(),
-            None => {
-                let embedder_stats: Arc<EmbedderStats> = Default::default();
-                self.embedder_stats = Some(embedder_stats.clone());
-                embedder_stats
-            },
-        }
-    }
-
     /// Update itself with the content of the task and update the batch id in the task.
     pub fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
         for task in tasks.into_iter() {
@@ -113,11 +104,14 @@ impl ProcessingBatch {
     }

     pub fn reason(&mut self, reason: BatchStopReason) {
+        println!("batch stopped: {:?}", reason);
         self.reason = reason;
     }

     /// Must be called once the batch has finished processing.
     pub fn finished(&mut self) {
+        println!("Batch finished: {}", self.uid);
+
         self.details = DetailsView::default();
         self.stats = BatchStats::default();
         self.finished_at = Some(OffsetDateTime::now_utc());
@@ -132,6 +126,8 @@ impl ProcessingBatch {

     /// Update the timestamp of the tasks and the inner structure of this structure.
     pub fn update(&mut self, task: &mut Task) {
+        println!("Updating task: {} in batch: {}", task.uid, self.uid);
+
         // We must re-set this value in case we're dealing with a task that has been added between
         // the `processing` and `finished` state
         // We must re-set this value in case we're dealing with a task that has been added between
@@ -156,13 +152,13 @@ impl ProcessingBatch {
     }

     pub fn to_batch(&self) -> Batch {
-        println!("Converting to batch: {:?}", self.embedder_stats);
+        println!("Converting to batch: {:?} {:?}", self.uid, self.embedder_stats);
         Batch {
             uid: self.uid,
             progress: None,
             details: self.details.clone(),
             stats: self.stats.clone(),
-            embedder_stats: self.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())),
+            embedder_stats: self.embedder_stats.as_ref().into(),
             started_at: self.started_at,
             finished_at: self.finished_at,
             enqueued_at: self.enqueued_at,
@@ -31,8 +31,8 @@ pub struct BatchView {
 pub struct BatchStatsView {
     #[serde(flatten)]
     pub stats: BatchStats,
-    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing")]
-    pub embedder: Option<BatchEmbeddingStats>,
+    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
+    pub embedder: BatchEmbeddingStats,
 }

 impl BatchView {
@@ -20,7 +20,8 @@ pub struct Batch {
     pub progress: Option<ProgressView>,
     pub details: DetailsView,
     pub stats: BatchStats,
-    pub embedder_stats: Option<BatchEmbeddingStats>,
+    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
+    pub embedder_stats: BatchEmbeddingStats,

     #[serde(with = "time::serde::rfc3339")]
     pub started_at: OffsetDateTime,
@@ -110,10 +111,7 @@ impl From<&EmbedderStats> for BatchEmbeddingStats {
 }

 impl BatchEmbeddingStats {
-    pub fn skip_serializing(this: &Option<BatchEmbeddingStats>) -> bool {
-        match this {
-            Some(stats) => stats.total_count == 0 && stats.error_count == 0 && stats.last_error.is_none(),
-            None => true,
-        }
+    pub fn skip_serializing(&self) -> bool {
+        self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
     }
 }
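
With the field no longer an `Option`, `skip_serializing_if` can point at a `&self` method: serde calls the given path as a plain function with a reference to the field, so `BatchEmbeddingStats::skip_serializing` works as a `fn(&BatchEmbeddingStats) -> bool`. Pairing it with `default` keeps deserialization working when the field was omitted on output. A self-contained sketch of the mechanics (struct trimmed down, field types assumed, the flattened `stats` field left out):

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Default, Serialize, Deserialize)]
    pub struct BatchEmbeddingStats {
        pub total_count: usize,
        pub error_count: usize,
        pub last_error: Option<String>,
    }

    impl BatchEmbeddingStats {
        // serde invokes this as a plain function taking &BatchEmbeddingStats.
        pub fn skip_serializing(&self) -> bool {
            self.total_count == 0 && self.error_count == 0 && self.last_error.is_none()
        }
    }

    #[derive(Serialize, Deserialize)]
    pub struct BatchStatsView {
        #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing", default)]
        pub embedder: BatchEmbeddingStats,
    }

    fn main() {
        // Empty stats vanish from the serialized output...
        let view = BatchStatsView { embedder: BatchEmbeddingStats::default() };
        assert_eq!(serde_json::to_string(&view).unwrap(), "{}");
        // ...and `default` restores them when the field is absent on input.
        let parsed: BatchStatsView = serde_json::from_str("{}").unwrap();
        assert!(parsed.embedder.skip_serializing());
    }
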
@@ -1,10 +1,12 @@
-use std::collections::BTreeMap;
+use std::collections::{BTreeMap, BTreeSet};

 use meili_snap::{json_string, snapshot};
 use reqwest::IntoUrl;
+use tokio::spawn;
+use tokio::sync::mpsc;
 use wiremock::matchers::{method, path};
 use wiremock::{Mock, MockServer, Request, ResponseTemplate};
-use std::thread::sleep;
+use tokio::time::sleep;
 use std::time::Duration;

 use crate::common::Value;
@@ -307,7 +309,6 @@ async fn create_mock_raw() -> (MockServer, Value) {
     Mock::given(method("POST"))
         .and(path("/"))
         .respond_with(move |req: &Request| {
-            println!("Sent!");
             let req: String = match req.body_json() {
                 Ok(req) => req,
                 Err(error) => {
@@ -337,6 +338,50 @@ async fn create_mock_raw() -> (MockServer, Value) {
     (mock_server, embedder_settings)
 }

+/// A mock server that returns 500 errors, and sends a message once 5 requests are received
+async fn create_faulty_mock_raw(mut sender: mpsc::Sender<()>) -> (MockServer, Value) {
+    let mock_server = MockServer::start().await;
+
+    Mock::given(method("POST"))
+        .and(path("/"))
+        .respond_with(move |req: &Request| {
+            let req: String = match req.body_json() {
+                Ok(req) => req,
+                Err(error) => {
+                    return ResponseTemplate::new(400).set_body_json(json!({
+                        "error": format!("Invalid request: {error}")
+                    }));
+                }
+            };
+
+            let sender = sender.clone();
+            spawn(async move {
+                sender.send(()).await;
+            });
+
+            ResponseTemplate::new(500)
+                .set_delay(Duration::from_millis(500))
+                .set_body_json(json!({
+                    "error": "Service Unavailable",
+                    "text": req
+                }))
+        })
+        .mount(&mock_server)
+        .await;
+    let url = mock_server.uri();
+
+    let embedder_settings = json!({
+        "source": "rest",
+        "url": url,
+        "dimensions": 3,
+        "request": "{{text}}",
+        "response": "{{embedding}}",
+        "documentTemplate": "{{doc.name}}"
+    });
+
+    (mock_server, embedder_settings)
+}
+
 pub async fn post<T: IntoUrl>(url: T, text: &str) -> reqwest::Result<reqwest::Response> {
     reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await
 }
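
One wrinkle in the new `create_faulty_mock_raw`: wiremock's `respond_with` takes a synchronous closure, so it cannot `.await` the channel send directly. The diff instead clones the `Sender` and detaches the send onto the runtime with `tokio::spawn`. A reduced sketch of that pattern (names here are illustrative):

    use tokio::sync::mpsc;

    // Fire an async notification from synchronous code that runs inside a
    // Tokio runtime, mirroring the spawn-inside-respond_with shape above.
    fn notify(sender: &mpsc::Sender<()>) {
        let sender = sender.clone();
        tokio::spawn(async move {
            // Ignore the Result: if the receiver is gone, nobody needs the signal.
            let _ = sender.send(()).await;
        });
    }

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::channel(10);
        notify(&tx);
        rx.recv().await.expect("expected one notification");
    }

This is also why the observability test below can count failures by awaiting `receiver.recv()` in a loop instead of polling the task status.
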
@@ -2118,7 +2163,8 @@ async fn searchable_reindex() {

 #[actix_rt::test]
 async fn observability() {
-    let (_mock, setting) = create_mock_raw().await;
+    let (sender, mut receiver) = mpsc::channel(10);
+    let (_mock, setting) = create_faulty_mock_raw(sender).await;
     let server = get_server_vector().await;
     let index = server.index("doggo");

@@ -2133,20 +2179,19 @@ async fn observability() {
     let task = server.wait_task(response.uid()).await;
     snapshot!(task["status"], @r###""succeeded""###);
     let documents = json!([
-        {"id": 0, "name": "kefir"},
-        {"id": 1, "name": "echo", "_vectors": { "rest": [1, 1, 1] }},
-        {"id": 2, "name": "intel"},
-        {"id": 3, "name": "missing"}, // Stuff that doesn't exist
-        {"id": 4, "name": "invalid"},
-        {"id": 5, "name": "foobar"},
+        {"id": 0, "name": "will_return_500"}, // Stuff that doesn't exist
+        {"id": 1, "name": "will_error"},
+        {"id": 2, "name": "must_error"},
     ]);
     let (value, code) = index.add_documents(documents, None).await;
     snapshot!(code, @"202 Accepted");

-    let batches = index.filtered_batches(&[], &[], &[]).await;
-    println!("Batches: {batches:?}");
+    // The task will eventually fail, so let's not wait for it.
+    // Let's just wait for 5 errors from the mock server.
+    for _errors in 0..5 {
+        receiver.recv().await;
+    }

-    let task = index.wait_task(value.uid()).await;
     let batches = index.filtered_batches(&[], &[], &[]).await;
     println!("Batches: {batches:?}");

@@ -316,6 +316,7 @@ where
     if let Some(embedder_stats) = &embedder_stats {
         embedder_stats.as_ref().total_requests.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
     }
+    // TODO: also catch 403 errors
     let response = request.clone().send_json(&body);
     let result = check_response(response, data.configuration_source).and_then(|response| {
         response_to_embedding(response, data, expected_count, expected_dimension)
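
The counter in this last hunk uses `Ordering::Relaxed`, which suits a pure statistics counter: each `fetch_add` is still atomic, it just imposes no ordering on surrounding reads and writes, and the totals are only inspected after the work has settled. A minimal standalone illustration:

    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;
    use std::thread;

    fn main() {
        let total_requests = Arc::new(AtomicUsize::new(0));

        let handles: Vec<_> = (0..4)
            .map(|_| {
                let counter = Arc::clone(&total_requests);
                thread::spawn(move || {
                    for _ in 0..1_000 {
                        // Atomic increment; Relaxed because nothing else
                        // synchronizes through this counter.
                        counter.fetch_add(1, Ordering::Relaxed);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
        // All 4,000 increments are observed once the threads are joined.
        assert_eq!(total_requests.load(Ordering::Relaxed), 4_000);
    }
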