Use Server::wait_task() instead of Index::wait_task() for tasks IT tests

Revert the debugging helper that dumped the thread stack traces.
Try with 400 max attempts for the task success/failure (200 secs)

Signed-off-by: Martin Tzvetanov Grigorov <mgrigorov@apache.org>
This commit is contained in:
Martin Tzvetanov Grigorov 2025-06-06 13:45:05 +03:00
parent 1b4d344e18
commit 63ccd19ab1
No known key found for this signature in database
GPG Key ID: 3194FD8C1AE300EF
4 changed files with 39 additions and 51 deletions

View File

@ -1,5 +1,2 @@
[alias] [alias]
xtask = "run --release --package xtask --" xtask = "run --release --package xtask --"
[build]
rustflags = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]

View File

@ -158,8 +158,6 @@ jobs:
uses: Swatinem/rust-cache@v2.7.8 uses: Swatinem/rust-cache@v2.7.8
- name: Run tests in debug - name: Run tests in debug
uses: actions-rs/cargo@v1 uses: actions-rs/cargo@v1
env:
RUSTFLAGS: "--cfg tokio_unstable --cfg tokio_taskdump"
with: with:
command: test command: test
args: --locked --all args: --locked --all

View File

@ -399,14 +399,9 @@ impl<State> Server<State> {
pub async fn wait_task(&self, update_id: u64) -> Value { pub async fn wait_task(&self, update_id: u64) -> Value {
// try several times to get status, or panic to not wait forever // try several times to get status, or panic to not wait forever
let url = format!("/tasks/{}", update_id); let url = format!("/tasks/{}", update_id);
// Increase timeout for vector-related tests let max_attempts = 400; // 200 seconds total, 0.5s per attempt
let max_attempts = if update_id > 1000 {
400 // 200 seconds for vector tests
} else {
1000 // 50 seconds for other tests
};
for _ in 0..max_attempts { for i in 0..max_attempts {
let (response, status_code) = self.service.get(&url).await; let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response); assert_eq!(200, status_code, "response: {}", response);
@ -416,13 +411,9 @@ impl<State> Server<State> {
// wait 0.5 second. // wait 0.5 second.
sleep(Duration::from_millis(500)).await; sleep(Duration::from_millis(500)).await;
}
let handle = tokio::runtime::Handle::current(); if i == max_attempts - 1 {
if let Ok(dump) = tokio::time::timeout(Duration::from_secs(2), handle.dump()).await { dbg!(response);
for (i, task) in dump.tasks().iter().enumerate() {
let trace = task.trace();
println!("TASK {i}:");
println!("{trace}\n");
} }
} }
panic!("Timeout waiting for update id"); panic!("Timeout waiting for update id");

View File

@ -39,7 +39,7 @@ async fn get_task_status() {
None, None,
) )
.await; .await;
index.wait_task(create_task.uid()).await.succeeded(); server.wait_task(create_task.uid()).await.succeeded();
let (_response, code) = index.get_task(add_task.uid()).await; let (_response, code) = index.get_task(add_task.uid()).await;
assert_eq!(code, 200); assert_eq!(code, 200);
// TODO check response format, as per #48 // TODO check response format, as per #48
@ -51,7 +51,7 @@ async fn list_tasks() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
index index
.add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
.await; .await;
@ -105,7 +105,7 @@ async fn list_tasks_with_star_filters() {
// Do not use a unique index here, as we want to test the `indexUids=*` filter. // Do not use a unique index here, as we want to test the `indexUids=*` filter.
let index = server.index("test"); let index = server.index("test");
let (task, _code) = index.create(None).await; let (task, _code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
index index
.add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
.await; .await;
@ -154,9 +154,9 @@ async fn list_tasks_status_filtered() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (response, code) = index.filtered_tasks(&[], &["succeeded"], &[]).await; let (response, code) = index.filtered_tasks(&[], &["succeeded"], &[]).await;
assert_eq!(code, 200, "{response}"); assert_eq!(code, 200, "{response}");
@ -177,7 +177,7 @@ async fn list_tasks_type_filtered() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
index index
.add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
.await; .await;
@ -197,10 +197,12 @@ async fn list_tasks_invalid_canceled_by_filter() {
let server = Server::new_shared(); let server = Server::new_shared();
let index = server.unique_index(); let index = server.unique_index();
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _code) = index let (task, _code) = index
.add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
.await; .await;
server.wait_task(task.uid()).await.succeeded();
let (response, code) = let (response, code) =
index.filtered_tasks(&[], &[], &[format!("{}", task.uid()).as_str()]).await; index.filtered_tasks(&[], &[], &[format!("{}", task.uid()).as_str()]).await;
@ -214,7 +216,7 @@ async fn list_tasks_status_and_type_filtered() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
index index
.add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None) .add_documents(serde_json::from_str(include_str!("../assets/test_set.json")).unwrap(), None)
.await; .await;
@ -284,7 +286,7 @@ async fn test_summarized_document_addition_or_update() {
let index = server.unique_index(); let index = server.unique_index();
let (task, _status_code) = let (task, _status_code) =
index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), None).await; index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -309,7 +311,7 @@ async fn test_summarized_document_addition_or_update() {
let (task, _status_code) = let (task, _status_code) =
index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await; index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -343,7 +345,7 @@ async fn test_summarized_delete_documents_by_batch() {
let (task, _status_code) = index let (task, _status_code) = index
.delete_batch(vec![non_existing_task_id1, non_existing_task_id2, non_existing_task_id3]) .delete_batch(vec![non_existing_task_id1, non_existing_task_id2, non_existing_task_id3])
.await; .await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -374,7 +376,7 @@ async fn test_summarized_delete_documents_by_batch() {
index.create(None).await; index.create(None).await;
let (del_task, _status_code) = index.delete_batch(vec![42]).await; let (del_task, _status_code) = index.delete_batch(vec![42]).await;
index.wait_task(del_task.uid()).await.succeeded(); server.wait_task(del_task.uid()).await.succeeded();
let (task, _) = index.get_task(del_task.uid()).await; let (task, _) = index.get_task(del_task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -406,7 +408,7 @@ async fn test_summarized_delete_documents_by_filter() {
let (task, _status_code) = let (task, _status_code) =
index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await; index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -438,7 +440,7 @@ async fn test_summarized_delete_documents_by_filter() {
index.create(None).await; index.create(None).await;
let (task, _status_code) = let (task, _status_code) =
index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await; index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -470,7 +472,7 @@ async fn test_summarized_delete_documents_by_filter() {
index.update_settings(json!({ "filterableAttributes": ["doggo"] })).await; index.update_settings(json!({ "filterableAttributes": ["doggo"] })).await;
let (task, _status_code) = let (task, _status_code) =
index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await; index.delete_document_by_filter(json!({ "filter": "doggo = bernese" })).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -500,7 +502,7 @@ async fn test_summarized_delete_document_by_id() {
let server = Server::new_shared(); let server = Server::new_shared();
let index = server.unique_index(); let index = server.unique_index();
let (task, _status_code) = index.delete_document(1).await; let (task, _status_code) = index.delete_document(1).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -531,7 +533,7 @@ async fn test_summarized_delete_document_by_id() {
index.create(None).await; index.create(None).await;
let (task, _status_code) = index.delete_document(42).await; let (task, _status_code) = index.delete_document(42).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -573,7 +575,7 @@ async fn test_summarized_settings_update() {
"###); "###);
let (task,_status_code) = index.update_settings(json!({ "displayedAttributes": ["doggos", "name"], "filterableAttributes": ["age", "nb_paw_pads"], "sortableAttributes": ["iq"] })).await; let (task,_status_code) = index.update_settings(json!({ "displayedAttributes": ["doggos", "name"], "filterableAttributes": ["age", "nb_paw_pads"], "sortableAttributes": ["iq"] })).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -611,7 +613,7 @@ async fn test_summarized_index_creation() {
let server = Server::new_shared(); let server = Server::new_shared();
let index = server.unique_index(); let index = server.unique_index();
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -634,7 +636,7 @@ async fn test_summarized_index_creation() {
"###); "###);
let (task, _status_code) = index.create(Some("doggos")).await; let (task, _status_code) = index.create(Some("doggos")).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -667,7 +669,7 @@ async fn test_summarized_index_deletion() {
let server = Server::new_shared(); let server = Server::new_shared();
let index = server.unique_index(); let index = server.unique_index();
let (ret, _code) = index.delete().await; let (ret, _code) = index.delete().await;
let task = index.wait_task(ret.uid()).await; let task = server.wait_task(ret.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
{ {
@ -698,7 +700,7 @@ async fn test_summarized_index_deletion() {
// both tasks may get autobatched and the deleted documents count will be wrong. // both tasks may get autobatched and the deleted documents count will be wrong.
let (ret, _code) = let (ret, _code) =
index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await; index.add_documents(json!({ "id": 42, "content": "doggos & fluff" }), Some("id")).await;
let task = index.wait_task(ret.uid()).await; let task = server.wait_task(ret.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
{ {
@ -721,7 +723,7 @@ async fn test_summarized_index_deletion() {
"###); "###);
let (ret, _code) = index.delete().await; let (ret, _code) = index.delete().await;
let task = index.wait_task(ret.uid()).await; let task = server.wait_task(ret.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
{ {
@ -744,7 +746,7 @@ async fn test_summarized_index_deletion() {
// What happens when you delete an index that doesn't exists. // What happens when you delete an index that doesn't exists.
let (ret, _code) = index.delete().await; let (ret, _code) = index.delete().await;
let task = index.wait_task(ret.uid()).await; let task = server.wait_task(ret.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
{ {
@ -777,7 +779,7 @@ async fn test_summarized_index_update() {
let index = server.unique_index(); let index = server.unique_index();
// If the index doesn't exist yet, we should get errors with or without the primary key. // If the index doesn't exist yet, we should get errors with or without the primary key.
let (task, _status_code) = index.update(None).await; let (task, _status_code) = index.update(None).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -805,7 +807,7 @@ async fn test_summarized_index_update() {
"###); "###);
let (task, _status_code) = index.update(Some("bones")).await; let (task, _status_code) = index.update(Some("bones")).await;
index.wait_task(task.uid()).await.failed(); server.wait_task(task.uid()).await.failed();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -836,7 +838,7 @@ async fn test_summarized_index_update() {
index.create(None).await; index.create(None).await;
let (task, _status_code) = index.update(None).await; let (task, _status_code) = index.update(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -859,7 +861,7 @@ async fn test_summarized_index_update() {
"###); "###);
let (task, _status_code) = index.update(Some("bones")).await; let (task, _status_code) = index.update(Some("bones")).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"
@ -973,9 +975,9 @@ async fn test_summarized_task_cancelation() {
// to avoid being flaky we're only going to cancel an already finished task :( // to avoid being flaky we're only going to cancel an already finished task :(
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
let task_uid = task.uid(); let task_uid = task.uid();
index.wait_task(task_uid).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = server.cancel_tasks(format!("uids={task_uid}").as_str()).await; let (task, _status_code) = server.cancel_tasks(format!("uids={task_uid}").as_str()).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(json_string!(task, snapshot!(json_string!(task,
{ ".uid" => "[uid]", ".batchUid" => "[batch_uid]", ".**.originalFilter" => "[of]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }), { ".uid" => "[uid]", ".batchUid" => "[batch_uid]", ".**.originalFilter" => "[of]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
@ -1007,9 +1009,9 @@ async fn test_summarized_task_deletion() {
let index = server.unique_index(); let index = server.unique_index();
// to avoid being flaky we're only going to delete an already finished task :( // to avoid being flaky we're only going to delete an already finished task :(
let (task, _status_code) = index.create(None).await; let (task, _status_code) = index.create(None).await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _status_code) = server.delete_tasks("uids=0").await; let (task, _status_code) = server.delete_tasks("uids=0").await;
index.wait_task(task.uid()).await.succeeded(); server.wait_task(task.uid()).await.succeeded();
let (task, _) = index.get_task(task.uid()).await; let (task, _) = index.get_task(task.uid()).await;
snapshot!(task, snapshot!(task,
@r###" @r###"