513 lines
19 KiB
Rust
Raw Normal View History

2022-10-20 18:00:07 +02:00
use std::fmt::Write;
use std::marker::PhantomData;
2022-10-20 18:00:07 +02:00
use std::panic::{catch_unwind, resume_unwind, UnwindSafe};
use std::time::Duration;
2021-02-18 22:10:50 +01:00
2021-02-18 19:50:52 +01:00
use actix_web::http::StatusCode;
2021-03-10 14:47:04 +01:00
use tokio::time::sleep;
use urlencoding::encode as urlencode;
2021-02-18 19:50:52 +01:00
use super::encoder::Encoder;
2022-10-20 18:00:07 +02:00
use super::service::Service;
2023-09-11 16:50:53 +02:00
use super::Value;
use super::{Owned, Shared};
2023-09-11 16:50:53 +02:00
use crate::json;
pub struct Index<'a, State = Owned> {
2021-02-18 19:50:52 +01:00
pub uid: String,
pub service: &'a Service,
pub(super) encoder: Encoder,
pub(super) marker: PhantomData<State>,
2021-02-18 19:50:52 +01:00
}
impl<'a> Index<'a, Owned> {
pub fn to_shared(&self) -> Index<'a, Shared> {
Index {
uid: self.uid.clone(),
service: self.service,
encoder: self.encoder,
marker: PhantomData,
}
2021-02-18 19:50:52 +01:00
}
2021-02-19 19:55:00 +01:00
pub async fn load_test_set(&self) -> u64 {
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
2022-12-21 11:27:15 +08:00
let (response, code) = self
.service
.post_str(
url,
include_str!("../assets/test_set.json"),
vec![("content-type", "application/json")],
2022-12-21 11:27:15 +08:00
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
update_id as u64
}
pub async fn load_test_set_ndjson(&self) -> u64 {
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
let (response, code) = self
.service
.post_str(
url,
include_str!("../assets/test_set.ndjson"),
vec![("content-type", "application/x-ndjson")],
2022-12-21 11:27:15 +08:00
)
.await;
assert_eq!(code, 202);
let update_id = response["taskUid"].as_i64().unwrap();
self.wait_task(update_id as u64).await;
2021-02-19 19:55:00 +01:00
update_id as u64
}
2021-05-31 16:03:39 +02:00
pub async fn create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
self._create(primary_key).await
2021-02-18 19:50:52 +01:00
}
2021-02-18 20:28:10 +01:00
2023-01-24 13:20:20 +01:00
pub async fn update_raw(&self, body: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, body, self.encoder).await
}
2021-02-18 20:28:10 +01:00
pub async fn update(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
let body = json!({
"primaryKey": primary_key,
});
let url = format!("/indexes/{}", urlencode(self.uid.as_ref()));
2021-02-18 20:28:10 +01:00
self.service.patch_encoded(url, body, self.encoder).await
2021-02-18 20:28:10 +01:00
}
2021-02-18 20:44:33 +01:00
pub async fn delete(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}", urlencode(self.uid.as_ref()));
2021-02-18 20:44:33 +01:00
self.service.delete(url).await
}
2021-02-18 22:10:50 +01:00
pub async fn add_documents(
&self,
documents: Value,
primary_key: Option<&str>,
) -> (Value, StatusCode) {
self._add_documents(documents, primary_key).await
2021-02-18 22:10:50 +01:00
}
pub async fn raw_add_documents(
&self,
payload: &str,
headers: Vec<(&str, &str)>,
query_parameter: &str,
) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
self.service.post_str(url, payload, headers).await
}
2021-03-15 18:11:10 +01:00
pub async fn update_documents(
&self,
documents: Value,
primary_key: Option<&str>,
) -> (Value, StatusCode) {
2021-02-19 19:55:00 +01:00
let url = match primary_key {
2022-10-20 18:00:07 +02:00
Some(key) => {
format!("/indexes/{}/documents?primaryKey={}", urlencode(self.uid.as_ref()), key)
}
None => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
2021-02-19 19:55:00 +01:00
};
self.service.put_encoded(url, documents, self.encoder).await
2021-02-19 19:55:00 +01:00
}
pub async fn raw_update_documents(
&self,
payload: &str,
content_type: Option<&str>,
query_parameter: &str,
) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
if let Some(content_type) = content_type {
self.service.put_str(url, payload, vec![("Content-Type", content_type)]).await
} else {
self.service.put_str(url, payload, Vec::new()).await
}
}
pub async fn list_tasks(&self) -> (Value, StatusCode) {
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
let url = format!("/tasks?indexUids={}", self.uid);
2021-02-18 22:10:50 +01:00
self.service.get(url).await
}
2021-02-19 19:55:00 +01:00
2021-02-22 16:03:17 +01:00
pub async fn delete_document(&self, id: u64) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents/{}", urlencode(self.uid.as_ref()), id);
2021-02-22 16:03:17 +01:00
self.service.delete(url).await
}
2023-04-11 19:34:25 +02:00
pub async fn delete_document_by_filter(&self, body: Value) -> (Value, StatusCode) {
2023-04-27 13:51:02 +02:00
let url = format!("/indexes/{}/documents/delete", urlencode(self.uid.as_ref()));
2023-04-11 19:34:25 +02:00
self.service.post_encoded(url, body, self.encoder).await
}
2021-02-22 16:03:17 +01:00
pub async fn clear_all_documents(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents", urlencode(self.uid.as_ref()));
2021-02-22 16:03:17 +01:00
self.service.delete(url).await
}
2021-02-23 14:13:43 +01:00
pub async fn delete_batch(&self, ids: Vec<u64>) -> (Value, StatusCode) {
2022-10-20 18:00:07 +02:00
let url = format!("/indexes/{}/documents/delete-batch", urlencode(self.uid.as_ref()));
2023-09-11 16:50:53 +02:00
self.service
.post_encoded(url, serde_json::to_value(&ids).unwrap().into(), self.encoder)
.await
2021-02-23 14:13:43 +01:00
}
2021-02-24 09:30:51 +01:00
pub async fn delete_batch_raw(&self, body: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents/delete-batch", urlencode(self.uid.as_ref()));
self.service.post_encoded(url, body, self.encoder).await
}
2021-02-24 09:30:51 +01:00
pub async fn update_settings(&self, settings: Value) -> (Value, StatusCode) {
self._update_settings(settings).await
2021-02-24 09:30:51 +01:00
}
2021-02-24 10:14:36 +01:00
2023-01-11 17:04:24 +01:00
pub async fn update_settings_displayed_attributes(
&self,
settings: Value,
) -> (Value, StatusCode) {
let url =
format!("/indexes/{}/settings/displayed-attributes", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_searchable_attributes(
&self,
settings: Value,
) -> (Value, StatusCode) {
let url =
format!("/indexes/{}/settings/searchable-attributes", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_filterable_attributes(
&self,
settings: Value,
) -> (Value, StatusCode) {
let url =
format!("/indexes/{}/settings/filterable-attributes", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_sortable_attributes(
&self,
settings: Value,
) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/sortable-attributes", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_ranking_rules(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/ranking-rules", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_stop_words(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/stop-words", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_synonyms(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/synonyms", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_distinct_attribute(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/distinct-attribute", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_typo_tolerance(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/typo-tolerance", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_faceting(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/faceting", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, settings, self.encoder).await
}
pub async fn update_settings_pagination(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/pagination", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, settings, self.encoder).await
}
2024-03-19 11:14:28 +01:00
pub async fn update_settings_search_cutoff_ms(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings/search-cutoff-ms", urlencode(self.uid.as_ref()));
self.service.put_encoded(url, settings, self.encoder).await
}
2021-02-24 10:14:36 +01:00
pub async fn delete_settings(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings", urlencode(self.uid.as_ref()));
2021-02-24 10:14:36 +01:00
self.service.delete(url).await
}
2021-04-01 21:54:37 +03:00
pub async fn update_distinct_attribute(&self, value: Value) -> (Value, StatusCode) {
let url =
format!("/indexes/{}/settings/{}", urlencode(self.uid.as_ref()), "distinct-attribute");
self.service.put_encoded(url, value, self.encoder).await
}
}
impl<'a> Index<'a, Shared> {
/// You cannot modify the content of a shared index, thus the delete_document_by_filter call
/// must fail. If the task successfully enqueue itself, we'll wait for the task to finishes,
/// and if it succeed the function will panic.
pub async fn delete_document_by_filter_fail(&self, body: Value) -> (Value, StatusCode) {
let (mut task, code) = self._delete_document_by_filter(body).await;
if code.is_success() {
task = self.wait_task(task.uid()).await;
if task.is_success() {
panic!(
"`delete_document_by_filter_fail` succeeded: {}",
serde_json::to_string_pretty(&task).unwrap()
);
}
}
(task, code)
}
2024-10-02 16:23:21 +02:00
pub async fn delete_index_fail(&self) -> (Value, StatusCode) {
let (mut task, code) = self._delete().await;
if code.is_success() {
task = self.wait_task(task.uid()).await;
if task.is_success() {
panic!(
"`delete_index_fail` succeeded: {}",
serde_json::to_string_pretty(&task).unwrap()
);
}
}
(task, code)
}
}
#[allow(dead_code)]
impl<State> Index<'_, State> {
pub async fn get(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}", urlencode(self.uid.as_ref()));
self.service.get(url).await
}
/// add_documents is not allowed on shared index but we need to use it to initialize
/// a bunch of very common indexes in `common/mod.rs`.
pub(super) async fn _add_documents(
&self,
documents: Value,
primary_key: Option<&str>,
) -> (Value, StatusCode) {
let url = match primary_key {
Some(key) => {
format!("/indexes/{}/documents?primaryKey={}", urlencode(self.uid.as_ref()), key)
}
None => format!("/indexes/{}/documents", urlencode(self.uid.as_ref())),
};
self.service.post_encoded(url, documents, self.encoder).await
}
pub(super) async fn _update_settings(&self, settings: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings", urlencode(self.uid.as_ref()));
self.service.patch_encoded(url, settings, self.encoder).await
}
pub(super) async fn _delete_document_by_filter(&self, body: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents/delete", urlencode(self.uid.as_ref()));
self.service.post_encoded(url, body, self.encoder).await
}
pub(super) async fn _create(&self, primary_key: Option<&str>) -> (Value, StatusCode) {
let body = json!({
"uid": self.uid,
"primaryKey": primary_key,
});
self.service.post_encoded("/indexes", body, self.encoder).await
}
2024-10-02 16:23:21 +02:00
pub(super) async fn _delete(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}", urlencode(self.uid.as_ref()));
self.service.delete(url).await
}
pub async fn wait_task(&self, update_id: u64) -> Value {
// try several times to get status, or panic to not wait forever
let url = format!("/tasks/{}", update_id);
for _ in 0..100 {
let (response, status_code) = self.service.get(&url).await;
assert_eq!(200, status_code, "response: {}", response);
if response["status"] == "succeeded" || response["status"] == "failed" {
return response;
}
// wait 0.5 second.
sleep(Duration::from_millis(500)).await;
}
panic!("Timeout waiting for update id");
}
pub async fn get_task(&self, update_id: u64) -> (Value, StatusCode) {
let url = format!("/tasks/{}", update_id);
self.service.get(url).await
}
pub async fn filtered_tasks(
&self,
types: &[&str],
statuses: &[&str],
canceled_by: &[&str],
) -> (Value, StatusCode) {
let mut url = format!("/tasks?indexUids={}", self.uid);
if !types.is_empty() {
let _ = write!(url, "&types={}", types.join(","));
}
if !statuses.is_empty() {
let _ = write!(url, "&statuses={}", statuses.join(","));
}
if !canceled_by.is_empty() {
let _ = write!(url, "&canceledBy={}", canceled_by.join(","));
}
self.service.get(url).await
}
pub async fn get_document(&self, id: u64, options: Option<Value>) -> (Value, StatusCode) {
let mut url = format!("/indexes/{}/documents/{}", urlencode(self.uid.as_ref()), id);
if let Some(options) = options {
write!(url, "{}", yaup::to_string(&options).unwrap()).unwrap();
}
self.service.get(url).await
}
pub async fn get_document_by_filter(&self, payload: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents/fetch", urlencode(self.uid.as_ref()));
self.service.post(url, payload).await
}
pub async fn get_all_documents_raw(&self, options: &str) -> (Value, StatusCode) {
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), options);
self.service.get(url).await
}
pub async fn get_all_documents(&self, options: GetAllDocumentsOptions) -> (Value, StatusCode) {
let url = format!(
"/indexes/{}/documents{}",
urlencode(self.uid.as_ref()),
yaup::to_string(&options).unwrap()
);
self.service.get(url).await
}
pub async fn settings(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/settings", urlencode(self.uid.as_ref()));
self.service.get(url).await
}
2021-04-01 21:54:37 +03:00
pub async fn stats(&self) -> (Value, StatusCode) {
let url = format!("/indexes/{}/stats", urlencode(self.uid.as_ref()));
2021-04-01 21:54:37 +03:00
self.service.get(url).await
}
2021-04-20 12:07:22 +02:00
2021-07-06 11:54:37 +02:00
/// Performs both GET and POST search queries
pub async fn search(
&self,
query: Value,
test: impl Fn(Value, StatusCode) + UnwindSafe + Clone,
) {
2023-06-22 14:42:52 +02:00
let post = self.search_post(query.clone()).await;
let query = yaup::to_string(&query).unwrap();
2023-06-22 14:42:52 +02:00
let get = self.search_get(&query).await;
insta::allow_duplicates! {
let (response, code) = post;
let t = test.clone();
if let Err(e) = catch_unwind(move || t(response, code)) {
eprintln!("Error with post search");
resume_unwind(e);
}
let (response, code) = get;
if let Err(e) = catch_unwind(move || test(response, code)) {
eprintln!("Error with get search");
resume_unwind(e);
}
2021-07-06 11:54:09 +02:00
}
}
pub async fn search_post(&self, query: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/search", urlencode(self.uid.as_ref()));
self.service.post_encoded(url, query, self.encoder).await
2021-07-06 11:54:09 +02:00
}
pub async fn search_get(&self, query: &str) -> (Value, StatusCode) {
2024-06-17 16:21:51 +02:00
let url = format!("/indexes/{}/search{}", urlencode(self.uid.as_ref()), query);
2021-07-06 11:54:09 +02:00
self.service.get(url).await
2024-05-27 14:29:13 +02:00
}
/// Performs both GET and POST similar queries
pub async fn similar(
&self,
query: Value,
test: impl Fn(Value, StatusCode) + UnwindSafe + Clone,
) {
let post = self.similar_post(query.clone()).await;
let query = yaup::to_string(&query).unwrap();
let get = self.similar_get(&query).await;
insta::allow_duplicates! {
let (response, code) = post;
let t = test.clone();
if let Err(e) = catch_unwind(move || t(response, code)) {
eprintln!("Error with post search");
resume_unwind(e);
}
let (response, code) = get;
if let Err(e) = catch_unwind(move || test(response, code)) {
eprintln!("Error with get search");
resume_unwind(e);
}
}
}
pub async fn similar_post(&self, query: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/similar", urlencode(self.uid.as_ref()));
self.service.post_encoded(url, query, self.encoder).await
}
pub async fn similar_get(&self, query: &str) -> (Value, StatusCode) {
2024-06-17 16:21:51 +02:00
let url = format!("/indexes/{}/similar{}", urlencode(self.uid.as_ref()), query);
2024-05-27 14:29:13 +02:00
self.service.get(url).await
2021-07-06 11:54:09 +02:00
}
2023-06-28 11:31:52 +02:00
pub async fn facet_search(&self, query: Value) -> (Value, StatusCode) {
let url = format!("/indexes/{}/facet-search", urlencode(self.uid.as_ref()));
self.service.post_encoded(url, query, self.encoder).await
}
2022-06-02 12:16:46 +02:00
pub async fn get_distinct_attribute(&self) -> (Value, StatusCode) {
2022-10-20 18:00:07 +02:00
let url =
format!("/indexes/{}/settings/{}", urlencode(self.uid.as_ref()), "distinct-attribute");
2022-06-02 12:16:46 +02:00
self.service.get(url).await
}
2021-02-18 19:50:52 +01:00
}
2021-02-19 19:55:00 +01:00
#[derive(Debug, Default, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct GetAllDocumentsOptions {
2024-07-02 15:18:30 +02:00
#[serde(skip_serializing_if = "Option::is_none")]
pub limit: Option<usize>,
2024-07-02 15:18:30 +02:00
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<usize>,
2024-07-02 15:18:30 +02:00
#[serde(skip_serializing_if = "Option::is_none")]
2024-06-06 16:31:07 +02:00
pub fields: Option<Vec<&'static str>>,
2024-07-02 15:18:30 +02:00
pub retrieve_vectors: bool,
}