This commit is contained in:
Martin Grigorov 2025-07-03 09:18:22 +12:00 committed by GitHub
commit b9ab763f26
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 218 additions and 225 deletions

View file

@ -1,15 +1,13 @@
use std::fmt::Write;
use std::marker::PhantomData;
use std::panic::{catch_unwind, resume_unwind, UnwindSafe};
use std::time::Duration;
use actix_web::http::StatusCode;
use tokio::time::sleep;
use urlencoding::encode as urlencode;
use super::encoder::Encoder;
use super::service::Service;
use super::{Owned, Shared, Value};
use super::{Owned, Server, Shared, Value};
use crate::json;
pub struct Index<'a, State = Owned> {
@ -44,9 +42,9 @@ impl<'a> Index<'a, Owned> {
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
update_id as u64
let update_id = response["taskUid"].as_u64().unwrap();
self.wait_task(update_id).await;
update_id
}
pub async fn load_test_set_ndjson(&self) -> u64 {
@ -60,9 +58,9 @@ impl<'a> Index<'a, Owned> {
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
update_id as u64
let update_id = response["taskUid"].as_u64().unwrap();
self.wait_task(update_id).await;
update_id
}
pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
@ -364,21 +362,8 @@ impl<State> Index<'_, State> {
self.service.delete(url).await
}
pub async fn wait_task(&self, update_id: u64) -> Value {
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{}", update_id);
for _ in 0..100 {
let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response);
if response["status"] == "succeeded" || response["status"] == "failed" {
return response;
}
// wait 0.5 second.
sleep(Duration::from_millis(500)).await;
}
panic!("Timeout waiting for update id");
async fn wait_task(&self, update_id: u64) -> Value {
Server::<Shared>::_wait_task(async |url| self.service.get(url).await, update_id).await
}
pub async fn get_task(&self, update_id: u64) -> (Value, StatusCode) {

View file

@ -181,7 +181,7 @@ pub async fn shared_empty_index() -> &'static Index<'static, Shared> {
let server = Server::new_shared();
let index = server._index("EMPTY_INDEX").to_shared();
let (response, _code) = index._create(None).await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
})
.await
@ -229,13 +229,13 @@ pub async fn shared_index_with_documents() -> &'static Index<'static, Shared> {
let index = server._index("SHARED_DOCUMENTS").to_shared();
let documents = DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["id", "title"], "sortableAttributes": ["id", "title"]}),
)
.await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
}).await
}
@ -272,13 +272,13 @@ pub async fn shared_index_with_score_documents() -> &'static Index<'static, Shar
let index = server._index("SHARED_SCORE_DOCUMENTS").to_shared();
let documents = SCORE_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["id", "title"], "sortableAttributes": ["id", "title"]}),
)
.await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
}).await
}
@ -349,13 +349,13 @@ pub async fn shared_index_with_nested_documents() -> &'static Index<'static, Sha
let index = server._index("SHARED_NESTED_DOCUMENTS").to_shared();
let documents = NESTED_DOCUMENTS.clone();
let (response, _code) = index._add_documents(documents, None).await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["father", "doggos", "cattos"], "sortableAttributes": ["doggos"]}),
)
.await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
}).await
}
@ -449,7 +449,7 @@ pub async fn shared_index_with_test_set() -> &'static Index<'static, Shared> {
)
.await;
assert_eq!(code, 202);
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
})
.await
@ -496,14 +496,14 @@ pub async fn shared_index_with_geo_documents() -> &'static Index<'static, Shared
let server = Server::new_shared();
let index = server._index("SHARED_GEO_DOCUMENTS").to_shared();
let (response, _code) = index._add_documents(GEO_DOCUMENTS.clone(), None).await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
let (response, _code) = index
._update_settings(
json!({"filterableAttributes": ["_geo"], "sortableAttributes": ["_geo"]}),
)
.await;
index.wait_task(response.uid()).await.succeeded();
server.wait_task(response.uid()).await.succeeded();
index
})
.await

View file

@ -407,13 +407,20 @@ impl<State> Server<State> {
}
pub async fn wait_task(&self, update_id: u64) -> Value {
Server::<Shared>::_wait_task(async |url| self.service.get(url).await, update_id).await
}
pub(super) async fn _wait_task<F>(request_fn: F, update_id: u64) -> Value
where
F: AsyncFnOnce(String) -> (Value, StatusCode) + Copy,
{
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{}", update_id);
let max_attempts = 400; // 200 seconds total, 0.5s per attempt
let url = format!("/tasks/{update_id}");
let max_attempts = 400; // 200 seconds in total, 0.5secs per attempt
for i in 0..max_attempts {
let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response);
let (response, status_code) = request_fn(url.clone()).await;
assert_eq!(200, status_code, "response: {response}");
if response["status"] == "succeeded" || response["status"] == "failed" {
return response;