mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-25 20:57:35 +01:00
Merge #1797
1797: Import stable into main (v0.22.0) r=MarinPostma a=curquiza Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: mpostma <postma.marin@protonmail.com> Co-authored-by: many <maxime@meilisearch.com> Co-authored-by: Clémentine Urquizar <clementine@meilisearch.com>
This commit is contained in:
commit
776befc1f0
100
Cargo.lock
generated
100
Cargo.lock
generated
@ -825,6 +825,12 @@ version = "1.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede"
|
||||
|
||||
[[package]]
|
||||
name = "difference"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
|
||||
|
||||
[[package]]
|
||||
name = "digest"
|
||||
version = "0.8.1"
|
||||
@ -849,6 +855,12 @@ version = "1.0.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
|
||||
|
||||
[[package]]
|
||||
name = "downcast"
|
||||
version = "0.10.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.6.1"
|
||||
@ -933,6 +945,15 @@ dependencies = [
|
||||
"miniz_oxide",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "float-cmp"
|
||||
version = "0.8.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
|
||||
dependencies = [
|
||||
"num-traits",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
@ -949,6 +970,12 @@ dependencies = [
|
||||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fragile"
|
||||
version = "1.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
|
||||
|
||||
[[package]]
|
||||
name = "fs_extra"
|
||||
version = "1.2.0"
|
||||
@ -1686,6 +1713,7 @@ dependencies = [
|
||||
"meilisearch-tokenizer",
|
||||
"milli",
|
||||
"mime",
|
||||
"mockall",
|
||||
"num_cpus",
|
||||
"obkv",
|
||||
"once_cell",
|
||||
@ -1755,8 +1783,8 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "milli"
|
||||
version = "0.17.0"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.0#22551d0941bee1a9cdcf7d5bfc4ca46517dd25f3"
|
||||
version = "0.17.2"
|
||||
source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.2#07fb6d64e579b17e6565e9aa7f444e1b03802f4a"
|
||||
dependencies = [
|
||||
"bimap",
|
||||
"bincode",
|
||||
@ -1845,6 +1873,39 @@ dependencies = [
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mockall"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"downcast",
|
||||
"fragile",
|
||||
"lazy_static",
|
||||
"mockall_derive",
|
||||
"predicates",
|
||||
"predicates-tree",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "mockall_derive"
|
||||
version = "0.10.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033"
|
||||
dependencies = [
|
||||
"cfg-if 1.0.0",
|
||||
"proc-macro2 1.0.29",
|
||||
"quote 1.0.9",
|
||||
"syn 1.0.77",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "normalize-line-endings"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
|
||||
|
||||
[[package]]
|
||||
name = "ntapi"
|
||||
version = "0.3.6"
|
||||
@ -2117,6 +2178,35 @@ version = "0.2.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
|
||||
|
||||
[[package]]
|
||||
name = "predicates"
|
||||
version = "1.0.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df"
|
||||
dependencies = [
|
||||
"difference",
|
||||
"float-cmp",
|
||||
"normalize-line-endings",
|
||||
"predicates-core",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "predicates-core"
|
||||
version = "1.0.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
|
||||
|
||||
[[package]]
|
||||
name = "predicates-tree"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d"
|
||||
dependencies = [
|
||||
"predicates-core",
|
||||
"treeline",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "proc-macro-error"
|
||||
version = "1.0.4"
|
||||
@ -3042,6 +3132,12 @@ dependencies = [
|
||||
"lazy_static",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "treeline"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
|
||||
|
||||
[[package]]
|
||||
name = "try-lock"
|
||||
version = "0.2.3"
|
||||
|
@ -11,21 +11,31 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum MeilisearchHttpError {
|
||||
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
|
||||
MissingContentType,
|
||||
#[error("The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
|
||||
InvalidContentType(String),
|
||||
#[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
|
||||
.0.iter().map(|s| format!("\"{}\"", s)).collect::<Vec<_>>().join(", "))]
|
||||
MissingContentType(Vec<String>),
|
||||
#[error(
|
||||
"The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: {}",
|
||||
.1.iter().map(|s| format!("\"{}\"", s)).collect::<Vec<_>>().join(", ")
|
||||
)]
|
||||
InvalidContentType(String, Vec<String>),
|
||||
}
|
||||
|
||||
impl ErrorCode for MeilisearchHttpError {
|
||||
fn error_code(&self) -> Code {
|
||||
match self {
|
||||
MeilisearchHttpError::MissingContentType => Code::MissingContentType,
|
||||
MeilisearchHttpError::InvalidContentType(_) => Code::InvalidContentType,
|
||||
MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
|
||||
MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<MeilisearchHttpError> for aweb::Error {
|
||||
fn from(other: MeilisearchHttpError) -> Self {
|
||||
aweb::Error::from(ResponseError::from(other))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ResponseError {
|
||||
@ -121,9 +131,8 @@ impl From<QueryPayloadError> for PayloadError {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn payload_error_handler<E>(err: E) -> ResponseError
|
||||
where
|
||||
E: Into<PayloadError>,
|
||||
{
|
||||
err.into().into()
|
||||
impl From<PayloadError> for aweb::Error {
|
||||
fn from(other: PayloadError) -> Self {
|
||||
aweb::Error::from(ResponseError::from(other))
|
||||
}
|
||||
}
|
||||
|
@ -168,7 +168,8 @@ impl<P: Policy + 'static, D: 'static + Clone> FromRequest for GuardedData<P, D>
|
||||
None => err(AuthenticationError::IrretrievableState.into()),
|
||||
}
|
||||
} else {
|
||||
err(AuthenticationError::InvalidToken(String::from("hello")).into())
|
||||
let token = token.to_str().unwrap_or("unknown").to_string();
|
||||
err(AuthenticationError::InvalidToken(token).into())
|
||||
}
|
||||
}
|
||||
None => err(AuthenticationError::MissingAuthorizationHeader.into()),
|
||||
|
@ -11,10 +11,14 @@ pub mod routes;
|
||||
use std::path::Path;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::error::MeilisearchHttpError;
|
||||
use crate::extractors::authentication::AuthConfig;
|
||||
use actix_web::error::JsonPayloadError;
|
||||
use error::PayloadError;
|
||||
use http::header::CONTENT_TYPE;
|
||||
pub use option::Opt;
|
||||
|
||||
use actix_web::web;
|
||||
use actix_web::{web, HttpRequest};
|
||||
|
||||
use extractors::authentication::policies::*;
|
||||
use extractors::payload::PayloadConfig;
|
||||
@ -98,14 +102,25 @@ pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: &
|
||||
.app_data(data)
|
||||
.app_data(
|
||||
web::JsonConfig::default()
|
||||
.limit(http_payload_size_limit)
|
||||
.content_type(|_mime| true) // Accept all mime types
|
||||
.error_handler(|err, _req| error::payload_error_handler(err).into()),
|
||||
.content_type(|mime| mime == mime::APPLICATION_JSON)
|
||||
.error_handler(|err, req: &HttpRequest| match err {
|
||||
JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) {
|
||||
Some(content_type) => MeilisearchHttpError::InvalidContentType(
|
||||
content_type.to_str().unwrap_or("unknown").to_string(),
|
||||
vec![mime::APPLICATION_JSON.to_string()],
|
||||
)
|
||||
.into(),
|
||||
None => MeilisearchHttpError::MissingContentType(vec![
|
||||
mime::APPLICATION_JSON.to_string(),
|
||||
])
|
||||
.into(),
|
||||
},
|
||||
err => PayloadError::from(err).into(),
|
||||
}),
|
||||
)
|
||||
.app_data(PayloadConfig::new(http_payload_size_limit))
|
||||
.app_data(
|
||||
web::QueryConfig::default()
|
||||
.error_handler(|err, _req| error::payload_error_handler(err).into()),
|
||||
web::QueryConfig::default().error_handler(|err, _req| PayloadError::from(err).into()),
|
||||
);
|
||||
}
|
||||
|
||||
@ -180,6 +195,7 @@ macro_rules! create_app {
|
||||
use actix_web::middleware::TrailingSlash;
|
||||
use actix_web::App;
|
||||
use actix_web::{middleware, web};
|
||||
use meilisearch_http::error::{MeilisearchHttpError, ResponseError};
|
||||
use meilisearch_http::routes;
|
||||
use meilisearch_http::{configure_auth, configure_data, dashboard};
|
||||
|
||||
|
@ -6,6 +6,7 @@ use log::debug;
|
||||
use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update};
|
||||
use meilisearch_lib::milli::update::IndexDocumentsMethod;
|
||||
use meilisearch_lib::MeiliSearch;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use tokio::sync::mpsc;
|
||||
@ -176,14 +177,29 @@ async fn document_addition(
|
||||
body: Payload,
|
||||
method: IndexDocumentsMethod,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||
vec![
|
||||
"application/json".to_string(),
|
||||
"application/x-ndjson".to_string(),
|
||||
"application/csv".to_string(),
|
||||
]
|
||||
});
|
||||
let format = match content_type {
|
||||
Some("application/json") => DocumentAdditionFormat::Json,
|
||||
Some("application/x-ndjson") => DocumentAdditionFormat::Ndjson,
|
||||
Some("text/csv") => DocumentAdditionFormat::Csv,
|
||||
Some(other) => {
|
||||
return Err(MeilisearchHttpError::InvalidContentType(other.to_string()).into())
|
||||
return Err(MeilisearchHttpError::InvalidContentType(
|
||||
other.to_string(),
|
||||
ACCEPTED_CONTENT_TYPE.clone(),
|
||||
)
|
||||
.into())
|
||||
}
|
||||
None => {
|
||||
return Err(
|
||||
MeilisearchHttpError::MissingContentType(ACCEPTED_CONTENT_TYPE.clone()).into(),
|
||||
)
|
||||
}
|
||||
None => return Err(MeilisearchHttpError::MissingContentType.into()),
|
||||
};
|
||||
|
||||
let update = Update::DocumentAddition {
|
||||
|
111
meilisearch-http/tests/content_type.rs
Normal file
111
meilisearch-http/tests/content_type.rs
Normal file
@ -0,0 +1,111 @@
|
||||
#![allow(dead_code)]
|
||||
|
||||
mod common;
|
||||
|
||||
use crate::common::Server;
|
||||
use actix_web::test;
|
||||
use meilisearch_http::create_app;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn strict_json_bad_content_type() {
|
||||
let routes = [
|
||||
// all the POST routes except the dumps that can be created without any body or content-type
|
||||
// and the search that is not a strict json
|
||||
"/indexes",
|
||||
"/indexes/doggo/documents/delete-batch",
|
||||
"/indexes/doggo/search",
|
||||
"/indexes/doggo/settings",
|
||||
"/indexes/doggo/settings/displayed-attributes",
|
||||
"/indexes/doggo/settings/distinct-attribute",
|
||||
"/indexes/doggo/settings/filterable-attributes",
|
||||
"/indexes/doggo/settings/ranking-rules",
|
||||
"/indexes/doggo/settings/searchable-attributes",
|
||||
"/indexes/doggo/settings/sortable-attributes",
|
||||
"/indexes/doggo/settings/stop-words",
|
||||
"/indexes/doggo/settings/synonyms",
|
||||
];
|
||||
let bad_content_types = [
|
||||
"application/csv",
|
||||
"application/x-ndjson",
|
||||
"application/x-www-form-urlencoded",
|
||||
"text/plain",
|
||||
"json",
|
||||
"application",
|
||||
"json/application",
|
||||
];
|
||||
|
||||
let document = "{}";
|
||||
let server = Server::new().await;
|
||||
let app = test::init_service(create_app!(
|
||||
&server.service.meilisearch,
|
||||
true,
|
||||
&server.service.options
|
||||
))
|
||||
.await;
|
||||
for route in routes {
|
||||
// Good content-type, we probably have an error since we didn't send anything in the json
|
||||
// so we only ensure we didn't get a bad media type error.
|
||||
let req = test::TestRequest::post()
|
||||
.uri(route)
|
||||
.set_payload(document)
|
||||
.insert_header(("content-type", "application/json"))
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
assert_ne!(status_code, 415,
|
||||
"calling the route `{}` with a content-type of json isn't supposed to throw a bad media type error", route);
|
||||
|
||||
// No content-type.
|
||||
let req = test::TestRequest::post()
|
||||
.uri(route)
|
||||
.set_payload(document)
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 415, "calling the route `{}` without content-type is supposed to throw a bad media type error", route);
|
||||
assert_eq!(
|
||||
response,
|
||||
json!({
|
||||
"message": r#"A Content-Type header is missing. Accepted values for the Content-Type header are: "application/json""#,
|
||||
"errorCode": "missing_content_type",
|
||||
"errorType": "invalid_request_error",
|
||||
"errorLink": "https://docs.meilisearch.com/errors#missing_content_type",
|
||||
}),
|
||||
"when calling the route `{}` with no content-type",
|
||||
route,
|
||||
);
|
||||
|
||||
for bad_content_type in bad_content_types {
|
||||
// Always bad content-type
|
||||
let req = test::TestRequest::post()
|
||||
.uri(route)
|
||||
.set_payload(document.to_string())
|
||||
.insert_header(("content-type", bad_content_type))
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 415);
|
||||
let expected_error_message = format!(
|
||||
r#"The Content-Type "{}" is invalid. Accepted values for the Content-Type header are: "application/json""#,
|
||||
bad_content_type
|
||||
);
|
||||
assert_eq!(
|
||||
response,
|
||||
json!({
|
||||
"message": expected_error_message,
|
||||
"errorCode": "invalid_content_type",
|
||||
"errorType": "invalid_request_error",
|
||||
"errorLink": "https://docs.meilisearch.com/errors#invalid_content_type",
|
||||
}),
|
||||
"when calling the route `{}` with a content-type of `{}`",
|
||||
route,
|
||||
bad_content_type,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
@ -22,6 +22,7 @@ async fn add_documents_test_json_content_types() {
|
||||
&server.service.options
|
||||
))
|
||||
.await;
|
||||
// post
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
@ -33,6 +34,19 @@ async fn add_documents_test_json_content_types() {
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 202);
|
||||
assert_eq!(response, json!({ "updateId": 0 }));
|
||||
|
||||
// put
|
||||
let req = test::TestRequest::put()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
.insert_header(("content-type", "application/json"))
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 202);
|
||||
assert_eq!(response, json!({ "updateId": 1 }));
|
||||
}
|
||||
|
||||
/// no content type is still supposed to be accepted as json
|
||||
@ -52,6 +66,7 @@ async fn add_documents_test_no_content_types() {
|
||||
&server.service.options
|
||||
))
|
||||
.await;
|
||||
// post
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
@ -63,11 +78,23 @@ async fn add_documents_test_no_content_types() {
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 202);
|
||||
assert_eq!(response, json!({ "updateId": 0 }));
|
||||
|
||||
// put
|
||||
let req = test::TestRequest::put()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
.insert_header(("content-type", "application/json"))
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 202);
|
||||
assert_eq!(response, json!({ "updateId": 1 }));
|
||||
}
|
||||
|
||||
/// any other content-type is must be refused
|
||||
#[actix_rt::test]
|
||||
#[ignore]
|
||||
async fn add_documents_test_bad_content_types() {
|
||||
let document = json!([
|
||||
{
|
||||
@ -83,6 +110,7 @@ async fn add_documents_test_bad_content_types() {
|
||||
&server.service.options
|
||||
))
|
||||
.await;
|
||||
// post
|
||||
let req = test::TestRequest::post()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
@ -91,8 +119,32 @@ async fn add_documents_test_bad_content_types() {
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
assert_eq!(status_code, 405);
|
||||
assert!(body.is_empty());
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 415);
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "application/csv""#
|
||||
)
|
||||
);
|
||||
|
||||
// put
|
||||
let req = test::TestRequest::put()
|
||||
.uri("/indexes/dog/documents")
|
||||
.set_payload(document.to_string())
|
||||
.insert_header(("content-type", "text/plain"))
|
||||
.to_request();
|
||||
let res = test::call_service(&app, req).await;
|
||||
let status_code = res.status();
|
||||
let body = test::read_body(res).await;
|
||||
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
|
||||
assert_eq!(status_code, 415);
|
||||
assert_eq!(
|
||||
response["message"],
|
||||
json!(
|
||||
r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "application/csv""#
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
@ -30,7 +30,7 @@ lazy_static = "1.4.0"
|
||||
log = "0.4.14"
|
||||
meilisearch-error = { path = "../meilisearch-error" }
|
||||
meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
|
||||
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.0"}
|
||||
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.2" }
|
||||
mime = "0.3.16"
|
||||
num_cpus = "1.13.0"
|
||||
once_cell = "1.8.0"
|
||||
@ -59,4 +59,5 @@ derivative = "2.2.0"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.2.0"
|
||||
mockall = "0.10.2"
|
||||
paste = "1.0.5"
|
||||
|
@ -13,7 +13,7 @@ use crate::index::update_handler::UpdateHandler;
|
||||
use crate::index::updates::apply_settings_to_builder;
|
||||
|
||||
use super::error::Result;
|
||||
use super::{Index, Settings, Unchecked};
|
||||
use super::{index::Index, Settings, Unchecked};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct DumpMeta {
|
||||
|
286
meilisearch-lib/src/index/index.rs
Normal file
286
meilisearch-lib/src/index/index.rs
Normal file
@ -0,0 +1,286 @@
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::fs::create_dir_all;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use heed::{EnvOpenOptions, RoTxn};
|
||||
use milli::update::Setting;
|
||||
use milli::{obkv_to_json, FieldDistribution, FieldId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Map, Value};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use crate::EnvSizer;
|
||||
|
||||
use super::error::IndexError;
|
||||
use super::error::Result;
|
||||
use super::update_handler::UpdateHandler;
|
||||
use super::{Checked, Settings};
|
||||
|
||||
pub type Document = Map<String, Value>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexMeta {
|
||||
created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub primary_key: Option<String>,
|
||||
}
|
||||
|
||||
impl IndexMeta {
|
||||
pub fn new(index: &Index) -> Result<Self> {
|
||||
let txn = index.read_txn()?;
|
||||
Self::new_txn(index, &txn)
|
||||
}
|
||||
|
||||
pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
|
||||
let created_at = index.created_at(txn)?;
|
||||
let updated_at = index.updated_at(txn)?;
|
||||
let primary_key = index.primary_key(txn)?.map(String::from);
|
||||
Ok(Self {
|
||||
created_at,
|
||||
updated_at,
|
||||
primary_key,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexStats {
|
||||
#[serde(skip)]
|
||||
pub size: u64,
|
||||
pub number_of_documents: u64,
|
||||
/// Whether the current index is performing an update. It is initially `None` when the
|
||||
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
|
||||
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
|
||||
pub is_indexing: Option<bool>,
|
||||
pub field_distribution: FieldDistribution,
|
||||
}
|
||||
|
||||
#[derive(Clone, derivative::Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct Index {
|
||||
pub uuid: Uuid,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub inner: Arc<milli::Index>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub update_file_store: Arc<UpdateFileStore>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub update_handler: Arc<UpdateHandler>,
|
||||
}
|
||||
|
||||
impl Deref for Index {
|
||||
type Target = milli::Index;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Index {
|
||||
pub fn open(
|
||||
path: impl AsRef<Path>,
|
||||
size: usize,
|
||||
update_file_store: Arc<UpdateFileStore>,
|
||||
uuid: Uuid,
|
||||
update_handler: Arc<UpdateHandler>,
|
||||
) -> Result<Self> {
|
||||
create_dir_all(&path)?;
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(size);
|
||||
let inner = Arc::new(milli::Index::new(options, &path)?);
|
||||
Ok(Index {
|
||||
inner,
|
||||
update_file_store,
|
||||
uuid,
|
||||
update_handler,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &milli::Index {
|
||||
&self.inner
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> Result<IndexStats> {
|
||||
let rtxn = self.read_txn()?;
|
||||
|
||||
Ok(IndexStats {
|
||||
size: self.size(),
|
||||
number_of_documents: self.number_of_documents(&rtxn)?,
|
||||
is_indexing: None,
|
||||
field_distribution: self.field_distribution(&rtxn)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> Result<IndexMeta> {
|
||||
IndexMeta::new(self)
|
||||
}
|
||||
pub fn settings(&self) -> Result<Settings<Checked>> {
|
||||
let txn = self.read_txn()?;
|
||||
self.settings_txn(&txn)
|
||||
}
|
||||
|
||||
pub fn uuid(&self) -> Uuid {
|
||||
self.uuid
|
||||
}
|
||||
|
||||
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
|
||||
let displayed_attributes = self
|
||||
.displayed_fields(txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect());
|
||||
|
||||
let searchable_attributes = self
|
||||
.searchable_fields(txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect());
|
||||
|
||||
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
|
||||
|
||||
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
|
||||
|
||||
let criteria = self
|
||||
.criteria(txn)?
|
||||
.into_iter()
|
||||
.map(|c| c.to_string())
|
||||
.collect();
|
||||
|
||||
let stop_words = self
|
||||
.stop_words(txn)?
|
||||
.map(|stop_words| -> Result<BTreeSet<_>> {
|
||||
Ok(stop_words.stream().into_strs()?.into_iter().collect())
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or_else(BTreeSet::new);
|
||||
let distinct_field = self.distinct_field(txn)?.map(String::from);
|
||||
|
||||
// in milli each word in the synonyms map were split on their separator. Since we lost
|
||||
// this information we are going to put space between words.
|
||||
let synonyms = self
|
||||
.synonyms(txn)?
|
||||
.iter()
|
||||
.map(|(key, values)| {
|
||||
(
|
||||
key.join(" "),
|
||||
values.iter().map(|value| value.join(" ")).collect(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Settings {
|
||||
displayed_attributes: match displayed_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
searchable_attributes: match searchable_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
filterable_attributes: Setting::Set(filterable_attributes),
|
||||
sortable_attributes: Setting::Set(sortable_attributes),
|
||||
ranking_rules: Setting::Set(criteria),
|
||||
stop_words: Setting::Set(stop_words),
|
||||
distinct_attribute: match distinct_field {
|
||||
Some(field) => Setting::Set(field),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
synonyms: Setting::Set(synonyms),
|
||||
_kind: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn retrieve_documents<S: AsRef<str>>(
|
||||
&self,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
attributes_to_retrieve: Option<Vec<S>>,
|
||||
) -> Result<Vec<Map<String, Value>>> {
|
||||
let txn = self.read_txn()?;
|
||||
|
||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
let fields_to_display =
|
||||
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
|
||||
|
||||
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
|
||||
|
||||
let mut documents = Vec::new();
|
||||
|
||||
for entry in iter {
|
||||
let (_id, obkv) = entry?;
|
||||
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
|
||||
documents.push(object);
|
||||
}
|
||||
|
||||
Ok(documents)
|
||||
}
|
||||
|
||||
pub fn retrieve_document<S: AsRef<str>>(
|
||||
&self,
|
||||
doc_id: String,
|
||||
attributes_to_retrieve: Option<Vec<S>>,
|
||||
) -> Result<Map<String, Value>> {
|
||||
let txn = self.read_txn()?;
|
||||
|
||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
|
||||
let fields_to_display =
|
||||
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
|
||||
|
||||
let internal_id = self
|
||||
.external_documents_ids(&txn)?
|
||||
.get(doc_id.as_bytes())
|
||||
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
|
||||
|
||||
let document = self
|
||||
.documents(&txn, std::iter::once(internal_id))?
|
||||
.into_iter()
|
||||
.next()
|
||||
.map(|(_, d)| d)
|
||||
.ok_or(IndexError::DocumentNotFound(doc_id))?;
|
||||
|
||||
let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
|
||||
|
||||
Ok(document)
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.env.size()
|
||||
}
|
||||
|
||||
fn fields_to_display<S: AsRef<str>>(
|
||||
&self,
|
||||
txn: &heed::RoTxn,
|
||||
attributes_to_retrieve: &Option<Vec<S>>,
|
||||
fields_ids_map: &milli::FieldsIdsMap,
|
||||
) -> Result<Vec<FieldId>> {
|
||||
let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
|
||||
Some(ids) => ids.into_iter().collect::<Vec<_>>(),
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
let attributes_to_retrieve_ids = match attributes_to_retrieve {
|
||||
Some(attrs) => attrs
|
||||
.iter()
|
||||
.filter_map(|f| fields_ids_map.id(f.as_ref()))
|
||||
.collect::<HashSet<_>>(),
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
|
||||
Ok(displayed_fields_ids)
|
||||
}
|
||||
|
||||
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
|
||||
create_dir_all(&dst)?;
|
||||
dst.push("data.mdb");
|
||||
let _txn = self.write_txn()?;
|
||||
self.inner
|
||||
.env
|
||||
.copy_to_path(dst, heed::CompactionOption::Enabled)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -1,97 +1,203 @@
|
||||
use std::collections::{BTreeSet, HashSet};
|
||||
use std::fs::create_dir_all;
|
||||
use std::marker::PhantomData;
|
||||
use std::ops::Deref;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use heed::{EnvOpenOptions, RoTxn};
|
||||
use milli::update::Setting;
|
||||
use milli::{obkv_to_json, FieldDistribution, FieldId};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Map, Value};
|
||||
|
||||
use error::Result;
|
||||
pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
|
||||
pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use crate::EnvSizer;
|
||||
|
||||
use self::error::IndexError;
|
||||
use self::update_handler::UpdateHandler;
|
||||
|
||||
pub mod error;
|
||||
pub mod update_handler;
|
||||
|
||||
mod dump;
|
||||
pub mod error;
|
||||
mod search;
|
||||
pub mod update_handler;
|
||||
mod updates;
|
||||
|
||||
pub type Document = Map<String, Value>;
|
||||
#[allow(clippy::module_inception)]
|
||||
mod index;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexMeta {
|
||||
created_at: DateTime<Utc>,
|
||||
pub updated_at: DateTime<Utc>,
|
||||
pub primary_key: Option<String>,
|
||||
}
|
||||
pub use index::{Document, IndexMeta, IndexStats};
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct IndexStats {
|
||||
#[serde(skip)]
|
||||
pub size: u64,
|
||||
pub number_of_documents: u64,
|
||||
/// Whether the current index is performing an update. It is initially `None` when the
|
||||
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
|
||||
/// later set to either true or false, we we retrieve the information from the `UpdateStore`
|
||||
pub is_indexing: Option<bool>,
|
||||
pub field_distribution: FieldDistribution,
|
||||
}
|
||||
#[cfg(not(test))]
|
||||
pub use index::Index;
|
||||
|
||||
impl IndexMeta {
|
||||
pub fn new(index: &Index) -> Result<Self> {
|
||||
let txn = index.read_txn()?;
|
||||
Self::new_txn(index, &txn)
|
||||
#[cfg(test)]
|
||||
pub use test::MockIndex as Index;
|
||||
|
||||
/// The index::test module provides means of mocking an index instance. I can be used throughout the
|
||||
/// code for unit testing, in places where an index would normally be used.
|
||||
#[cfg(test)]
|
||||
pub mod test {
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use serde_json::{Map, Value};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use crate::index_controller::updates::status::{Failed, Processed, Processing};
|
||||
|
||||
use super::error::Result;
|
||||
use super::index::Index;
|
||||
use super::update_handler::UpdateHandler;
|
||||
use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
|
||||
|
||||
pub struct Stub<A, R> {
|
||||
name: String,
|
||||
times: Mutex<Option<usize>>,
|
||||
stub: Box<dyn Fn(A) -> R + Sync + Send>,
|
||||
invalidated: AtomicBool,
|
||||
}
|
||||
|
||||
fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
|
||||
let created_at = index.created_at(txn)?;
|
||||
let updated_at = index.updated_at(txn)?;
|
||||
let primary_key = index.primary_key(txn)?.map(String::from);
|
||||
Ok(Self {
|
||||
created_at,
|
||||
updated_at,
|
||||
primary_key,
|
||||
})
|
||||
impl<A, R> Drop for Stub<A, R> {
|
||||
fn drop(&mut self) {
|
||||
if !self.invalidated.load(Ordering::Relaxed) {
|
||||
let lock = self.times.lock().unwrap();
|
||||
if let Some(n) = *lock {
|
||||
assert_eq!(n, 0, "{} not called enough times", self.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, derivative::Derivative)]
|
||||
#[derivative(Debug)]
|
||||
pub struct Index {
|
||||
pub uuid: Uuid,
|
||||
#[derivative(Debug = "ignore")]
|
||||
pub inner: Arc<milli::Index>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
update_file_store: Arc<UpdateFileStore>,
|
||||
#[derivative(Debug = "ignore")]
|
||||
update_handler: Arc<UpdateHandler>,
|
||||
}
|
||||
|
||||
impl Deref for Index {
|
||||
type Target = milli::Index;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
self.inner.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
impl Index {
|
||||
impl<A, R> Stub<A, R> {
|
||||
fn invalidate(&self) {
|
||||
self.invalidated.store(true, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
impl<A: UnwindSafe, R> Stub<A, R> {
|
||||
fn call(&self, args: A) -> R {
|
||||
let mut lock = self.times.lock().unwrap();
|
||||
match *lock {
|
||||
Some(0) => panic!("{} called to many times", self.name),
|
||||
Some(ref mut times) => {
|
||||
*times -= 1;
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
|
||||
// Since we add assertions in the drop implementation for Stub, a panic can occur in a
|
||||
// panic, causing a hard abort of the program. To handle that, we catch the panic, and
|
||||
// set the stub as invalidated so the assertions aren't run during the drop.
|
||||
impl<'a, A, R> RefUnwindSafe for StubHolder<'a, A, R> {}
|
||||
struct StubHolder<'a, A, R>(&'a (dyn Fn(A) -> R + Sync + Send));
|
||||
|
||||
let stub = StubHolder(self.stub.as_ref());
|
||||
|
||||
match std::panic::catch_unwind(|| (stub.0)(args)) {
|
||||
Ok(r) => r,
|
||||
Err(panic) => {
|
||||
self.invalidate();
|
||||
std::panic::resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct StubStore {
|
||||
inner: Arc<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>>,
|
||||
}
|
||||
|
||||
impl StubStore {
|
||||
pub fn insert<A: 'static, R: 'static>(&self, name: String, stub: Stub<A, R>) {
|
||||
let mut lock = self.inner.lock().unwrap();
|
||||
lock.insert(name, Box::new(stub));
|
||||
}
|
||||
|
||||
pub fn get<A, B>(&self, name: &str) -> Option<&Stub<A, B>> {
|
||||
let mut lock = self.inner.lock().unwrap();
|
||||
match lock.get_mut(name) {
|
||||
Some(s) => {
|
||||
let s = s.as_mut() as *mut dyn Any as *mut Stub<A, B>;
|
||||
Some(unsafe { &mut *s })
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct StubBuilder<'a, A, R> {
|
||||
name: String,
|
||||
store: &'a StubStore,
|
||||
times: Option<usize>,
|
||||
_f: std::marker::PhantomData<fn(A) -> R>,
|
||||
}
|
||||
|
||||
impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> {
|
||||
/// Asserts the stub has been called exactly `times` times.
|
||||
#[must_use]
|
||||
pub fn times(mut self, times: usize) -> Self {
|
||||
self.times = Some(times);
|
||||
self
|
||||
}
|
||||
|
||||
/// Asserts the stub has been called exactly once.
|
||||
#[must_use]
|
||||
pub fn once(mut self) -> Self {
|
||||
self.times = Some(1);
|
||||
self
|
||||
}
|
||||
|
||||
/// The function that will be called when the stub is called. This needs to be called to
|
||||
/// actually build the stub and register it to the stub store.
|
||||
pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) {
|
||||
let times = Mutex::new(self.times);
|
||||
let stub = Stub {
|
||||
stub: Box::new(f),
|
||||
times,
|
||||
name: self.name.clone(),
|
||||
invalidated: AtomicBool::new(false),
|
||||
};
|
||||
|
||||
self.store.insert(self.name, stub);
|
||||
}
|
||||
}
|
||||
|
||||
/// Mocker allows to stub metod call on any struct. you can register stubs by calling
|
||||
/// `Mocker::when` and retrieve it in the proxy implementation when with `Mocker::get`.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct Mocker {
|
||||
store: StubStore,
|
||||
}
|
||||
|
||||
impl Mocker {
|
||||
pub fn when<A, R>(&self, name: &str) -> StubBuilder<A, R> {
|
||||
StubBuilder {
|
||||
name: name.to_string(),
|
||||
store: &self.store,
|
||||
times: None,
|
||||
_f: std::marker::PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get<A, R>(&self, name: &str) -> &Stub<A, R> {
|
||||
match self.store.get(name) {
|
||||
Some(stub) => stub,
|
||||
None => {
|
||||
// panic here causes the stubs to get dropped, and panic in turn. To prevent
|
||||
// that, we forget them, and let them be cleaned by the os later. This is not
|
||||
// optimal, but is still better than nested panicks.
|
||||
let mut stubs = self.store.inner.lock().unwrap();
|
||||
let stubs = std::mem::take(&mut *stubs);
|
||||
std::mem::forget(stubs);
|
||||
panic!("unexpected call to {}", name)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum MockIndex {
|
||||
Vrai(Index),
|
||||
Faux(Arc<Mocker>),
|
||||
}
|
||||
|
||||
impl MockIndex {
|
||||
pub fn faux(faux: Mocker) -> Self {
|
||||
Self::Faux(Arc::new(faux))
|
||||
}
|
||||
|
||||
pub fn open(
|
||||
path: impl AsRef<Path>,
|
||||
size: usize,
|
||||
@ -99,98 +205,52 @@ impl Index {
|
||||
uuid: Uuid,
|
||||
update_handler: Arc<UpdateHandler>,
|
||||
) -> Result<Self> {
|
||||
create_dir_all(&path)?;
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(size);
|
||||
let inner = Arc::new(milli::Index::new(options, &path)?);
|
||||
Ok(Index {
|
||||
inner,
|
||||
update_file_store,
|
||||
uuid,
|
||||
update_handler,
|
||||
})
|
||||
let index = Index::open(path, size, update_file_store, uuid, update_handler)?;
|
||||
Ok(Self::Vrai(index))
|
||||
}
|
||||
|
||||
pub fn load_dump(
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
size: usize,
|
||||
update_handler: &UpdateHandler,
|
||||
) -> anyhow::Result<()> {
|
||||
Index::load_dump(src, dst, size, update_handler)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.handle_update(update),
|
||||
MockIndex::Faux(faux) => faux.get("handle_update").call(update),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn uuid(&self) -> Uuid {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.uuid(),
|
||||
MockIndex::Faux(faux) => faux.get("uuid").call(()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stats(&self) -> Result<IndexStats> {
|
||||
let rtxn = self.read_txn()?;
|
||||
|
||||
Ok(IndexStats {
|
||||
size: self.size(),
|
||||
number_of_documents: self.number_of_documents(&rtxn)?,
|
||||
is_indexing: None,
|
||||
field_distribution: self.field_distribution(&rtxn)?,
|
||||
})
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.stats(),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn meta(&self) -> Result<IndexMeta> {
|
||||
IndexMeta::new(self)
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.meta(),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
}
|
||||
pub fn settings(&self) -> Result<Settings<Checked>> {
|
||||
let txn = self.read_txn()?;
|
||||
self.settings_txn(&txn)
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.settings(),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
|
||||
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
|
||||
let displayed_attributes = self
|
||||
.displayed_fields(txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect());
|
||||
|
||||
let searchable_attributes = self
|
||||
.searchable_fields(txn)?
|
||||
.map(|fields| fields.into_iter().map(String::from).collect());
|
||||
|
||||
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
|
||||
|
||||
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
|
||||
|
||||
let criteria = self
|
||||
.criteria(txn)?
|
||||
.into_iter()
|
||||
.map(|c| c.to_string())
|
||||
.collect();
|
||||
|
||||
let stop_words = self
|
||||
.stop_words(txn)?
|
||||
.map(|stop_words| -> Result<BTreeSet<_>> {
|
||||
Ok(stop_words.stream().into_strs()?.into_iter().collect())
|
||||
})
|
||||
.transpose()?
|
||||
.unwrap_or_else(BTreeSet::new);
|
||||
let distinct_field = self.distinct_field(txn)?.map(String::from);
|
||||
|
||||
// in milli each word in the synonyms map were split on their separator. Since we lost
|
||||
// this information we are going to put space between words.
|
||||
let synonyms = self
|
||||
.synonyms(txn)?
|
||||
.iter()
|
||||
.map(|(key, values)| {
|
||||
(
|
||||
key.join(" "),
|
||||
values.iter().map(|value| value.join(" ")).collect(),
|
||||
)
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Settings {
|
||||
displayed_attributes: match displayed_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
searchable_attributes: match searchable_attributes {
|
||||
Some(attrs) => Setting::Set(attrs),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
filterable_attributes: Setting::Set(filterable_attributes),
|
||||
sortable_attributes: Setting::Set(sortable_attributes),
|
||||
ranking_rules: Setting::Set(criteria),
|
||||
stop_words: Setting::Set(stop_words),
|
||||
distinct_attribute: match distinct_field {
|
||||
Some(field) => Setting::Set(field),
|
||||
None => Setting::Reset,
|
||||
},
|
||||
synonyms: Setting::Set(synonyms),
|
||||
_kind: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn retrieve_documents<S: AsRef<str>>(
|
||||
@ -199,23 +259,12 @@ impl Index {
|
||||
limit: usize,
|
||||
attributes_to_retrieve: Option<Vec<S>>,
|
||||
) -> Result<Vec<Map<String, Value>>> {
|
||||
let txn = self.read_txn()?;
|
||||
|
||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
let fields_to_display =
|
||||
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
|
||||
|
||||
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
|
||||
|
||||
let mut documents = Vec::new();
|
||||
|
||||
for entry in iter {
|
||||
let (_id, obkv) = entry?;
|
||||
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
|
||||
documents.push(object);
|
||||
match self {
|
||||
MockIndex::Vrai(index) => {
|
||||
index.retrieve_documents(offset, limit, attributes_to_retrieve)
|
||||
}
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
|
||||
Ok(documents)
|
||||
}
|
||||
|
||||
pub fn retrieve_document<S: AsRef<str>>(
|
||||
@ -223,65 +272,94 @@ impl Index {
|
||||
doc_id: String,
|
||||
attributes_to_retrieve: Option<Vec<S>>,
|
||||
) -> Result<Map<String, Value>> {
|
||||
let txn = self.read_txn()?;
|
||||
|
||||
let fields_ids_map = self.fields_ids_map(&txn)?;
|
||||
|
||||
let fields_to_display =
|
||||
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
|
||||
|
||||
let internal_id = self
|
||||
.external_documents_ids(&txn)?
|
||||
.get(doc_id.as_bytes())
|
||||
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
|
||||
|
||||
let document = self
|
||||
.documents(&txn, std::iter::once(internal_id))?
|
||||
.into_iter()
|
||||
.next()
|
||||
.map(|(_, d)| d)
|
||||
.ok_or(IndexError::DocumentNotFound(doc_id))?;
|
||||
|
||||
let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
|
||||
|
||||
Ok(document)
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.retrieve_document(doc_id, attributes_to_retrieve),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn size(&self) -> u64 {
|
||||
self.env.size()
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.size(),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
|
||||
fn fields_to_display<S: AsRef<str>>(
|
||||
&self,
|
||||
txn: &heed::RoTxn,
|
||||
attributes_to_retrieve: &Option<Vec<S>>,
|
||||
fields_ids_map: &milli::FieldsIdsMap,
|
||||
) -> Result<Vec<FieldId>> {
|
||||
let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
|
||||
Some(ids) => ids.into_iter().collect::<Vec<_>>(),
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
let attributes_to_retrieve_ids = match attributes_to_retrieve {
|
||||
Some(attrs) => attrs
|
||||
.iter()
|
||||
.filter_map(|f| fields_ids_map.id(f.as_ref()))
|
||||
.collect::<HashSet<_>>(),
|
||||
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
|
||||
};
|
||||
|
||||
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
|
||||
Ok(displayed_fields_ids)
|
||||
}
|
||||
|
||||
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
|
||||
create_dir_all(&dst)?;
|
||||
dst.push("data.mdb");
|
||||
let _txn = self.write_txn()?;
|
||||
self.inner
|
||||
.env
|
||||
.copy_to_path(dst, heed::CompactionOption::Enabled)?;
|
||||
Ok(())
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.snapshot(path),
|
||||
MockIndex::Faux(faux) => faux.get("snapshot").call(path.as_ref()),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn inner(&self) -> &milli::Index {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.inner(),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.update_primary_key(primary_key),
|
||||
MockIndex::Faux(_) => todo!(),
|
||||
}
|
||||
}
|
||||
pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.perform_search(query),
|
||||
MockIndex::Faux(faux) => faux.get("perform_search").call(query),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||
match self {
|
||||
MockIndex::Vrai(index) => index.dump(path),
|
||||
MockIndex::Faux(faux) => faux.get("dump").call(path.as_ref()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_faux_index() {
|
||||
let faux = Mocker::default();
|
||||
faux.when("snapshot")
|
||||
.times(2)
|
||||
.then(|_: &Path| -> Result<()> { Ok(()) });
|
||||
|
||||
let index = MockIndex::faux(faux);
|
||||
|
||||
let path = PathBuf::from("hello");
|
||||
index.snapshot(&path).unwrap();
|
||||
index.snapshot(&path).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_faux_unexisting_method_stub() {
|
||||
let faux = Mocker::default();
|
||||
|
||||
let index = MockIndex::faux(faux);
|
||||
|
||||
let path = PathBuf::from("hello");
|
||||
index.snapshot(&path).unwrap();
|
||||
index.snapshot(&path).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[should_panic]
|
||||
fn test_faux_panic() {
|
||||
let faux = Mocker::default();
|
||||
faux.when("snapshot")
|
||||
.times(2)
|
||||
.then(|_: &Path| -> Result<()> {
|
||||
panic!();
|
||||
});
|
||||
|
||||
let index = MockIndex::faux(faux);
|
||||
|
||||
let path = PathBuf::from("hello");
|
||||
index.snapshot(&path).unwrap();
|
||||
index.snapshot(&path).unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -12,15 +12,14 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::index::error::FacetError;
|
||||
use crate::index::IndexError;
|
||||
|
||||
use super::error::Result;
|
||||
use super::Index;
|
||||
use super::error::{IndexError, Result};
|
||||
use super::index::Index;
|
||||
|
||||
pub type Document = IndexMap<String, Value>;
|
||||
type MatchesInfo = BTreeMap<String, Vec<MatchInfo>>;
|
||||
|
||||
#[derive(Serialize, Debug, Clone)]
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
pub struct MatchInfo {
|
||||
start: usize,
|
||||
length: usize,
|
||||
@ -36,7 +35,7 @@ pub const fn default_crop_length() -> usize {
|
||||
DEFAULT_CROP_LENGTH
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug)]
|
||||
#[derive(Deserialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||
pub struct SearchQuery {
|
||||
pub q: Option<String>,
|
||||
@ -56,7 +55,7 @@ pub struct SearchQuery {
|
||||
pub facets_distribution: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub struct SearchHit {
|
||||
#[serde(flatten)]
|
||||
pub document: Document,
|
||||
@ -66,7 +65,7 @@ pub struct SearchHit {
|
||||
pub matches_info: Option<MatchesInfo>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Debug)]
|
||||
#[derive(Serialize, Debug, Clone, PartialEq)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SearchResult {
|
||||
pub hits: Vec<SearchHit>,
|
||||
|
@ -12,7 +12,7 @@ use crate::index_controller::updates::status::{Failed, Processed, Processing, Up
|
||||
use crate::Update;
|
||||
|
||||
use super::error::{IndexError, Result};
|
||||
use super::{Index, IndexMeta};
|
||||
use super::index::{Index, IndexMeta};
|
||||
|
||||
fn serialize_with_wildcard<S>(
|
||||
field: &Setting<Vec<String>>,
|
||||
|
@ -10,14 +10,16 @@ use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
|
||||
use super::error::{DumpActorError, Result};
|
||||
use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
|
||||
use crate::index_controller::index_resolver::HardStateIndexResolver;
|
||||
use crate::index_controller::index_resolver::index_store::IndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::UuidStore;
|
||||
use crate::index_controller::index_resolver::IndexResolver;
|
||||
use crate::index_controller::updates::UpdateSender;
|
||||
|
||||
pub const CONCURRENT_DUMP_MSG: usize = 10;
|
||||
|
||||
pub struct DumpActor {
|
||||
pub struct DumpActor<U, I> {
|
||||
inbox: Option<mpsc::Receiver<DumpMsg>>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update: UpdateSender,
|
||||
dump_path: PathBuf,
|
||||
lock: Arc<Mutex<()>>,
|
||||
@ -31,10 +33,14 @@ fn generate_uid() -> String {
|
||||
Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
|
||||
}
|
||||
|
||||
impl DumpActor {
|
||||
impl<U, I> DumpActor<U, I>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
inbox: mpsc::Receiver<DumpMsg>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update: UpdateSender,
|
||||
dump_path: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
@ -114,7 +120,7 @@ impl DumpActor {
|
||||
let task = DumpTask {
|
||||
path: self.dump_path.clone(),
|
||||
index_resolver: self.index_resolver.clone(),
|
||||
update_handle: self.update.clone(),
|
||||
update_sender: self.update.clone(),
|
||||
uid: uid.clone(),
|
||||
update_db_size: self.update_db_size,
|
||||
index_db_size: self.index_db_size,
|
||||
|
@ -4,7 +4,6 @@ use std::path::{Path, PathBuf};
|
||||
|
||||
use serde_json::{Deserializer, Value};
|
||||
use tempfile::NamedTempFile;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule};
|
||||
use crate::index_controller::dump_actor::Metadata;
|
||||
@ -200,7 +199,7 @@ impl From<compat::Enqueued> for Enqueued {
|
||||
method,
|
||||
// Just ignore if the uuid is no present. If it is needed later, an error will
|
||||
// be thrown.
|
||||
content_uuid: content.unwrap_or_else(Uuid::default),
|
||||
content_uuid: content.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
compat::UpdateMeta::ClearDocuments => Update::ClearDocuments,
|
||||
|
@ -13,7 +13,9 @@ pub use actor::DumpActor;
|
||||
pub use handle_impl::*;
|
||||
pub use message::DumpMsg;
|
||||
|
||||
use super::index_resolver::HardStateIndexResolver;
|
||||
use super::index_resolver::index_store::IndexStore;
|
||||
use super::index_resolver::uuid_store::UuidStore;
|
||||
use super::index_resolver::IndexResolver;
|
||||
use super::updates::UpdateSender;
|
||||
use crate::compression::{from_tar_gz, to_tar_gz};
|
||||
use crate::index_controller::dump_actor::error::DumpActorError;
|
||||
@ -51,6 +53,7 @@ impl Metadata {
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
pub trait DumpActorHandle {
|
||||
/// Start the creation of a dump
|
||||
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
|
||||
@ -218,16 +221,20 @@ pub fn load_dump(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
struct DumpTask {
|
||||
struct DumpTask<U, I> {
|
||||
path: PathBuf,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
update_handle: UpdateSender,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update_sender: UpdateSender,
|
||||
uid: String,
|
||||
update_db_size: usize,
|
||||
index_db_size: usize,
|
||||
}
|
||||
|
||||
impl DumpTask {
|
||||
impl<U, I> DumpTask<U, I>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
async fn run(self) -> Result<()> {
|
||||
trace!("Performing dump.");
|
||||
|
||||
@ -243,7 +250,7 @@ impl DumpTask {
|
||||
|
||||
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
|
||||
|
||||
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
|
||||
UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
|
||||
|
||||
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
|
||||
let temp_dump_file = tempfile::NamedTempFile::new()?;
|
||||
@ -262,3 +269,110 @@ impl DumpTask {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::collections::HashSet;
|
||||
|
||||
use futures::future::{err, ok};
|
||||
use once_cell::sync::Lazy;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::*;
|
||||
use crate::index::error::Result as IndexResult;
|
||||
use crate::index::test::Mocker;
|
||||
use crate::index::Index;
|
||||
use crate::index_controller::index_resolver::error::IndexResolverError;
|
||||
use crate::index_controller::index_resolver::index_store::MockIndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
|
||||
use crate::index_controller::updates::create_update_handler;
|
||||
|
||||
fn setup() {
|
||||
static SETUP: Lazy<()> = Lazy::new(|| {
|
||||
if cfg!(windows) {
|
||||
std::env::set_var("TMP", ".");
|
||||
} else {
|
||||
std::env::set_var("TMPDIR", ".");
|
||||
}
|
||||
});
|
||||
|
||||
// just deref to make sure the env is setup
|
||||
*SETUP
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_dump_normal() {
|
||||
setup();
|
||||
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
let uuids = std::iter::repeat_with(Uuid::new_v4)
|
||||
.take(4)
|
||||
.collect::<HashSet<_>>();
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
let uuids_cloned = uuids.clone();
|
||||
uuid_store
|
||||
.expect_dump()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(ok(uuids_cloned.clone())));
|
||||
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store.expect_get().times(4).returning(move |uuid| {
|
||||
let mocker = Mocker::default();
|
||||
let uuids_clone = uuids.clone();
|
||||
mocker.when::<(), Uuid>("uuid").once().then(move |_| {
|
||||
assert!(uuids_clone.contains(&uuid));
|
||||
uuid
|
||||
});
|
||||
mocker
|
||||
.when::<&Path, IndexResult<()>>("dump")
|
||||
.once()
|
||||
.then(move |_| Ok(()));
|
||||
Box::pin(ok(Some(Index::faux(mocker))))
|
||||
});
|
||||
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
|
||||
|
||||
let task = DumpTask {
|
||||
path: tmp.path().to_owned(),
|
||||
index_resolver,
|
||||
update_sender,
|
||||
uid: String::from("test"),
|
||||
update_db_size: 4096 * 10,
|
||||
index_db_size: 4096 * 10,
|
||||
};
|
||||
|
||||
task.run().await.unwrap();
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn error_performing_dump() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
uuid_store
|
||||
.expect_dump()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey)));
|
||||
|
||||
let index_store = MockIndexStore::new();
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
|
||||
|
||||
let task = DumpTask {
|
||||
path: tmp.path().to_owned(),
|
||||
index_resolver,
|
||||
update_sender,
|
||||
uid: String::from("test"),
|
||||
update_db_size: 4096 * 10,
|
||||
index_db_size: 4096 * 10,
|
||||
};
|
||||
|
||||
assert!(task.run().await.is_err());
|
||||
}
|
||||
}
|
||||
|
@ -17,6 +17,7 @@ use crate::options::IndexerOpts;
|
||||
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
pub trait IndexStore {
|
||||
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
|
||||
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
|
||||
@ -72,9 +73,10 @@ impl IndexStore for MapIndexStore {
|
||||
let index = spawn_blocking(move || -> Result<Index> {
|
||||
let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
|
||||
if let Some(primary_key) = primary_key {
|
||||
let mut txn = index.write_txn()?;
|
||||
let inner = index.inner();
|
||||
let mut txn = inner.write_txn()?;
|
||||
|
||||
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index);
|
||||
let mut builder = UpdateBuilder::new(0).settings(&mut txn, index.inner());
|
||||
builder.set_primary_key(primary_key);
|
||||
builder.execute(|_, _| ())?;
|
||||
|
||||
|
@ -1,11 +1,12 @@
|
||||
pub mod error;
|
||||
mod index_store;
|
||||
pub mod index_store;
|
||||
pub mod uuid_store;
|
||||
|
||||
use std::path::Path;
|
||||
|
||||
use error::{IndexResolverError, Result};
|
||||
use index_store::{IndexStore, MapIndexStore};
|
||||
use log::error;
|
||||
use uuid::Uuid;
|
||||
use uuid_store::{HeedUuidStore, UuidStore};
|
||||
|
||||
@ -98,8 +99,19 @@ where
|
||||
}
|
||||
let uuid = Uuid::new_v4();
|
||||
let index = self.index_store.create(uuid, primary_key).await?;
|
||||
self.index_uuid_store.insert(uid, uuid).await?;
|
||||
Ok(index)
|
||||
match self.index_uuid_store.insert(uid, uuid).await {
|
||||
Err(e) => {
|
||||
match self.index_store.delete(uuid).await {
|
||||
Ok(Some(index)) => {
|
||||
index.inner().clone().prepare_for_closing();
|
||||
}
|
||||
Ok(None) => (),
|
||||
Err(e) => error!("Error while deleting index: {:?}", e),
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
Ok(()) => Ok(index),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
|
||||
@ -121,7 +133,13 @@ where
|
||||
pub async fn delete_index(&self, uid: String) -> Result<Uuid> {
|
||||
match self.index_uuid_store.delete(uid.clone()).await? {
|
||||
Some(uuid) => {
|
||||
let _ = self.index_store.delete(uuid).await;
|
||||
match self.index_store.delete(uuid).await {
|
||||
Ok(Some(index)) => {
|
||||
index.inner().clone().prepare_for_closing();
|
||||
}
|
||||
Ok(None) => (),
|
||||
Err(e) => error!("Error while deleting index: {:?}", e),
|
||||
}
|
||||
Ok(uuid)
|
||||
}
|
||||
None => Err(IndexResolverError::UnexistingIndex(uid)),
|
||||
|
@ -22,6 +22,7 @@ struct DumpEntry {
|
||||
const UUIDS_DB_PATH: &str = "index_uuids";
|
||||
|
||||
#[async_trait::async_trait]
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
pub trait UuidStore: Sized {
|
||||
// Create a new entry for `name`. Return an error if `err` and the entry already exists, return
|
||||
// the uuid otherwise.
|
||||
|
@ -30,7 +30,9 @@ use error::Result;
|
||||
|
||||
use self::dump_actor::load_dump;
|
||||
use self::index_resolver::error::IndexResolverError;
|
||||
use self::index_resolver::HardStateIndexResolver;
|
||||
use self::index_resolver::index_store::{IndexStore, MapIndexStore};
|
||||
use self::index_resolver::uuid_store::{HeedUuidStore, UuidStore};
|
||||
use self::index_resolver::IndexResolver;
|
||||
use self::updates::status::UpdateStatus;
|
||||
use self::updates::UpdateMsg;
|
||||
|
||||
@ -41,6 +43,10 @@ mod snapshot;
|
||||
pub mod update_file_store;
|
||||
pub mod updates;
|
||||
|
||||
/// Concrete implementation of the IndexController, exposed by meilisearch-lib
|
||||
pub type MeiliSearch =
|
||||
IndexController<HeedUuidStore, MapIndexStore, dump_actor::DumpActorHandleImpl>;
|
||||
|
||||
pub type Payload = Box<
|
||||
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
|
||||
>;
|
||||
@ -62,13 +68,6 @@ pub struct IndexSettings {
|
||||
pub primary_key: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct IndexController {
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
update_sender: updates::UpdateSender,
|
||||
dump_handle: dump_actor::DumpActorHandleImpl,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DocumentAdditionFormat {
|
||||
Json,
|
||||
@ -129,7 +128,7 @@ impl IndexControllerBuilder {
|
||||
self,
|
||||
db_path: impl AsRef<Path>,
|
||||
indexer_options: IndexerOpts,
|
||||
) -> anyhow::Result<IndexController> {
|
||||
) -> anyhow::Result<MeiliSearch> {
|
||||
let index_size = self
|
||||
.max_index_size
|
||||
.ok_or_else(|| anyhow::anyhow!("Missing index size"))?;
|
||||
@ -178,6 +177,8 @@ impl IndexControllerBuilder {
|
||||
update_store_size,
|
||||
)?;
|
||||
|
||||
let dump_handle = Arc::new(dump_handle);
|
||||
|
||||
if self.schedule_snapshot {
|
||||
let snapshot_service = SnapshotService::new(
|
||||
index_resolver.clone(),
|
||||
@ -266,7 +267,22 @@ impl IndexControllerBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexController {
|
||||
// We are using derivative here to derive Clone, because U, I and D do not necessarily implement
|
||||
// Clone themselves.
|
||||
#[derive(derivative::Derivative)]
|
||||
#[derivative(Clone(bound = ""))]
|
||||
pub struct IndexController<U, I, D> {
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update_sender: updates::UpdateSender,
|
||||
dump_handle: Arc<D>,
|
||||
}
|
||||
|
||||
impl<U, I, D> IndexController<U, I, D>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
D: DumpActorHandle + Send + Sync,
|
||||
{
|
||||
pub fn builder() -> IndexControllerBuilder {
|
||||
IndexControllerBuilder::default()
|
||||
}
|
||||
@ -286,7 +302,7 @@ impl IndexController {
|
||||
if create_index {
|
||||
let index = self.index_resolver.create_index(name, None).await?;
|
||||
let update_result =
|
||||
UpdateMsg::update(&self.update_sender, index.uuid, update).await?;
|
||||
UpdateMsg::update(&self.update_sender, index.uuid(), update).await?;
|
||||
Ok(update_result)
|
||||
} else {
|
||||
Err(IndexResolverError::UnexistingIndex(name).into())
|
||||
@ -314,7 +330,7 @@ impl IndexController {
|
||||
for (uid, index) in indexes {
|
||||
let meta = index.meta()?;
|
||||
let meta = IndexMetadata {
|
||||
uuid: index.uuid,
|
||||
uuid: index.uuid(),
|
||||
name: uid.clone(),
|
||||
uid,
|
||||
meta,
|
||||
@ -366,7 +382,7 @@ impl IndexController {
|
||||
index_settings.uid.take();
|
||||
|
||||
let index = self.index_resolver.get_index(uid.clone()).await?;
|
||||
let uuid = index.uuid;
|
||||
let uuid = index.uuid();
|
||||
let meta =
|
||||
spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??;
|
||||
let meta = IndexMetadata {
|
||||
@ -386,7 +402,7 @@ impl IndexController {
|
||||
|
||||
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
|
||||
let index = self.index_resolver.get_index(uid.clone()).await?;
|
||||
let uuid = index.uuid;
|
||||
let uuid = index.uuid();
|
||||
let meta = spawn_blocking(move || index.meta()).await??;
|
||||
let meta = IndexMetadata {
|
||||
uuid,
|
||||
@ -400,7 +416,7 @@ impl IndexController {
|
||||
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
|
||||
let update_infos = UpdateMsg::get_info(&self.update_sender).await?;
|
||||
let index = self.index_resolver.get_index(uid).await?;
|
||||
let uuid = index.uuid;
|
||||
let uuid = index.uuid();
|
||||
let mut stats = spawn_blocking(move || index.stats()).await??;
|
||||
// Check if the currently indexing update is from our index.
|
||||
stats.is_indexing = Some(Some(uuid) == update_infos.processing);
|
||||
@ -414,7 +430,7 @@ impl IndexController {
|
||||
let mut indexes = BTreeMap::new();
|
||||
|
||||
for (index_uid, index) in self.index_resolver.list().await? {
|
||||
let uuid = index.uuid;
|
||||
let uuid = index.uuid();
|
||||
let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || {
|
||||
let stats = index.stats()?;
|
||||
let meta = index.meta()?;
|
||||
@ -461,7 +477,7 @@ impl IndexController {
|
||||
let meta = spawn_blocking(move || -> IndexResult<_> {
|
||||
let meta = index.meta()?;
|
||||
let meta = IndexMetadata {
|
||||
uuid: index.uuid,
|
||||
uuid: index.uuid(),
|
||||
uid: uid.clone(),
|
||||
name: uid,
|
||||
meta,
|
||||
@ -497,3 +513,103 @@ pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use futures::future::ok;
|
||||
use mockall::predicate::eq;
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use crate::index::error::Result as IndexResult;
|
||||
use crate::index::test::Mocker;
|
||||
use crate::index::Index;
|
||||
use crate::index_controller::dump_actor::MockDumpActorHandle;
|
||||
use crate::index_controller::index_resolver::index_store::MockIndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
|
||||
|
||||
use super::updates::UpdateSender;
|
||||
use super::*;
|
||||
|
||||
impl<D: DumpActorHandle> IndexController<MockUuidStore, MockIndexStore, D> {
|
||||
pub fn mock(
|
||||
index_resolver: IndexResolver<MockUuidStore, MockIndexStore>,
|
||||
update_sender: UpdateSender,
|
||||
dump_handle: D,
|
||||
) -> Self {
|
||||
IndexController {
|
||||
index_resolver: Arc::new(index_resolver),
|
||||
update_sender,
|
||||
dump_handle: Arc::new(dump_handle),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn test_search_simple() {
|
||||
let index_uid = "test";
|
||||
let index_uuid = Uuid::new_v4();
|
||||
let query = SearchQuery {
|
||||
q: Some(String::from("hello world")),
|
||||
offset: Some(10),
|
||||
limit: 0,
|
||||
attributes_to_retrieve: Some(vec!["string".to_owned()].into_iter().collect()),
|
||||
attributes_to_crop: None,
|
||||
crop_length: 18,
|
||||
attributes_to_highlight: None,
|
||||
matches: true,
|
||||
filter: None,
|
||||
sort: None,
|
||||
facets_distribution: None,
|
||||
};
|
||||
|
||||
let result = SearchResult {
|
||||
hits: vec![],
|
||||
nb_hits: 29,
|
||||
exhaustive_nb_hits: true,
|
||||
query: "hello world".to_string(),
|
||||
limit: 24,
|
||||
offset: 0,
|
||||
processing_time_ms: 50,
|
||||
facets_distribution: None,
|
||||
exhaustive_facets_count: Some(true),
|
||||
};
|
||||
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
uuid_store
|
||||
.expect_get_uuid()
|
||||
.with(eq(index_uid.to_owned()))
|
||||
.returning(move |s| Box::pin(ok((s, Some(index_uuid)))));
|
||||
|
||||
let mut index_store = MockIndexStore::new();
|
||||
let result_clone = result.clone();
|
||||
let query_clone = query.clone();
|
||||
index_store
|
||||
.expect_get()
|
||||
.with(eq(index_uuid))
|
||||
.returning(move |_uuid| {
|
||||
let result = result_clone.clone();
|
||||
let query = query_clone.clone();
|
||||
let mocker = Mocker::default();
|
||||
mocker
|
||||
.when::<SearchQuery, IndexResult<SearchResult>>("perform_search")
|
||||
.once()
|
||||
.then(move |q| {
|
||||
assert_eq!(&q, &query);
|
||||
Ok(result.clone())
|
||||
});
|
||||
let index = Index::faux(mocker);
|
||||
Box::pin(ok(Some(index)))
|
||||
});
|
||||
|
||||
let index_resolver = IndexResolver::new(uuid_store, index_store);
|
||||
let (update_sender, _) = mpsc::channel(1);
|
||||
let dump_actor = MockDumpActorHandle::new();
|
||||
let index_controller = IndexController::mock(index_resolver, update_sender, dump_actor);
|
||||
|
||||
let r = index_controller
|
||||
.search(index_uid.to_owned(), query.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(r, result);
|
||||
}
|
||||
}
|
||||
|
@ -11,20 +11,26 @@ use tokio::time::sleep;
|
||||
use crate::compression::from_tar_gz;
|
||||
use crate::index_controller::updates::UpdateMsg;
|
||||
|
||||
use super::index_resolver::HardStateIndexResolver;
|
||||
use super::index_resolver::index_store::IndexStore;
|
||||
use super::index_resolver::uuid_store::UuidStore;
|
||||
use super::index_resolver::IndexResolver;
|
||||
use super::updates::UpdateSender;
|
||||
|
||||
pub struct SnapshotService {
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
pub struct SnapshotService<U, I> {
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update_sender: UpdateSender,
|
||||
snapshot_period: Duration,
|
||||
snapshot_path: PathBuf,
|
||||
db_name: String,
|
||||
}
|
||||
|
||||
impl SnapshotService {
|
||||
impl<U, I> SnapshotService<U, I>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
pub fn new(
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
update_sender: UpdateSender,
|
||||
snapshot_period: Duration,
|
||||
snapshot_path: PathBuf,
|
||||
@ -125,133 +131,169 @@ pub fn load_snapshot(
|
||||
}
|
||||
}
|
||||
|
||||
//#[cfg(test)]
|
||||
//mod test {
|
||||
//use std::iter::FromIterator;
|
||||
//use std::{collections::HashSet, sync::Arc};
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::{collections::HashSet, sync::Arc};
|
||||
|
||||
//use futures::future::{err, ok};
|
||||
//use rand::Rng;
|
||||
//use tokio::time::timeout;
|
||||
//use uuid::Uuid;
|
||||
use futures::future::{err, ok};
|
||||
use once_cell::sync::Lazy;
|
||||
use rand::Rng;
|
||||
use uuid::Uuid;
|
||||
|
||||
//use super::*;
|
||||
use crate::index::error::IndexError;
|
||||
use crate::index::test::Mocker;
|
||||
use crate::index::{error::Result as IndexResult, Index};
|
||||
use crate::index_controller::index_resolver::error::IndexResolverError;
|
||||
use crate::index_controller::index_resolver::index_store::MockIndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
|
||||
use crate::index_controller::index_resolver::IndexResolver;
|
||||
use crate::index_controller::updates::create_update_handler;
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_normal() {
|
||||
//let mut rng = rand::thread_rng();
|
||||
//let uuids_num: usize = rng.gen_range(5..10);
|
||||
//let uuids = (0..uuids_num)
|
||||
//.map(|_| Uuid::new_v4())
|
||||
//.collect::<HashSet<_>>();
|
||||
use super::*;
|
||||
|
||||
//let mut uuid_resolver = MockUuidResolverHandle::new();
|
||||
//let uuids_clone = uuids.clone();
|
||||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
//.times(1)
|
||||
//.returning(move |_| Box::pin(ok(uuids_clone.clone())));
|
||||
fn setup() {
|
||||
static SETUP: Lazy<()> = Lazy::new(|| {
|
||||
if cfg!(windows) {
|
||||
std::env::set_var("TMP", ".");
|
||||
} else {
|
||||
std::env::set_var("TMPDIR", ".");
|
||||
}
|
||||
});
|
||||
|
||||
//let uuids_clone = uuids.clone();
|
||||
//let mut index_handle = MockIndexActorHandle::new();
|
||||
//index_handle
|
||||
//.expect_snapshot()
|
||||
//.withf(move |uuid, _path| uuids_clone.contains(uuid))
|
||||
//.times(uuids_num)
|
||||
//.returning(move |_, _| Box::pin(ok(())));
|
||||
// just deref to make sure the env is setup
|
||||
*SETUP
|
||||
}
|
||||
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let handle = Arc::new(index_handle);
|
||||
//let update_handle =
|
||||
//UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
|
||||
#[actix_rt::test]
|
||||
async fn test_normal() {
|
||||
setup();
|
||||
|
||||
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
|
||||
//let snapshot_service = SnapshotService::new(
|
||||
//uuid_resolver,
|
||||
//update_handle,
|
||||
//Duration::from_millis(100),
|
||||
//snapshot_path.path().to_owned(),
|
||||
//"data.ms".to_string(),
|
||||
//);
|
||||
let mut rng = rand::thread_rng();
|
||||
let uuids_num: usize = rng.gen_range(5..10);
|
||||
let uuids = (0..uuids_num)
|
||||
.map(|_| Uuid::new_v4())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
//snapshot_service.perform_snapshot().await.unwrap();
|
||||
//}
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
let uuids_clone = uuids.clone();
|
||||
uuid_store
|
||||
.expect_snapshot()
|
||||
.times(1)
|
||||
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn error_performing_uuid_snapshot() {
|
||||
//let mut uuid_resolver = MockUuidResolverHandle::new();
|
||||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
//.times(1)
|
||||
////abitrary error
|
||||
//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
|
||||
let mut indexes = uuids.clone().into_iter().map(|uuid| {
|
||||
let mocker = Mocker::default();
|
||||
mocker
|
||||
.when("snapshot")
|
||||
.times(1)
|
||||
.then(|_: &Path| -> IndexResult<()> { Ok(()) });
|
||||
mocker.when("uuid").then(move |_: ()| uuid);
|
||||
Index::faux(mocker)
|
||||
});
|
||||
|
||||
//let update_handle = MockUpdateActorHandle::new();
|
||||
let uuids_clone = uuids.clone();
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store
|
||||
.expect_get()
|
||||
.withf(move |uuid| uuids_clone.contains(uuid))
|
||||
.times(uuids_num)
|
||||
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
|
||||
|
||||
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
|
||||
//let snapshot_service = SnapshotService::new(
|
||||
//uuid_resolver,
|
||||
//update_handle,
|
||||
//Duration::from_millis(100),
|
||||
//snapshot_path.path().to_owned(),
|
||||
//"data.ms".to_string(),
|
||||
//);
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
//assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
////Nothing was written to the file
|
||||
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
|
||||
//}
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn error_performing_index_snapshot() {
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let mut uuid_resolver = MockUuidResolverHandle::new();
|
||||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
//.times(1)
|
||||
//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid)))));
|
||||
let snapshot_path = tempfile::tempdir().unwrap();
|
||||
let snapshot_service = SnapshotService::new(
|
||||
index_resolver,
|
||||
update_sender,
|
||||
Duration::from_millis(100),
|
||||
snapshot_path.path().to_owned(),
|
||||
"data.ms".to_string(),
|
||||
);
|
||||
|
||||
//let mut update_handle = MockUpdateActorHandle::new();
|
||||
//update_handle
|
||||
//.expect_snapshot()
|
||||
////abitrary error
|
||||
//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
|
||||
snapshot_service.perform_snapshot().await.unwrap();
|
||||
}
|
||||
|
||||
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
|
||||
//let snapshot_service = SnapshotService::new(
|
||||
//uuid_resolver,
|
||||
//update_handle,
|
||||
//Duration::from_millis(100),
|
||||
//snapshot_path.path().to_owned(),
|
||||
//"data.ms".to_string(),
|
||||
//);
|
||||
#[actix_rt::test]
|
||||
async fn error_performing_uuid_snapshot() {
|
||||
setup();
|
||||
|
||||
//assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
////Nothing was written to the file
|
||||
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
|
||||
//}
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
uuid_store
|
||||
.expect_snapshot()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(err(IndexResolverError::IndexAlreadyExists)));
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_loop() {
|
||||
//let mut uuid_resolver = MockUuidResolverHandle::new();
|
||||
//uuid_resolver
|
||||
//.expect_snapshot()
|
||||
////we expect the funtion to be called between 2 and 3 time in the given interval.
|
||||
//.times(2..4)
|
||||
////abitrary error, to short-circuit the function
|
||||
//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store.expect_get().never();
|
||||
|
||||
//let update_handle = MockUpdateActorHandle::new();
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
|
||||
//let snapshot_service = SnapshotService::new(
|
||||
//uuid_resolver,
|
||||
//update_handle,
|
||||
//Duration::from_millis(100),
|
||||
//snapshot_path.path().to_owned(),
|
||||
//"data.ms".to_string(),
|
||||
//);
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
|
||||
|
||||
//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
|
||||
//}
|
||||
//}
|
||||
let snapshot_path = tempfile::tempdir().unwrap();
|
||||
let snapshot_service = SnapshotService::new(
|
||||
index_resolver,
|
||||
update_sender,
|
||||
Duration::from_millis(100),
|
||||
snapshot_path.path().to_owned(),
|
||||
"data.ms".to_string(),
|
||||
);
|
||||
|
||||
assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
async fn error_performing_index_snapshot() {
|
||||
setup();
|
||||
|
||||
let uuids: HashSet<Uuid> = vec![Uuid::new_v4()].into_iter().collect();
|
||||
|
||||
let mut uuid_store = MockUuidStore::new();
|
||||
let uuids_clone = uuids.clone();
|
||||
uuid_store
|
||||
.expect_snapshot()
|
||||
.once()
|
||||
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
|
||||
|
||||
let mut indexes = uuids.clone().into_iter().map(|uuid| {
|
||||
let mocker = Mocker::default();
|
||||
// index returns random error
|
||||
mocker
|
||||
.when("snapshot")
|
||||
.then(|_: &Path| -> IndexResult<()> { Err(IndexError::ExistingPrimaryKey) });
|
||||
mocker.when("uuid").then(move |_: ()| uuid);
|
||||
Index::faux(mocker)
|
||||
});
|
||||
|
||||
let uuids_clone = uuids.clone();
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store
|
||||
.expect_get()
|
||||
.withf(move |uuid| uuids_clone.contains(uuid))
|
||||
.once()
|
||||
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
|
||||
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let update_sender =
|
||||
create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
|
||||
|
||||
let snapshot_path = tempfile::tempdir().unwrap();
|
||||
let snapshot_service = SnapshotService::new(
|
||||
index_resolver,
|
||||
update_sender,
|
||||
Duration::from_millis(100),
|
||||
snapshot_path.path().to_owned(),
|
||||
"data.ms".to_string(),
|
||||
);
|
||||
|
||||
assert!(snapshot_service.perform_snapshot().await.is_err());
|
||||
}
|
||||
}
|
||||
|
@ -5,6 +5,7 @@ use meilisearch_error::{Code, ErrorCode};
|
||||
|
||||
use crate::{
|
||||
document_formats::DocumentFormatError,
|
||||
index::error::IndexError,
|
||||
index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat},
|
||||
};
|
||||
|
||||
@ -28,6 +29,8 @@ pub enum UpdateLoopError {
|
||||
PayloadError(#[from] actix_web::error::PayloadError),
|
||||
#[error("A {0} payload is missing.")]
|
||||
MissingPayload(DocumentAdditionFormat),
|
||||
#[error("{0}")]
|
||||
IndexError(#[from] IndexError),
|
||||
}
|
||||
|
||||
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
|
||||
@ -58,7 +61,6 @@ impl ErrorCode for UpdateLoopError {
|
||||
match self {
|
||||
Self::UnexistingUpdate(_) => Code::NotFound,
|
||||
Self::Internal(_) => Code::Internal,
|
||||
//Self::IndexActor(e) => e.error_code(),
|
||||
Self::FatalUpdateStoreError => Code::Internal,
|
||||
Self::DocumentFormatError(error) => error.error_code(),
|
||||
Self::PayloadError(error) => match error {
|
||||
@ -66,6 +68,7 @@ impl ErrorCode for UpdateLoopError {
|
||||
_ => Code::Internal,
|
||||
},
|
||||
Self::MissingPayload(_) => Code::MissingPayload,
|
||||
Self::IndexError(e) => e.error_code(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -26,16 +26,22 @@ use crate::index::{Index, Settings, Unchecked};
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use status::UpdateStatus;
|
||||
|
||||
use super::index_resolver::HardStateIndexResolver;
|
||||
use super::index_resolver::index_store::IndexStore;
|
||||
use super::index_resolver::uuid_store::UuidStore;
|
||||
use super::index_resolver::IndexResolver;
|
||||
use super::{DocumentAdditionFormat, Update};
|
||||
|
||||
pub type UpdateSender = mpsc::Sender<UpdateMsg>;
|
||||
|
||||
pub fn create_update_handler(
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
pub fn create_update_handler<U, I>(
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
db_path: impl AsRef<Path>,
|
||||
update_store_size: usize,
|
||||
) -> anyhow::Result<UpdateSender> {
|
||||
) -> anyhow::Result<UpdateSender>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
let path = db_path.as_ref().to_owned();
|
||||
let (sender, receiver) = mpsc::channel(100);
|
||||
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
|
||||
@ -95,12 +101,16 @@ pub struct UpdateLoop {
|
||||
}
|
||||
|
||||
impl UpdateLoop {
|
||||
pub fn new(
|
||||
pub fn new<U, I>(
|
||||
update_db_size: usize,
|
||||
inbox: mpsc::Receiver<UpdateMsg>,
|
||||
path: impl AsRef<Path>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
) -> anyhow::Result<Self> {
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
) -> anyhow::Result<Self>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
let path = path.as_ref().to_owned();
|
||||
std::fs::create_dir_all(&path)?;
|
||||
|
||||
|
@ -34,7 +34,7 @@ impl UpdateStore {
|
||||
// txn must *always* be acquired after state lock, or it will dead lock.
|
||||
let txn = self.env.write_txn()?;
|
||||
|
||||
let uuids = indexes.iter().map(|i| i.uuid).collect();
|
||||
let uuids = indexes.iter().map(|i| i.uuid()).collect();
|
||||
|
||||
self.dump_updates(&txn, &uuids, &path)?;
|
||||
|
||||
|
@ -29,6 +29,8 @@ use codec::*;
|
||||
use super::error::Result;
|
||||
use super::status::{Enqueued, Processing};
|
||||
use crate::index::Index;
|
||||
use crate::index_controller::index_resolver::index_store::IndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::UuidStore;
|
||||
use crate::index_controller::updates::*;
|
||||
use crate::EnvSizer;
|
||||
|
||||
@ -157,13 +159,17 @@ impl UpdateStore {
|
||||
))
|
||||
}
|
||||
|
||||
pub fn open(
|
||||
pub fn open<U, I>(
|
||||
options: EnvOpenOptions,
|
||||
path: impl AsRef<Path>,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
must_exit: Arc<AtomicBool>,
|
||||
update_file_store: UpdateFileStore,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
) -> anyhow::Result<Arc<Self>>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
let (update_store, mut notification_receiver) =
|
||||
Self::new(options, path, update_file_store)?;
|
||||
let update_store = Arc::new(update_store);
|
||||
@ -296,10 +302,14 @@ impl UpdateStore {
|
||||
/// Executes the user provided function on the next pending update (the one with the lowest id).
|
||||
/// This is asynchronous as it let the user process the update with a read-only txn and
|
||||
/// only writing the result meta to the processed-meta store *after* it has been processed.
|
||||
fn process_pending_update(
|
||||
fn process_pending_update<U, I>(
|
||||
&self,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
) -> Result<Option<()>> {
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
) -> Result<Option<()>>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
// Create a read transaction to be able to retrieve the pending update in order.
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let first_meta = self.pending_queue.first(&rtxn)?;
|
||||
@ -325,13 +335,17 @@ impl UpdateStore {
|
||||
}
|
||||
}
|
||||
|
||||
fn perform_update(
|
||||
fn perform_update<U, I>(
|
||||
&self,
|
||||
processing: Processing,
|
||||
index_resolver: Arc<HardStateIndexResolver>,
|
||||
index_resolver: Arc<IndexResolver<U, I>>,
|
||||
index_uuid: Uuid,
|
||||
global_id: u64,
|
||||
) -> Result<Option<()>> {
|
||||
) -> Result<Option<()>>
|
||||
where
|
||||
U: UuidStore + Sync + Send + 'static,
|
||||
I: IndexStore + Sync + Send + 'static,
|
||||
{
|
||||
// Process the pending update using the provided user function.
|
||||
let handle = Handle::current();
|
||||
let update_id = processing.id();
|
||||
@ -509,7 +523,7 @@ impl UpdateStore {
|
||||
|
||||
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
|
||||
|
||||
let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect();
|
||||
let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid()).collect();
|
||||
for entry in pendings {
|
||||
let ((_, uuid, _), pending) = entry?;
|
||||
if uuids.contains(&uuid) {
|
||||
@ -518,9 +532,7 @@ impl UpdateStore {
|
||||
..
|
||||
} = pending.decode()?
|
||||
{
|
||||
self.update_file_store
|
||||
.snapshot(content_uuid, &path)
|
||||
.unwrap();
|
||||
self.update_file_store.snapshot(content_uuid, &path)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -528,8 +540,7 @@ impl UpdateStore {
|
||||
let path = path.as_ref().to_owned();
|
||||
indexes
|
||||
.par_iter()
|
||||
.try_for_each(|index| index.snapshot(path.clone()))
|
||||
.unwrap();
|
||||
.try_for_each(|index| index.snapshot(&path))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@ -557,149 +568,217 @@ impl UpdateStore {
|
||||
}
|
||||
}
|
||||
|
||||
//#[cfg(test)]
|
||||
//mod test {
|
||||
//use super::*;
|
||||
//use crate::index_controller::{
|
||||
//index_actor::{error::IndexActorError, MockIndexActorHandle},
|
||||
//UpdateResult,
|
||||
//};
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use futures::future::ok;
|
||||
use mockall::predicate::eq;
|
||||
|
||||
//use futures::future::ok;
|
||||
use crate::index::error::IndexError;
|
||||
use crate::index::test::Mocker;
|
||||
use crate::index_controller::index_resolver::index_store::MockIndexStore;
|
||||
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
|
||||
use crate::index_controller::updates::status::{Failed, Processed};
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_next_id() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//let handle = Arc::new(MockIndexActorHandle::new());
|
||||
//options.map_size(4096 * 100);
|
||||
//let update_store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle,
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
use super::*;
|
||||
|
||||
//let index1_uuid = Uuid::new_v4();
|
||||
//let index2_uuid = Uuid::new_v4();
|
||||
#[actix_rt::test]
|
||||
async fn test_next_id() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let index_store = MockIndexStore::new();
|
||||
let uuid_store = MockUuidStore::new();
|
||||
let index_resolver = IndexResolver::new(uuid_store, index_store);
|
||||
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
|
||||
options.map_size(4096 * 100);
|
||||
let update_store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
Arc::new(index_resolver),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
update_file_store,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((0, 0), ids);
|
||||
let index1_uuid = Uuid::new_v4();
|
||||
let index2_uuid = Uuid::new_v4();
|
||||
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((1, 0), ids);
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((0, 0), ids);
|
||||
|
||||
//let mut txn = update_store.env.write_txn().unwrap();
|
||||
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
//txn.commit().unwrap();
|
||||
//assert_eq!((2, 1), ids);
|
||||
//}
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((1, 0), ids);
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_register_update() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//let handle = Arc::new(MockIndexActorHandle::new());
|
||||
//options.map_size(4096 * 100);
|
||||
//let update_store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle,
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
//let meta = UpdateMeta::ClearDocuments;
|
||||
//let uuid = Uuid::new_v4();
|
||||
//let store_clone = update_store.clone();
|
||||
//tokio::task::spawn_blocking(move || {
|
||||
//store_clone.register_update(meta, None, uuid).unwrap();
|
||||
//})
|
||||
//.await
|
||||
//.unwrap();
|
||||
let mut txn = update_store.env.write_txn().unwrap();
|
||||
let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
|
||||
txn.commit().unwrap();
|
||||
assert_eq!((2, 1), ids);
|
||||
}
|
||||
|
||||
//let txn = update_store.env.read_txn().unwrap();
|
||||
//assert!(update_store
|
||||
//.pending_queue
|
||||
//.get(&txn, &(0, uuid, 0))
|
||||
//.unwrap()
|
||||
//.is_some());
|
||||
//}
|
||||
#[actix_rt::test]
|
||||
async fn test_register_update() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let index_store = MockIndexStore::new();
|
||||
let uuid_store = MockUuidStore::new();
|
||||
let index_resolver = IndexResolver::new(uuid_store, index_store);
|
||||
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(4096 * 100);
|
||||
let update_store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
Arc::new(index_resolver),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
update_file_store,
|
||||
)
|
||||
.unwrap();
|
||||
let update = Update::ClearDocuments;
|
||||
let uuid = Uuid::new_v4();
|
||||
let store_clone = update_store.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
store_clone.register_update(uuid, update).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
//#[actix_rt::test]
|
||||
//async fn test_process_update() {
|
||||
//let dir = tempfile::tempdir_in(".").unwrap();
|
||||
//let mut handle = MockIndexActorHandle::new();
|
||||
let txn = update_store.env.read_txn().unwrap();
|
||||
assert!(update_store
|
||||
.pending_queue
|
||||
.get(&txn, &(0, uuid, 0))
|
||||
.unwrap()
|
||||
.is_some());
|
||||
}
|
||||
|
||||
//handle
|
||||
//.expect_update()
|
||||
//.times(2)
|
||||
//.returning(|_index_uuid, processing, _file| {
|
||||
//if processing.id() == 0 {
|
||||
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
|
||||
//} else {
|
||||
//Box::pin(ok(Err(
|
||||
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
|
||||
//)))
|
||||
//}
|
||||
//});
|
||||
#[actix_rt::test]
|
||||
async fn test_process_update_success() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let index_uuid = Uuid::new_v4();
|
||||
|
||||
//let handle = Arc::new(handle);
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store
|
||||
.expect_get()
|
||||
.with(eq(index_uuid))
|
||||
.returning(|_uuid| {
|
||||
let mocker = Mocker::default();
|
||||
mocker
|
||||
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
|
||||
.once()
|
||||
.then(|update| Ok(update.process(status::UpdateResult::Other)));
|
||||
|
||||
//let mut options = EnvOpenOptions::new();
|
||||
//options.map_size(4096 * 100);
|
||||
//let store = UpdateStore::open(
|
||||
//options,
|
||||
//dir.path(),
|
||||
//handle.clone(),
|
||||
//Arc::new(AtomicBool::new(false)),
|
||||
//)
|
||||
//.unwrap();
|
||||
Box::pin(ok(Some(Index::faux(mocker))))
|
||||
});
|
||||
|
||||
//// wait a bit for the event loop exit.
|
||||
//tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
let uuid_store = MockUuidStore::new();
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
//let mut txn = store.env.write_txn().unwrap();
|
||||
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(4096 * 100);
|
||||
let store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
index_resolver.clone(),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
update_file_store,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
|
||||
//let uuid = Uuid::new_v4();
|
||||
// wait a bit for the event loop exit.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
//store
|
||||
//.pending_queue
|
||||
//.put(&mut txn, &(0, uuid, 0), &update)
|
||||
//.unwrap();
|
||||
let mut txn = store.env.write_txn().unwrap();
|
||||
|
||||
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);
|
||||
let update = Enqueued::new(Update::ClearDocuments, 0);
|
||||
|
||||
//store
|
||||
//.pending_queue
|
||||
//.put(&mut txn, &(1, uuid, 1), &update)
|
||||
//.unwrap();
|
||||
store
|
||||
.pending_queue
|
||||
.put(&mut txn, &(0, index_uuid, 0), &update)
|
||||
.unwrap();
|
||||
|
||||
//txn.commit().unwrap();
|
||||
txn.commit().unwrap();
|
||||
|
||||
//// Process the pending, and check that it has been moved to the update databases, and
|
||||
//// removed from the pending database.
|
||||
//let store_clone = store.clone();
|
||||
//tokio::task::spawn_blocking(move || {
|
||||
//store_clone.process_pending_update(handle.clone()).unwrap();
|
||||
//store_clone.process_pending_update(handle).unwrap();
|
||||
//})
|
||||
//.await
|
||||
//.unwrap();
|
||||
// Process the pending, and check that it has been moved to the update databases, and
|
||||
// removed from the pending database.
|
||||
let store_clone = store.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
store_clone.process_pending_update(index_resolver).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
//let txn = store.env.read_txn().unwrap();
|
||||
let txn = store.env.read_txn().unwrap();
|
||||
|
||||
//assert!(store.pending_queue.first(&txn).unwrap().is_none());
|
||||
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
|
||||
assert!(store.pending_queue.first(&txn).unwrap().is_none());
|
||||
let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
|
||||
|
||||
//assert!(matches!(update, UpdateStatus::Processed(_)));
|
||||
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
|
||||
assert!(matches!(update, UpdateStatus::Processed(_)));
|
||||
}
|
||||
|
||||
//assert!(matches!(update, UpdateStatus::Failed(_)));
|
||||
//}
|
||||
//}
|
||||
#[actix_rt::test]
|
||||
async fn test_process_update_failure() {
|
||||
let dir = tempfile::tempdir_in(".").unwrap();
|
||||
let index_uuid = Uuid::new_v4();
|
||||
|
||||
let mut index_store = MockIndexStore::new();
|
||||
index_store
|
||||
.expect_get()
|
||||
.with(eq(index_uuid))
|
||||
.returning(|_uuid| {
|
||||
let mocker = Mocker::default();
|
||||
mocker
|
||||
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
|
||||
.once()
|
||||
.then(|update| Err(update.fail(IndexError::ExistingPrimaryKey)));
|
||||
|
||||
Box::pin(ok(Some(Index::faux(mocker))))
|
||||
});
|
||||
|
||||
let uuid_store = MockUuidStore::new();
|
||||
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
|
||||
|
||||
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(4096 * 100);
|
||||
let store = UpdateStore::open(
|
||||
options,
|
||||
dir.path(),
|
||||
index_resolver.clone(),
|
||||
Arc::new(AtomicBool::new(false)),
|
||||
update_file_store,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
// wait a bit for the event loop exit.
|
||||
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
|
||||
|
||||
let mut txn = store.env.write_txn().unwrap();
|
||||
|
||||
let update = Enqueued::new(Update::ClearDocuments, 0);
|
||||
|
||||
store
|
||||
.pending_queue
|
||||
.put(&mut txn, &(0, index_uuid, 0), &update)
|
||||
.unwrap();
|
||||
|
||||
txn.commit().unwrap();
|
||||
|
||||
// Process the pending, and check that it has been moved to the update databases, and
|
||||
// removed from the pending database.
|
||||
let store_clone = store.clone();
|
||||
tokio::task::spawn_blocking(move || {
|
||||
store_clone.process_pending_update(index_resolver).unwrap();
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let txn = store.env.read_txn().unwrap();
|
||||
|
||||
assert!(store.pending_queue.first(&txn).unwrap().is_none());
|
||||
let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
|
||||
|
||||
assert!(matches!(update, UpdateStatus::Failed(_)));
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,8 @@ pub mod options;
|
||||
pub mod index;
|
||||
pub mod index_controller;
|
||||
|
||||
pub use index_controller::{updates::store::Update, IndexController as MeiliSearch};
|
||||
pub use index_controller::updates::store::Update;
|
||||
pub use index_controller::MeiliSearch;
|
||||
|
||||
pub use milli;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user