mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-15 13:58:36 +02:00
Pass the Server as an extra parameter when the Index needs to wait for a task
Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
This commit is contained in:
parent
13ea29e511
commit
ae912c4c3f
9 changed files with 23 additions and 34 deletions
|
@ -31,7 +31,7 @@ impl<'a> Index<'a, Owned> {
|
|||
Index { uid: self.uid.clone(), service: self.service, encoder, marker: PhantomData }
|
||||
}
|
||||
|
||||
pub async fn load_test_set(&self) -> u64 {
|
||||
pub async fn load_test_set<State>(&self, waiter: &Server<State>) -> u64 {
|
||||
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
|
||||
let (response, code) = self
|
||||
.service
|
||||
|
@ -43,11 +43,11 @@ impl<'a> Index<'a, Owned> {
|
|||
.await;
|
||||
assert_eq!(code, 202);
|
||||
let update_id = response["taskUid"].as_u64().unwrap();
|
||||
self.wait_task(update_id).await;
|
||||
waiter.wait_task(update_id).await;
|
||||
update_id
|
||||
}
|
||||
|
||||
pub async fn load_test_set_ndjson(&self) -> u64 {
|
||||
pub async fn load_test_set_ndjson<State>(&self, waiter: &Server<State>) -> u64 {
|
||||
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
|
||||
let (response, code) = self
|
||||
.service
|
||||
|
@ -59,7 +59,7 @@ impl<'a> Index<'a, Owned> {
|
|||
.await;
|
||||
assert_eq!(code, 202);
|
||||
let update_id = response["taskUid"].as_u64().unwrap();
|
||||
self.wait_task(update_id).await;
|
||||
waiter.wait_task(update_id).await;
|
||||
update_id
|
||||
}
|
||||
|
||||
|
@ -265,10 +265,10 @@ impl Index<'_, Shared> {
|
|||
/// You cannot modify the content of a shared index, thus the delete_document_by_filter call
|
||||
/// must fail. If the task successfully enqueue itself, we'll wait for the task to finishes,
|
||||
/// and if it succeed the function will panic.
|
||||
pub async fn delete_document_by_filter_fail(&self, body: Value) -> (Value, StatusCode) {
|
||||
pub async fn delete_document_by_filter_fail<State>(&self, body: Value, waiter: &Server<State>) -> (Value, StatusCode) {
|
||||
let (mut task, code) = self._delete_document_by_filter(body).await;
|
||||
if code.is_success() {
|
||||
task = self.wait_task(task.uid()).await;
|
||||
task = waiter.wait_task(task.uid()).await;
|
||||
if task.is_success() {
|
||||
panic!(
|
||||
"`delete_document_by_filter_fail` succeeded: {}",
|
||||
|
@ -279,10 +279,10 @@ impl Index<'_, Shared> {
|
|||
(task, code)
|
||||
}
|
||||
|
||||
pub async fn delete_index_fail(&self) -> (Value, StatusCode) {
|
||||
pub async fn delete_index_fail<State>(&self, waiter: &Server<State>) -> (Value, StatusCode) {
|
||||
let (mut task, code) = self._delete().await;
|
||||
if code.is_success() {
|
||||
task = self.wait_task(task.uid()).await;
|
||||
task = waiter.wait_task(task.uid()).await;
|
||||
if task.is_success() {
|
||||
panic!(
|
||||
"`delete_index_fail` succeeded: {}",
|
||||
|
@ -293,10 +293,10 @@ impl Index<'_, Shared> {
|
|||
(task, code)
|
||||
}
|
||||
|
||||
pub async fn update_index_fail(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
|
||||
pub async fn update_index_fail<State>(&self, primary_key: Option<&str>, waiter: &Server<State>) -> (Value, StatusCode) {
|
||||
let (mut task, code) = self._update(primary_key).await;
|
||||
if code.is_success() {
|
||||
task = self.wait_task(task.uid()).await;
|
||||
task = waiter.wait_task(task.uid()).await;
|
||||
if task.is_success() {
|
||||
panic!(
|
||||
"`update_index_fail` succeeded: {}",
|
||||
|
@ -362,10 +362,6 @@ impl<State> Index<'_, State> {
|
|||
self.service.delete(url).await
|
||||
}
|
||||
|
||||
async fn wait_task(&self, update_id: u64) -> Value {
|
||||
Server::<Shared>::_wait_task(async |url| self.service.get(url).await, update_id).await
|
||||
}
|
||||
|
||||
pub async fn get_task(&self, update_id: u64) -> (Value, StatusCode) {
|
||||
let url = format!("/tasks/{}", update_id);
|
||||
self.service.get(url).await
|
||||
|
|
|
@ -407,19 +407,12 @@ impl<State> Server<State> {
|
|||
}
|
||||
|
||||
pub async fn wait_task(&self, update_id: u64) -> Value {
|
||||
Server::<Shared>::_wait_task(async |url| self.service.get(url).await, update_id).await
|
||||
}
|
||||
|
||||
pub(super) async fn _wait_task<F>(request_fn: F, update_id: u64) -> Value
|
||||
where
|
||||
F: AsyncFnOnce(String) -> (Value, StatusCode) + Copy,
|
||||
{
|
||||
// try several times to get status, or panic to not wait forever
|
||||
let url = format!("/tasks/{update_id}");
|
||||
let max_attempts = 400; // 200 seconds in total, 0.5secs per attempt
|
||||
|
||||
for i in 0..max_attempts {
|
||||
let (response, status_code) = request_fn(url.clone()).await;
|
||||
let (response, status_code) = self.service.get(url.clone()).await;
|
||||
assert_eq!(200, status_code, "response: {response}");
|
||||
|
||||
if response["status"] == "succeeded" || response["status"] == "failed" {
|
||||
|
|
|
@ -1318,7 +1318,7 @@ async fn add_no_documents() {
|
|||
async fn add_larger_dataset() {
|
||||
let server = Server::new_shared();
|
||||
let index = server.unique_index();
|
||||
let update_id = index.load_test_set().await;
|
||||
let update_id = index.load_test_set(server).await;
|
||||
let (response, code) = index.get_task(update_id).await;
|
||||
assert_eq!(code, 200);
|
||||
assert_eq!(response["status"], "succeeded");
|
||||
|
@ -1333,7 +1333,7 @@ async fn add_larger_dataset() {
|
|||
|
||||
// x-ndjson add large test
|
||||
let index = server.unique_index();
|
||||
let update_id = index.load_test_set_ndjson().await;
|
||||
let update_id = index.load_test_set_ndjson(server).await;
|
||||
let (response, code) = index.get_task(update_id).await;
|
||||
assert_eq!(code, 200);
|
||||
assert_eq!(response["status"], "succeeded");
|
||||
|
|
|
@ -7,7 +7,7 @@ use crate::json;
|
|||
async fn delete_one_document_unexisting_index() {
|
||||
let server = Server::new_shared();
|
||||
let index = shared_does_not_exists_index().await;
|
||||
let (task, code) = index.delete_document_by_filter_fail(json!({"filter": "a = b"})).await;
|
||||
let (task, code) = index.delete_document_by_filter_fail(json!({"filter": "a = b"}), server).await;
|
||||
assert_eq!(code, 202);
|
||||
|
||||
server.wait_task(task.uid()).await.failed();
|
||||
|
|
|
@ -559,7 +559,7 @@ async fn delete_document_by_filter() {
|
|||
let index = shared_does_not_exists_index().await;
|
||||
// index does not exists
|
||||
let (response, _code) =
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"})).await;
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"}), server).await;
|
||||
snapshot!(response, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
|
@ -589,7 +589,7 @@ async fn delete_document_by_filter() {
|
|||
// no filterable are set
|
||||
let index = shared_empty_index().await;
|
||||
let (response, _code) =
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"})).await;
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "doggo = bernese"}), server).await;
|
||||
snapshot!(response, @r###"
|
||||
{
|
||||
"uid": "[uid]",
|
||||
|
@ -619,7 +619,7 @@ async fn delete_document_by_filter() {
|
|||
// not filterable while there is a filterable attribute
|
||||
let index = shared_index_with_documents().await;
|
||||
let (response, code) =
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "catto = jorts"})).await;
|
||||
index.delete_document_by_filter_fail(json!({ "filter": "catto = jorts"}), server).await;
|
||||
snapshot!(code, @"202 Accepted");
|
||||
let response = server.wait_task(response.uid()).await.failed();
|
||||
snapshot!(response, @r###"
|
||||
|
|
|
@ -334,7 +334,7 @@ async fn get_document_s_nested_attributes_to_retrieve() {
|
|||
async fn get_documents_displayed_attributes_is_ignored() {
|
||||
let server = Server::new_shared();
|
||||
let index = server.unique_index();
|
||||
index.load_test_set().await;
|
||||
index.load_test_set(server).await;
|
||||
index.update_settings(json!({"displayedAttributes": ["gender"]})).await;
|
||||
|
||||
let (response, code) = index.get_all_documents(GetAllDocumentsOptions::default()).await;
|
||||
|
|
|
@ -26,7 +26,7 @@ async fn create_and_delete_index() {
|
|||
async fn error_delete_unexisting_index() {
|
||||
let server = Server::new_shared();
|
||||
let index = shared_does_not_exists_index().await;
|
||||
let (task, code) = index.delete_index_fail().await;
|
||||
let (task, code) = index.delete_index_fail(server).await;
|
||||
|
||||
assert_eq!(code, 202);
|
||||
server.wait_task(task.uid()).await.failed();
|
||||
|
|
|
@ -72,7 +72,7 @@ async fn error_update_existing_primary_key() {
|
|||
let server = Server::new_shared();
|
||||
let index = shared_index_with_documents().await;
|
||||
|
||||
let (update_task, code) = index.update_index_fail(Some("primary")).await;
|
||||
let (update_task, code) = index.update_index_fail(Some("primary"), server).await;
|
||||
|
||||
assert_eq!(code, 202);
|
||||
let response = server.wait_task(update_task.uid()).await.failed();
|
||||
|
@ -91,7 +91,7 @@ async fn error_update_existing_primary_key() {
|
|||
async fn error_update_unexisting_index() {
|
||||
let server = Server::new_shared();
|
||||
let index = shared_does_not_exists_index().await;
|
||||
let (task, code) = index.update_index_fail(Some("my-primary-key")).await;
|
||||
let (task, code) = index.update_index_fail(Some("my-primary-key"), server).await;
|
||||
|
||||
assert_eq!(code, 202);
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ async fn perform_snapshot() {
|
|||
}))
|
||||
.await;
|
||||
|
||||
index.load_test_set().await;
|
||||
index.load_test_set(&server).await;
|
||||
|
||||
let (task, code) = server.index("test1").create(Some("prim")).await;
|
||||
meili_snap::snapshot!(code, @"202 Accepted");
|
||||
|
@ -128,7 +128,7 @@ async fn perform_on_demand_snapshot() {
|
|||
}))
|
||||
.await;
|
||||
|
||||
index.load_test_set().await;
|
||||
index.load_test_set(&server).await;
|
||||
|
||||
let (task, _status_code) = server.index("doggo").create(Some("bone")).await;
|
||||
server.wait_task(task.uid()).await.succeeded();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue