Merge branch 'main' into feature-docker-as-non-root

Clémentine Urquizar, 2021-10-14 14:45:28 +02:00 (committed by GitHub)
commit 44149bec60
31 changed files with 1655 additions and 608 deletions

View File

@@ -14,9 +14,9 @@ It's [Hacktoberfest month](https://blog.meilisearch.com/contribute-hacktoberfest
 🚀 If your PR gets accepted it will count into your participation to Hacktoberfest!
-✅ To be accepted it has either to have been merged, approved or tagged with the `hacktoberest-accepted` label.
+✅ To be accepted it has either to have been merged, approved or tagged with the `hacktoberfest-accepted` label.
-🧐 Don't forget to check the [quality standards](https://hacktoberfest.digitalocean.com/resources/qualitystandards), otherwise your PR could be marked as `spam` or `invalid`, and it will not be counted toward your participation in Hacktoberfest.
+🧐 Don't forget to check the [quality standards](https://hacktoberfest.digitalocean.com/resources/qualitystandards)! Low-quality PRs might get marked as `spam` or `invalid`, and will not count toward your participation in Hacktoberfest.
 
 ## Assumptions

Cargo.lock generated (108 lines changed)
View File

@@ -825,6 +825,12 @@ version = "1.3.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede"
 
+[[package]]
+name = "difference"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198"
+
 [[package]]
 name = "digest"
 version = "0.8.1"
@@ -849,6 +855,12 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
 
+[[package]]
+name = "downcast"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d"
+
 [[package]]
 name = "either"
 version = "1.6.1"
@@ -933,6 +945,15 @@ dependencies = [
  "miniz_oxide",
 ]
 
+[[package]]
+name = "float-cmp"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -949,6 +970,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2"
+
 [[package]]
 name = "fs_extra"
 version = "1.2.0"
@@ -1581,7 +1608,7 @@ checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
 [[package]]
 name = "meilisearch-error"
-version = "0.23.0"
+version = "0.23.1"
 dependencies = [
  "actix-http",
  "serde",
@@ -1589,7 +1616,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-http"
-version = "0.23.0"
+version = "0.23.1"
 dependencies = [
  "actix-cors",
  "actix-rt",
@@ -1619,7 +1646,6 @@ dependencies = [
  "meilisearch-error",
  "meilisearch-lib",
  "meilisearch-tokenizer",
- "memmap",
  "mime",
  "num_cpus",
  "obkv",
@@ -1657,7 +1683,7 @@ dependencies = [
 [[package]]
 name = "meilisearch-lib"
-version = "0.23.0"
+version = "0.23.1"
 dependencies = [
  "actix-rt",
  "actix-web",
@@ -1685,9 +1711,9 @@ dependencies = [
  "log",
  "meilisearch-error",
  "meilisearch-tokenizer",
- "memmap",
  "milli",
  "mime",
+ "mockall",
  "num_cpus",
  "obkv",
  "once_cell",
@@ -1757,8 +1783,8 @@ dependencies = [
 [[package]]
 name = "milli"
-version = "0.17.0"
-source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.0#22551d0941bee1a9cdcf7d5bfc4ca46517dd25f3"
+version = "0.17.2"
+source = "git+https://github.com/meilisearch/milli.git?tag=v0.17.3#1e8acaa20b323a198229ad8ede96d045072e45c8"
 dependencies = [
  "bimap",
  "bincode",
@@ -1847,6 +1873,39 @@ dependencies = [
  "winapi",
 ]
 
+[[package]]
+name = "mockall"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0"
+dependencies = [
+ "cfg-if 1.0.0",
+ "downcast",
+ "fragile",
+ "lazy_static",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033"
+dependencies = [
+ "cfg-if 1.0.0",
+ "proc-macro2 1.0.29",
+ "quote 1.0.9",
+ "syn 1.0.77",
+]
+
+[[package]]
+name = "normalize-line-endings"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be"
+
 [[package]]
 name = "ntapi"
 version = "0.3.6"
@@ -2119,6 +2178,35 @@ version = "0.2.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857"
 
+[[package]]
+name = "predicates"
+version = "1.0.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df"
+dependencies = [
+ "difference",
+ "float-cmp",
+ "normalize-line-endings",
+ "predicates-core",
+ "regex",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d"
+dependencies = [
+ "predicates-core",
+ "treeline",
+]
+
 [[package]]
 name = "proc-macro-error"
 version = "1.0.4"
@@ -3044,6 +3132,12 @@ dependencies = [
  "lazy_static",
 ]
 
+[[package]]
+name = "treeline"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41"
+
 [[package]]
 name = "try-lock"
 version = "0.2.3"

View File

@@ -6,8 +6,5 @@ members = [
 ]
 resolver = "2"
 
-[profile.release]
-debug = true
-
 [patch.crates-io]
 pest = { git = "https://github.com/pest-parser/pest.git", rev = "51fd1d49f1041f7839975664ef71fe15c7dcaf67" }

View File

@@ -187,7 +187,7 @@ Search and indexation are the domain of our core engine, [`milli`](https://githu
 MeiliSearch collects anonymous data regarding general usage.
 This helps us better understand developers' usage of MeiliSearch features.
-To see what information we're retrieving, please see the complete list [on the dedicated issue](https://github.com/meilisearch/MeiliSearch/issues/720).
+To find out more on what information we're retrieving, please see our documentation on [Telemetry](https://docs.meilisearch.com/learn/what_is_meilisearch/telemetry.html).
 This program is optional, you can disable these analytics by using the `MEILI_NO_ANALYTICS` env variable.

View File

@@ -1,6 +1,6 @@
 [package]
 name = "meilisearch-error"
-version = "0.23.0"
+version = "0.23.1"
 authors = ["marin <postma.marin@protonmail.com>"]
 edition = "2018"

View File

@@ -4,7 +4,7 @@ description = "MeiliSearch HTTP server"
 edition = "2018"
 license = "MIT"
 name = "meilisearch-http"
-version = "0.23.0"
+version = "0.23.1"
 
 [[bin]]
 name = "meilisearch"
@@ -47,7 +47,6 @@ log = "0.4.14"
 meilisearch-lib = { path = "../meilisearch-lib" }
 meilisearch-error = { path = "../meilisearch-error" }
 meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
-memmap = "0.7.0"
 mime = "0.3.16"
 num_cpus = "1.13.0"
 once_cell = "1.8.0"

View File

@@ -11,21 +11,31 @@ use serde::{Deserialize, Serialize};
 #[derive(Debug, thiserror::Error)]
 pub enum MeilisearchHttpError {
-    #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
-    MissingContentType,
-    #[error("The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: \"application/json\", \"application/x-ndjson\", \"text/csv\"")]
-    InvalidContentType(String),
+    #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
+        .0.iter().map(|s| format!("\"{}\"", s)).collect::<Vec<_>>().join(", "))]
+    MissingContentType(Vec<String>),
+    #[error(
+        "The Content-Type \"{0}\" is invalid. Accepted values for the Content-Type header are: {}",
+        .1.iter().map(|s| format!("\"{}\"", s)).collect::<Vec<_>>().join(", ")
+    )]
+    InvalidContentType(String, Vec<String>),
 }
 
 impl ErrorCode for MeilisearchHttpError {
     fn error_code(&self) -> Code {
         match self {
-            MeilisearchHttpError::MissingContentType => Code::MissingContentType,
-            MeilisearchHttpError::InvalidContentType(_) => Code::InvalidContentType,
+            MeilisearchHttpError::MissingContentType(_) => Code::MissingContentType,
+            MeilisearchHttpError::InvalidContentType(_, _) => Code::InvalidContentType,
         }
     }
 }
 
+impl From<MeilisearchHttpError> for aweb::Error {
+    fn from(other: MeilisearchHttpError) -> Self {
+        aweb::Error::from(ResponseError::from(other))
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize, Clone)]
 #[serde(rename_all = "camelCase")]
 pub struct ResponseError {
@@ -121,9 +131,8 @@ impl From<QueryPayloadError> for PayloadError {
     }
 }
 
-pub fn payload_error_handler<E>(err: E) -> ResponseError
-where
-    E: Into<PayloadError>,
-{
-    err.into().into()
+impl From<PayloadError> for aweb::Error {
+    fn from(other: PayloadError) -> Self {
+        aweb::Error::from(ResponseError::from(other))
+    }
 }
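For readers unfamiliar with the thiserror positional-field syntax used above, here is a minimal standalone sketch of how the accepted-types list is rendered into the error message. The enum name and the assertion are illustrative only; the format expression mirrors the one in the diff:

use thiserror::Error;

#[derive(Debug, Error)]
enum ContentTypeError {
    // `.0` refers to the first tuple field; the extra format argument joins
    // the accepted content types into a quoted, comma-separated list.
    #[error("A Content-Type header is missing. Accepted values for the Content-Type header are: {}",
        .0.iter().map(|s| format!("\"{}\"", s)).collect::<Vec<_>>().join(", "))]
    MissingContentType(Vec<String>),
}

fn main() {
    let err = ContentTypeError::MissingContentType(vec![
        "application/json".to_string(),
        "text/csv".to_string(),
    ]);
    assert_eq!(
        err.to_string(),
        r#"A Content-Type header is missing. Accepted values for the Content-Type header are: "application/json", "text/csv""#
    );
}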

View File

@@ -168,7 +168,8 @@ impl<P: Policy + 'static, D: 'static + Clone> FromRequest for GuardedData<P, D>
                 None => err(AuthenticationError::IrretrievableState.into()),
             }
         } else {
-            err(AuthenticationError::InvalidToken(String::from("hello")).into())
+            let token = token.to_str().unwrap_or("unknown").to_string();
+            err(AuthenticationError::InvalidToken(token).into())
         }
     }
     None => err(AuthenticationError::MissingAuthorizationHeader.into()),
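The change above replaces a hard-coded placeholder with the offending token, falling back to "unknown" because `HeaderValue::to_str` fails on non-ASCII bytes. A minimal sketch of that fallback in isolation (the helper name is hypothetical):

use actix_web::http::header::HeaderValue;

// `to_str` errors on header values containing non-visible-ASCII bytes,
// so fall back to a placeholder instead of panicking.
fn token_for_error(value: &HeaderValue) -> String {
    value.to_str().unwrap_or("unknown").to_string()
}

fn main() {
    let v = HeaderValue::from_static("secret");
    assert_eq!(token_for_error(&v), "secret");
}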

View File

@@ -11,10 +11,14 @@ pub mod routes;
 use std::path::Path;
 use std::time::Duration;
 
+use crate::error::MeilisearchHttpError;
 use crate::extractors::authentication::AuthConfig;
+use actix_web::error::JsonPayloadError;
+use error::PayloadError;
+use http::header::CONTENT_TYPE;
 pub use option::Opt;
 
-use actix_web::web;
+use actix_web::{web, HttpRequest};
 use extractors::authentication::policies::*;
 use extractors::payload::PayloadConfig;
@@ -98,14 +102,25 @@ pub fn configure_data(config: &mut web::ServiceConfig, data: MeiliSearch, opt: &
         .app_data(data)
         .app_data(
             web::JsonConfig::default()
-                .limit(http_payload_size_limit)
-                .content_type(|_mime| true) // Accept all mime types
-                .error_handler(|err, _req| error::payload_error_handler(err).into()),
+                .content_type(|mime| mime == mime::APPLICATION_JSON)
+                .error_handler(|err, req: &HttpRequest| match err {
+                    JsonPayloadError::ContentType => match req.headers().get(CONTENT_TYPE) {
+                        Some(content_type) => MeilisearchHttpError::InvalidContentType(
+                            content_type.to_str().unwrap_or("unknown").to_string(),
+                            vec![mime::APPLICATION_JSON.to_string()],
+                        )
+                        .into(),
+                        None => MeilisearchHttpError::MissingContentType(vec![
+                            mime::APPLICATION_JSON.to_string(),
+                        ])
+                        .into(),
+                    },
+                    err => PayloadError::from(err).into(),
+                }),
         )
         .app_data(PayloadConfig::new(http_payload_size_limit))
         .app_data(
-            web::QueryConfig::default()
-                .error_handler(|err, _req| error::payload_error_handler(err).into()),
+            web::QueryConfig::default().error_handler(|err, _req| PayloadError::from(err).into()),
         );
 }
@@ -147,7 +162,6 @@ pub fn dashboard(config: &mut web::ServiceConfig, enable_frontend: bool) {
     if enable_frontend {
         let generated = generated::generate();
-        let mut scope = web::scope("/");
         // Generate routes for mini-dashboard assets
         for (path, resource) in generated.into_iter() {
             let Resource {
@@ -159,12 +173,11 @@ pub fn dashboard(config: &mut web::ServiceConfig, enable_frontend: bool) {
                 web::get().to(move || HttpResponse::Ok().content_type(mime_type).body(data)),
             ));
         } else {
-            scope = scope.service(web::resource(path).route(
+            config.service(web::resource(path).route(
                 web::get().to(move || HttpResponse::Ok().content_type(mime_type).body(data)),
             ));
         }
     }
-        config.service(scope);
     } else {
         config.service(web::resource("/").route(web::get().to(routes::running)));
     }
@@ -182,6 +195,7 @@ macro_rules! create_app {
         use actix_web::middleware::TrailingSlash;
         use actix_web::App;
         use actix_web::{middleware, web};
+        use meilisearch_http::error::{MeilisearchHttpError, ResponseError};
         use meilisearch_http::routes;
         use meilisearch_http::{configure_auth, configure_data, dashboard};
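The JsonConfig change above is the core of the stricter content-type handling: the extractor now only accepts `application/json`, and mismatches surface as a custom error instead of actix-web's default 400. A minimal sketch of the same actix-web pattern in isolation; the route, struct, and error text are hypothetical, not from this PR:

use actix_web::{error::JsonPayloadError, web, App, HttpRequest, HttpServer};
use serde::Deserialize;

#[derive(Deserialize)]
struct Doc {
    id: u32,
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new()
            .app_data(
                // Only accept `application/json`, and turn content-type
                // mismatches into a 415 instead of the default 400.
                web::JsonConfig::default()
                    .content_type(|mime| mime == mime::APPLICATION_JSON)
                    .error_handler(|err, _req: &HttpRequest| match err {
                        JsonPayloadError::ContentType => {
                            actix_web::error::ErrorUnsupportedMediaType("expected application/json")
                        }
                        err => err.into(),
                    }),
            )
            .route(
                "/doc",
                web::post().to(|doc: web::Json<Doc>| async move { format!("got doc {}", doc.id) }),
            )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}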

View File

@@ -6,6 +6,7 @@ use log::debug;
 use meilisearch_lib::index_controller::{DocumentAdditionFormat, Update};
 use meilisearch_lib::milli::update::IndexDocumentsMethod;
 use meilisearch_lib::MeiliSearch;
+use once_cell::sync::Lazy;
 use serde::Deserialize;
 use serde_json::Value;
 use tokio::sync::mpsc;
@@ -176,14 +177,29 @@ async fn document_addition(
     body: Payload,
     method: IndexDocumentsMethod,
 ) -> Result<HttpResponse, ResponseError> {
+    static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
+        vec![
+            "application/json".to_string(),
+            "application/x-ndjson".to_string(),
+            "text/csv".to_string(),
+        ]
+    });
     let format = match content_type {
         Some("application/json") => DocumentAdditionFormat::Json,
         Some("application/x-ndjson") => DocumentAdditionFormat::Ndjson,
         Some("text/csv") => DocumentAdditionFormat::Csv,
         Some(other) => {
-            return Err(MeilisearchHttpError::InvalidContentType(other.to_string()).into())
+            return Err(MeilisearchHttpError::InvalidContentType(
+                other.to_string(),
+                ACCEPTED_CONTENT_TYPE.clone(),
+            )
+            .into())
         }
-        None => return Err(MeilisearchHttpError::MissingContentType.into()),
+        None => {
+            return Err(
+                MeilisearchHttpError::MissingContentType(ACCEPTED_CONTENT_TYPE.clone()).into(),
+            )
+        }
     };
 
     let update = Update::DocumentAddition {
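A short aside on the `once_cell` pattern introduced above: a `Lazy` static inside a function body is initialised on first access and shared by every subsequent call, which avoids rebuilding the accepted-types vector on each request. A minimal sketch with illustrative names:

use once_cell::sync::Lazy;

fn accepted_content_types() -> &'static [String] {
    // Built once, on first call; later calls reuse the same allocation.
    static ACCEPTED: Lazy<Vec<String>> = Lazy::new(|| {
        vec![
            "application/json".to_string(),
            "application/x-ndjson".to_string(),
            "text/csv".to_string(),
        ]
    });
    ACCEPTED.as_slice()
}

fn main() {
    assert!(accepted_content_types().iter().any(|c| c == "text/csv"));
}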

View File

@@ -0,0 +1,111 @@
#![allow(dead_code)]
mod common;
use crate::common::Server;
use actix_web::test;
use meilisearch_http::create_app;
use serde_json::{json, Value};
#[actix_rt::test]
async fn strict_json_bad_content_type() {
let routes = [
// all the POST routes except the dumps that can be created without any body or content-type
// and the search that is not a strict json
"/indexes",
"/indexes/doggo/documents/delete-batch",
"/indexes/doggo/search",
"/indexes/doggo/settings",
"/indexes/doggo/settings/displayed-attributes",
"/indexes/doggo/settings/distinct-attribute",
"/indexes/doggo/settings/filterable-attributes",
"/indexes/doggo/settings/ranking-rules",
"/indexes/doggo/settings/searchable-attributes",
"/indexes/doggo/settings/sortable-attributes",
"/indexes/doggo/settings/stop-words",
"/indexes/doggo/settings/synonyms",
];
let bad_content_types = [
"application/csv",
"application/x-ndjson",
"application/x-www-form-urlencoded",
"text/plain",
"json",
"application",
"json/application",
];
let document = "{}";
let server = Server::new().await;
let app = test::init_service(create_app!(
&server.service.meilisearch,
true,
&server.service.options
))
.await;
for route in routes {
// Good content-type: we probably get an error since we didn't send anything useful in the json,
// so we only ensure we didn't get a bad media type error.
let req = test::TestRequest::post()
.uri(route)
.set_payload(document)
.insert_header(("content-type", "application/json"))
.to_request();
let res = test::call_service(&app, req).await;
let status_code = res.status();
assert_ne!(status_code, 415,
"calling the route `{}` with a content-type of json isn't supposed to throw a bad media type error", route);
// No content-type.
let req = test::TestRequest::post()
.uri(route)
.set_payload(document)
.to_request();
let res = test::call_service(&app, req).await;
let status_code = res.status();
let body = test::read_body(res).await;
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
assert_eq!(status_code, 415, "calling the route `{}` without content-type is supposed to throw a bad media type error", route);
assert_eq!(
response,
json!({
"message": r#"A Content-Type header is missing. Accepted values for the Content-Type header are: "application/json""#,
"errorCode": "missing_content_type",
"errorType": "invalid_request_error",
"errorLink": "https://docs.meilisearch.com/errors#missing_content_type",
}),
"when calling the route `{}` with no content-type",
route,
);
for bad_content_type in bad_content_types {
// Always bad content-type
let req = test::TestRequest::post()
.uri(route)
.set_payload(document.to_string())
.insert_header(("content-type", bad_content_type))
.to_request();
let res = test::call_service(&app, req).await;
let status_code = res.status();
let body = test::read_body(res).await;
let response: Value = serde_json::from_slice(&body).unwrap_or_default();
assert_eq!(status_code, 415);
let expected_error_message = format!(
r#"The Content-Type "{}" is invalid. Accepted values for the Content-Type header are: "application/json""#,
bad_content_type
);
assert_eq!(
response,
json!({
"message": expected_error_message,
"errorCode": "invalid_content_type",
"errorType": "invalid_request_error",
"errorLink": "https://docs.meilisearch.com/errors#invalid_content_type",
}),
"when calling the route `{}` with a content-type of `{}`",
route,
bad_content_type,
);
}
}
}

View File

@@ -22,6 +22,7 @@ async fn add_documents_test_json_content_types() {
         &server.service.options
     ))
     .await;
 
+    // post
     let req = test::TestRequest::post()
         .uri("/indexes/dog/documents")
         .set_payload(document.to_string())
@@ -33,6 +34,19 @@ async fn add_documents_test_json_content_types() {
     let response: Value = serde_json::from_slice(&body).unwrap_or_default();
     assert_eq!(status_code, 202);
     assert_eq!(response, json!({ "updateId": 0 }));
+
+    // put
+    let req = test::TestRequest::put()
+        .uri("/indexes/dog/documents")
+        .set_payload(document.to_string())
+        .insert_header(("content-type", "application/json"))
+        .to_request();
+    let res = test::call_service(&app, req).await;
+    let status_code = res.status();
+    let body = test::read_body(res).await;
+    let response: Value = serde_json::from_slice(&body).unwrap_or_default();
+    assert_eq!(status_code, 202);
+    assert_eq!(response, json!({ "updateId": 1 }));
 }
 
 /// no content type is still supposed to be accepted as json
@@ -52,6 +66,7 @@ async fn add_documents_test_no_content_types() {
         &server.service.options
     ))
     .await;
 
+    // post
     let req = test::TestRequest::post()
         .uri("/indexes/dog/documents")
         .set_payload(document.to_string())
@@ -63,11 +78,23 @@ async fn add_documents_test_no_content_types() {
     let response: Value = serde_json::from_slice(&body).unwrap_or_default();
     assert_eq!(status_code, 202);
     assert_eq!(response, json!({ "updateId": 0 }));
+
+    // put
+    let req = test::TestRequest::put()
+        .uri("/indexes/dog/documents")
+        .set_payload(document.to_string())
+        .insert_header(("content-type", "application/json"))
+        .to_request();
+    let res = test::call_service(&app, req).await;
+    let status_code = res.status();
+    let body = test::read_body(res).await;
+    let response: Value = serde_json::from_slice(&body).unwrap_or_default();
+    assert_eq!(status_code, 202);
+    assert_eq!(response, json!({ "updateId": 1 }));
 }
 
 /// any other content-type must be refused
 #[actix_rt::test]
-#[ignore]
 async fn add_documents_test_bad_content_types() {
     let document = json!([
         {
@@ -83,6 +110,7 @@ async fn add_documents_test_bad_content_types() {
         &server.service.options
     ))
     .await;
 
+    // post
     let req = test::TestRequest::post()
         .uri("/indexes/dog/documents")
         .set_payload(document.to_string())
@@ -91,8 +119,32 @@ async fn add_documents_test_bad_content_types() {
     let res = test::call_service(&app, req).await;
     let status_code = res.status();
     let body = test::read_body(res).await;
-    assert_eq!(status_code, 405);
-    assert!(body.is_empty());
+    let response: Value = serde_json::from_slice(&body).unwrap_or_default();
+    assert_eq!(status_code, 415);
+    assert_eq!(
+        response["message"],
+        json!(
+            r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "text/csv""#
+        )
+    );
+
+    // put
+    let req = test::TestRequest::put()
+        .uri("/indexes/dog/documents")
+        .set_payload(document.to_string())
+        .insert_header(("content-type", "text/plain"))
+        .to_request();
+    let res = test::call_service(&app, req).await;
+    let status_code = res.status();
+    let body = test::read_body(res).await;
+    let response: Value = serde_json::from_slice(&body).unwrap_or_default();
+    assert_eq!(status_code, 415);
+    assert_eq!(
+        response["message"],
+        json!(
+            r#"The Content-Type "text/plain" is invalid. Accepted values for the Content-Type header are: "application/json", "application/x-ndjson", "text/csv""#
+        )
+    );
 }
 
 #[actix_rt::test]

View File

@@ -1,6 +1,6 @@
 [package]
 name = "meilisearch-lib"
-version = "0.23.0"
+version = "0.23.1"
 edition = "2018"
 
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -30,8 +30,7 @@ lazy_static = "1.4.0"
 log = "0.4.14"
 meilisearch-error = { path = "../meilisearch-error" }
 meilisearch-tokenizer = { git = "https://github.com/meilisearch/tokenizer.git", tag = "v0.2.5" }
-memmap = "0.7.0"
-milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.0" }
+milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.17.3" }
 mime = "0.3.16"
 num_cpus = "1.13.0"
 once_cell = "1.8.0"
@@ -60,4 +59,5 @@ derivative = "2.2.0"
 
 [dev-dependencies]
 actix-rt = "2.2.0"
+mockall = "0.10.2"
 paste = "1.0.5"

View File

@@ -13,7 +13,7 @@ use crate::index::update_handler::UpdateHandler;
 use crate::index::updates::apply_settings_to_builder;
 
 use super::error::Result;
-use super::{Index, Settings, Unchecked};
+use super::{index::Index, Settings, Unchecked};
 
 #[derive(Serialize, Deserialize)]
 struct DumpMeta {

View File

@@ -0,0 +1,286 @@
use std::collections::{BTreeSet, HashSet};
use std::fs::create_dir_all;
use std::marker::PhantomData;
use std::ops::Deref;
use std::path::Path;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::{EnvOpenOptions, RoTxn};
use milli::update::Setting;
use milli::{obkv_to_json, FieldDistribution, FieldId};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use uuid::Uuid;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::EnvSizer;
use super::error::IndexError;
use super::error::Result;
use super::update_handler::UpdateHandler;
use super::{Checked, Settings};
pub type Document = Map<String, Value>;
#[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct IndexMeta {
created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub primary_key: Option<String>,
}
impl IndexMeta {
pub fn new(index: &Index) -> Result<Self> {
let txn = index.read_txn()?;
Self::new_txn(index, &txn)
}
pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
let created_at = index.created_at(txn)?;
let updated_at = index.updated_at(txn)?;
let primary_key = index.primary_key(txn)?.map(String::from);
Ok(Self {
created_at,
updated_at,
primary_key,
})
}
}
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
#[serde(skip)]
pub size: u64,
pub number_of_documents: u64,
/// Whether the current index is performing an update. It is initially `None` when the
/// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
/// later set to either true or false, when we retrieve the information from the `UpdateStore`.
pub is_indexing: Option<bool>,
pub field_distribution: FieldDistribution,
}
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
pub uuid: Uuid,
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
pub update_file_store: Arc<UpdateFileStore>,
#[derivative(Debug = "ignore")]
pub update_handler: Arc<UpdateHandler>,
}
impl Deref for Index {
type Target = milli::Index;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl Index {
pub fn open(
path: impl AsRef<Path>,
size: usize,
update_file_store: Arc<UpdateFileStore>,
uuid: Uuid,
update_handler: Arc<UpdateHandler>,
) -> Result<Self> {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let inner = Arc::new(milli::Index::new(options, &path)?);
Ok(Index {
inner,
update_file_store,
uuid,
update_handler,
})
}
pub fn inner(&self) -> &milli::Index {
&self.inner
}
pub fn stats(&self) -> Result<IndexStats> {
let rtxn = self.read_txn()?;
Ok(IndexStats {
size: self.size(),
number_of_documents: self.number_of_documents(&rtxn)?,
is_indexing: None,
field_distribution: self.field_distribution(&rtxn)?,
})
}
pub fn meta(&self) -> Result<IndexMeta> {
IndexMeta::new(self)
}
pub fn settings(&self) -> Result<Settings<Checked>> {
let txn = self.read_txn()?;
self.settings_txn(&txn)
}
pub fn uuid(&self) -> Uuid {
self.uuid
}
pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
let displayed_attributes = self
.displayed_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let searchable_attributes = self
.searchable_fields(txn)?
.map(|fields| fields.into_iter().map(String::from).collect());
let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
let criteria = self
.criteria(txn)?
.into_iter()
.map(|c| c.to_string())
.collect();
let stop_words = self
.stop_words(txn)?
.map(|stop_words| -> Result<BTreeSet<_>> {
Ok(stop_words.stream().into_strs()?.into_iter().collect())
})
.transpose()?
.unwrap_or_else(BTreeSet::new);
let distinct_field = self.distinct_field(txn)?.map(String::from);
// In milli, each word in the synonyms map was split on its separators. Since we lost
// this information, we are going to put a space between words.
let synonyms = self
.synonyms(txn)?
.iter()
.map(|(key, values)| {
(
key.join(" "),
values.iter().map(|value| value.join(" ")).collect(),
)
})
.collect();
Ok(Settings {
displayed_attributes: match displayed_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
searchable_attributes: match searchable_attributes {
Some(attrs) => Setting::Set(attrs),
None => Setting::Reset,
},
filterable_attributes: Setting::Set(filterable_attributes),
sortable_attributes: Setting::Set(sortable_attributes),
ranking_rules: Setting::Set(criteria),
stop_words: Setting::Set(stop_words),
distinct_attribute: match distinct_field {
Some(field) => Setting::Set(field),
None => Setting::Reset,
},
synonyms: Setting::Set(synonyms),
_kind: PhantomData,
})
}
pub fn retrieve_documents<S: AsRef<str>>(
&self,
offset: usize,
limit: usize,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Vec<Map<String, Value>>> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let fields_to_display =
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
let mut documents = Vec::new();
for entry in iter {
let (_id, obkv) = entry?;
let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
documents.push(object);
}
Ok(documents)
}
pub fn retrieve_document<S: AsRef<str>>(
&self,
doc_id: String,
attributes_to_retrieve: Option<Vec<S>>,
) -> Result<Map<String, Value>> {
let txn = self.read_txn()?;
let fields_ids_map = self.fields_ids_map(&txn)?;
let fields_to_display =
self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
let internal_id = self
.external_documents_ids(&txn)?
.get(doc_id.as_bytes())
.ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
let document = self
.documents(&txn, std::iter::once(internal_id))?
.into_iter()
.next()
.map(|(_, d)| d)
.ok_or(IndexError::DocumentNotFound(doc_id))?;
let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
Ok(document)
}
pub fn size(&self) -> u64 {
self.env.size()
}
fn fields_to_display<S: AsRef<str>>(
&self,
txn: &heed::RoTxn,
attributes_to_retrieve: &Option<Vec<S>>,
fields_ids_map: &milli::FieldsIdsMap,
) -> Result<Vec<FieldId>> {
let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
Some(ids) => ids.into_iter().collect::<Vec<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
let attributes_to_retrieve_ids = match attributes_to_retrieve {
Some(attrs) => attrs
.iter()
.filter_map(|f| fields_ids_map.id(f.as_ref()))
.collect::<HashSet<_>>(),
None => fields_ids_map.iter().map(|(id, _)| id).collect(),
};
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
Ok(displayed_fields_ids)
}
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
create_dir_all(&dst)?;
dst.push("data.mdb");
let _txn = self.write_txn()?;
self.inner
.env
.copy_to_path(dst, heed::CompactionOption::Enabled)?;
Ok(())
}
}

View File

@@ -1,97 +1,203 @@
-use std::collections::{BTreeSet, HashSet};
-use std::fs::create_dir_all;
-use std::marker::PhantomData;
-use std::ops::Deref;
-use std::path::Path;
-use std::sync::Arc;
-
-use chrono::{DateTime, Utc};
-use heed::{EnvOpenOptions, RoTxn};
-use milli::update::Setting;
-use milli::{obkv_to_json, FieldDistribution, FieldId};
-use serde::{Deserialize, Serialize};
-use serde_json::{Map, Value};
-
-use error::Result;
 pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
 pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecked};
-use uuid::Uuid;
-
-use crate::index_controller::update_file_store::UpdateFileStore;
-use crate::EnvSizer;
-
-use self::error::IndexError;
-use self::update_handler::UpdateHandler;
-
-pub mod error;
-pub mod update_handler;
 
 mod dump;
+pub mod error;
 mod search;
+pub mod update_handler;
 mod updates;
 
-pub type Document = Map<String, Value>;
+#[allow(clippy::module_inception)]
+mod index;
+
+pub use index::{Document, IndexMeta, IndexStats};
+
+#[cfg(not(test))]
+pub use index::Index;
+
+#[cfg(test)]
+pub use test::MockIndex as Index;
 
-#[derive(Debug, Serialize, Deserialize, Clone)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexMeta {
-    created_at: DateTime<Utc>,
-    pub updated_at: DateTime<Utc>,
-    pub primary_key: Option<String>,
-}
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexStats {
-    #[serde(skip)]
-    pub size: u64,
-    pub number_of_documents: u64,
-    /// Whether the current index is performing an update. It is initially `None` when the
-    /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is
-    /// later set to either true or false, when we retrieve the information from the `UpdateStore`.
-    pub is_indexing: Option<bool>,
-    pub field_distribution: FieldDistribution,
-}
-
-impl IndexMeta {
-    pub fn new(index: &Index) -> Result<Self> {
-        let txn = index.read_txn()?;
-        Self::new_txn(index, &txn)
-    }
-
-    fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
-        let created_at = index.created_at(txn)?;
-        let updated_at = index.updated_at(txn)?;
-        let primary_key = index.primary_key(txn)?.map(String::from);
-        Ok(Self {
-            created_at,
-            updated_at,
-            primary_key,
-        })
-    }
-}
-
-#[derive(Clone, derivative::Derivative)]
-#[derivative(Debug)]
-pub struct Index {
-    pub uuid: Uuid,
-    #[derivative(Debug = "ignore")]
-    pub inner: Arc<milli::Index>,
-    #[derivative(Debug = "ignore")]
-    update_file_store: Arc<UpdateFileStore>,
-    #[derivative(Debug = "ignore")]
-    update_handler: Arc<UpdateHandler>,
-}
-
-impl Deref for Index {
-    type Target = milli::Index;
-
-    fn deref(&self) -> &Self::Target {
-        self.inner.as_ref()
-    }
-}
-
-impl Index {
+/// The index::test module provides means of mocking an index instance. It can be used throughout
+/// the code for unit testing, in places where an index would normally be used.
+#[cfg(test)]
+pub mod test {
+    use std::any::Any;
+    use std::collections::HashMap;
+    use std::panic::{RefUnwindSafe, UnwindSafe};
+    use std::path::Path;
+    use std::path::PathBuf;
+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::sync::{Arc, Mutex};
+
+    use serde_json::{Map, Value};
+    use uuid::Uuid;
+
+    use crate::index_controller::update_file_store::UpdateFileStore;
+    use crate::index_controller::updates::status::{Failed, Processed, Processing};
+
+    use super::error::Result;
+    use super::index::Index;
+    use super::update_handler::UpdateHandler;
+    use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
+
+    pub struct Stub<A, R> {
+        name: String,
+        times: Mutex<Option<usize>>,
+        stub: Box<dyn Fn(A) -> R + Sync + Send>,
+        invalidated: AtomicBool,
+    }
+
+    impl<A, R> Drop for Stub<A, R> {
+        fn drop(&mut self) {
+            if !self.invalidated.load(Ordering::Relaxed) {
+                let lock = self.times.lock().unwrap();
+                if let Some(n) = *lock {
+                    assert_eq!(n, 0, "{} not called enough times", self.name);
+                }
+            }
+        }
+    }
+
+    impl<A, R> Stub<A, R> {
+        fn invalidate(&self) {
+            self.invalidated.store(true, Ordering::Relaxed);
+        }
+    }
+
+    impl<A: UnwindSafe, R> Stub<A, R> {
+        fn call(&self, args: A) -> R {
+            let mut lock = self.times.lock().unwrap();
+            match *lock {
+                Some(0) => panic!("{} called too many times", self.name),
+                Some(ref mut times) => {
+                    *times -= 1;
+                }
+                None => (),
+            }
+
+            // Since we add assertions in the drop implementation for Stub, a panic can occur in a
+            // panic, causing a hard abort of the program. To handle that, we catch the panic, and
+            // set the stub as invalidated so the assertions aren't run during the drop.
+            struct StubHolder<'a, A, R>(&'a (dyn Fn(A) -> R + Sync + Send));
+            impl<'a, A, R> RefUnwindSafe for StubHolder<'a, A, R> {}
+
+            let stub = StubHolder(self.stub.as_ref());
+
+            match std::panic::catch_unwind(|| (stub.0)(args)) {
+                Ok(r) => r,
+                Err(panic) => {
+                    self.invalidate();
+                    std::panic::resume_unwind(panic);
+                }
+            }
+        }
+    }
+
+    #[derive(Debug, Default)]
+    struct StubStore {
+        inner: Arc<Mutex<HashMap<String, Box<dyn Any + Sync + Send>>>>,
+    }
+
+    impl StubStore {
+        pub fn insert<A: 'static, R: 'static>(&self, name: String, stub: Stub<A, R>) {
+            let mut lock = self.inner.lock().unwrap();
+            lock.insert(name, Box::new(stub));
+        }
+
+        pub fn get<A, B>(&self, name: &str) -> Option<&Stub<A, B>> {
+            let mut lock = self.inner.lock().unwrap();
+            match lock.get_mut(name) {
+                Some(s) => {
+                    let s = s.as_mut() as *mut dyn Any as *mut Stub<A, B>;
+                    Some(unsafe { &mut *s })
+                }
+                None => None,
+            }
+        }
+    }
+
+    pub struct StubBuilder<'a, A, R> {
+        name: String,
+        store: &'a StubStore,
+        times: Option<usize>,
+        _f: std::marker::PhantomData<fn(A) -> R>,
+    }
+
+    impl<'a, A: 'static, R: 'static> StubBuilder<'a, A, R> {
+        /// Asserts the stub has been called exactly `times` times.
+        #[must_use]
+        pub fn times(mut self, times: usize) -> Self {
+            self.times = Some(times);
+            self
+        }
+
+        /// Asserts the stub has been called exactly once.
+        #[must_use]
+        pub fn once(mut self) -> Self {
+            self.times = Some(1);
+            self
+        }
+
+        /// The function that will be called when the stub is called. This needs to be called to
+        /// actually build the stub and register it to the stub store.
+        pub fn then(self, f: impl Fn(A) -> R + Sync + Send + 'static) {
+            let times = Mutex::new(self.times);
+            let stub = Stub {
+                stub: Box::new(f),
+                times,
+                name: self.name.clone(),
+                invalidated: AtomicBool::new(false),
+            };
+            self.store.insert(self.name, stub);
+        }
+    }
+
+    /// Mocker allows stubbing method calls on any struct. You can register stubs by calling
+    /// `Mocker::when` and retrieve them in the proxy implementation with `Mocker::get`.
+    #[derive(Debug, Default)]
+    pub struct Mocker {
+        store: StubStore,
+    }
+
+    impl Mocker {
+        pub fn when<A, R>(&self, name: &str) -> StubBuilder<A, R> {
+            StubBuilder {
+                name: name.to_string(),
+                store: &self.store,
+                times: None,
+                _f: std::marker::PhantomData,
+            }
+        }
+
+        pub fn get<A, R>(&self, name: &str) -> &Stub<A, R> {
+            match self.store.get(name) {
+                Some(stub) => stub,
+                None => {
+                    // Panicking here causes the stubs to get dropped, and panic in turn. To prevent
+                    // that, we forget them, and let them be cleaned by the os later. This is not
+                    // optimal, but is still better than nested panics.
+                    let mut stubs = self.store.inner.lock().unwrap();
+                    let stubs = std::mem::take(&mut *stubs);
+                    std::mem::forget(stubs);
+                    panic!("unexpected call to {}", name)
+                }
+            }
+        }
+    }
+
+    #[derive(Debug, Clone)]
+    pub enum MockIndex {
+        Vrai(Index),
+        Faux(Arc<Mocker>),
+    }
+
+    impl MockIndex {
+        pub fn faux(faux: Mocker) -> Self {
+            Self::Faux(Arc::new(faux))
+        }
+
     pub fn open(
         path: impl AsRef<Path>,
         size: usize,
@@ -99,98 +205,52 @@ impl Index {
         uuid: Uuid,
         update_handler: Arc<UpdateHandler>,
     ) -> Result<Self> {
-        create_dir_all(&path)?;
-        let mut options = EnvOpenOptions::new();
-        options.map_size(size);
-        let inner = Arc::new(milli::Index::new(options, &path)?);
-        Ok(Index {
-            inner,
-            update_file_store,
-            uuid,
-            update_handler,
-        })
+        let index = Index::open(path, size, update_file_store, uuid, update_handler)?;
+        Ok(Self::Vrai(index))
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: usize,
+        update_handler: &UpdateHandler,
+    ) -> anyhow::Result<()> {
+        Index::load_dump(src, dst, size, update_handler)?;
+        Ok(())
+    }
+
+    pub fn handle_update(&self, update: Processing) -> std::result::Result<Processed, Failed> {
+        match self {
+            MockIndex::Vrai(index) => index.handle_update(update),
+            MockIndex::Faux(faux) => faux.get("handle_update").call(update),
+        }
+    }
+
+    pub fn uuid(&self) -> Uuid {
+        match self {
+            MockIndex::Vrai(index) => index.uuid(),
+            MockIndex::Faux(faux) => faux.get("uuid").call(()),
+        }
     }
 
     pub fn stats(&self) -> Result<IndexStats> {
-        let rtxn = self.read_txn()?;
-
-        Ok(IndexStats {
-            size: self.size(),
-            number_of_documents: self.number_of_documents(&rtxn)?,
-            is_indexing: None,
-            field_distribution: self.field_distribution(&rtxn)?,
-        })
+        match self {
+            MockIndex::Vrai(index) => index.stats(),
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
     pub fn meta(&self) -> Result<IndexMeta> {
-        IndexMeta::new(self)
+        match self {
+            MockIndex::Vrai(index) => index.meta(),
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
     pub fn settings(&self) -> Result<Settings<Checked>> {
-        let txn = self.read_txn()?;
-        self.settings_txn(&txn)
-    }
-
-    pub fn settings_txn(&self, txn: &RoTxn) -> Result<Settings<Checked>> {
-        let displayed_attributes = self
-            .displayed_fields(txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect());
-
-        let searchable_attributes = self
-            .searchable_fields(txn)?
-            .map(|fields| fields.into_iter().map(String::from).collect());
-
-        let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect();
-
-        let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect();
-
-        let criteria = self
-            .criteria(txn)?
-            .into_iter()
-            .map(|c| c.to_string())
-            .collect();
-
-        let stop_words = self
-            .stop_words(txn)?
-            .map(|stop_words| -> Result<BTreeSet<_>> {
-                Ok(stop_words.stream().into_strs()?.into_iter().collect())
-            })
-            .transpose()?
-            .unwrap_or_else(BTreeSet::new);
-
-        let distinct_field = self.distinct_field(txn)?.map(String::from);
-
-        // In milli, each word in the synonyms map was split on its separators. Since we lost
-        // this information, we are going to put a space between words.
-        let synonyms = self
-            .synonyms(txn)?
-            .iter()
-            .map(|(key, values)| {
-                (
-                    key.join(" "),
-                    values.iter().map(|value| value.join(" ")).collect(),
-                )
-            })
-            .collect();
-
-        Ok(Settings {
-            displayed_attributes: match displayed_attributes {
-                Some(attrs) => Setting::Set(attrs),
-                None => Setting::Reset,
-            },
-            searchable_attributes: match searchable_attributes {
-                Some(attrs) => Setting::Set(attrs),
-                None => Setting::Reset,
-            },
-            filterable_attributes: Setting::Set(filterable_attributes),
-            sortable_attributes: Setting::Set(sortable_attributes),
-            ranking_rules: Setting::Set(criteria),
-            stop_words: Setting::Set(stop_words),
-            distinct_attribute: match distinct_field {
-                Some(field) => Setting::Set(field),
-                None => Setting::Reset,
-            },
-            synonyms: Setting::Set(synonyms),
-            _kind: PhantomData,
-        })
+        match self {
+            MockIndex::Vrai(index) => index.settings(),
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
     pub fn retrieve_documents<S: AsRef<str>>(
@@ -199,23 +259,12 @@ impl Index {
         limit: usize,
         attributes_to_retrieve: Option<Vec<S>>,
     ) -> Result<Vec<Map<String, Value>>> {
-        let txn = self.read_txn()?;
-
-        let fields_ids_map = self.fields_ids_map(&txn)?;
-        let fields_to_display =
-            self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
-
-        let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit);
-
-        let mut documents = Vec::new();
-        for entry in iter {
-            let (_id, obkv) = entry?;
-            let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?;
-            documents.push(object);
-        }
-
-        Ok(documents)
+        match self {
+            MockIndex::Vrai(index) => {
+                index.retrieve_documents(offset, limit, attributes_to_retrieve)
+            }
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
     pub fn retrieve_document<S: AsRef<str>>(
@@ -223,65 +272,94 @@ impl Index {
         doc_id: String,
         attributes_to_retrieve: Option<Vec<S>>,
     ) -> Result<Map<String, Value>> {
-        let txn = self.read_txn()?;
-
-        let fields_ids_map = self.fields_ids_map(&txn)?;
-
-        let fields_to_display =
-            self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?;
-
-        let internal_id = self
-            .external_documents_ids(&txn)?
-            .get(doc_id.as_bytes())
-            .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?;
-
-        let document = self
-            .documents(&txn, std::iter::once(internal_id))?
-            .into_iter()
-            .next()
-            .map(|(_, d)| d)
-            .ok_or(IndexError::DocumentNotFound(doc_id))?;
-
-        let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?;
-
-        Ok(document)
+        match self {
+            MockIndex::Vrai(index) => index.retrieve_document(doc_id, attributes_to_retrieve),
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
     pub fn size(&self) -> u64 {
-        self.env.size()
+        match self {
+            MockIndex::Vrai(index) => index.size(),
+            MockIndex::Faux(_) => todo!(),
+        }
     }
 
-    fn fields_to_display<S: AsRef<str>>(
-        &self,
-        txn: &heed::RoTxn,
-        attributes_to_retrieve: &Option<Vec<S>>,
-        fields_ids_map: &milli::FieldsIdsMap,
-    ) -> Result<Vec<FieldId>> {
-        let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? {
-            Some(ids) => ids.into_iter().collect::<Vec<_>>(),
-            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
-        };
-
-        let attributes_to_retrieve_ids = match attributes_to_retrieve {
-            Some(attrs) => attrs
-                .iter()
-                .filter_map(|f| fields_ids_map.id(f.as_ref()))
-                .collect::<HashSet<_>>(),
-            None => fields_ids_map.iter().map(|(id, _)| id).collect(),
-        };
-
-        displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
-        Ok(displayed_fields_ids)
-    }
-
     pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
-        let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
-        create_dir_all(&dst)?;
-        dst.push("data.mdb");
-        let _txn = self.write_txn()?;
-        self.inner
-            .env
-            .copy_to_path(dst, heed::CompactionOption::Enabled)?;
-        Ok(())
+        match self {
+            MockIndex::Vrai(index) => index.snapshot(path),
+            MockIndex::Faux(faux) => faux.get("snapshot").call(path.as_ref()),
+        }
+    }
+
+    pub fn inner(&self) -> &milli::Index {
+        match self {
+            MockIndex::Vrai(index) => index.inner(),
+            MockIndex::Faux(_) => todo!(),
+        }
+    }
+
+    pub fn update_primary_key(&self, primary_key: Option<String>) -> Result<IndexMeta> {
+        match self {
+            MockIndex::Vrai(index) => index.update_primary_key(primary_key),
+            MockIndex::Faux(_) => todo!(),
+        }
+    }
+
+    pub fn perform_search(&self, query: SearchQuery) -> Result<SearchResult> {
+        match self {
+            MockIndex::Vrai(index) => index.perform_search(query),
+            MockIndex::Faux(faux) => faux.get("perform_search").call(query),
+        }
+    }
+
+    pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
+        match self {
+            MockIndex::Vrai(index) => index.dump(path),
+            MockIndex::Faux(faux) => faux.get("dump").call(path.as_ref()),
+        }
+    }
+    }
+
+    #[test]
+    fn test_faux_index() {
+        let faux = Mocker::default();
+        faux.when("snapshot")
+            .times(2)
+            .then(|_: &Path| -> Result<()> { Ok(()) });
+
+        let index = MockIndex::faux(faux);
+
+        let path = PathBuf::from("hello");
+        index.snapshot(&path).unwrap();
+        index.snapshot(&path).unwrap();
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_faux_unexisting_method_stub() {
+        let faux = Mocker::default();
+        let index = MockIndex::faux(faux);
+
+        let path = PathBuf::from("hello");
+        index.snapshot(&path).unwrap();
+        index.snapshot(&path).unwrap();
+    }
+
+    #[test]
+    #[should_panic]
+    fn test_faux_panic() {
+        let faux = Mocker::default();
+        faux.when("snapshot")
+            .times(2)
+            .then(|_: &Path| -> Result<()> {
+                panic!();
+            });
+
+        let index = MockIndex::faux(faux);
+
+        let path = PathBuf::from("hello");
+        index.snapshot(&path).unwrap();
+        index.snapshot(&path).unwrap();
     }
 }
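The `#[cfg(not(test))] pub use index::Index;` / `#[cfg(test)] pub use test::MockIndex as Index;` pair above is a compile-time swap: production code sees the real type, while unit tests transparently get the mock under the same name. A minimal sketch of the pattern, with hypothetical names:

#[allow(dead_code)] // the real type is unused when compiling the test harness
mod real {
    pub struct Client;

    impl Client {
        pub fn ping(&self) -> &'static str {
            "pong"
        }
    }
}

#[cfg(test)]
mod mock {
    pub struct MockClient;

    impl MockClient {
        pub fn ping(&self) -> &'static str {
            "mocked pong"
        }
    }
}

// Downstream code just names `Client`; which type that resolves to depends
// on whether we are compiling tests.
#[cfg(not(test))]
pub use real::Client;
#[cfg(test)]
pub use mock::MockClient as Client;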

View File

@@ -12,15 +12,14 @@ use serde::{Deserialize, Serialize};
 use serde_json::{json, Value};
 
 use crate::index::error::FacetError;
-use crate::index::IndexError;
 
-use super::error::Result;
-use super::Index;
+use super::error::{IndexError, Result};
+use super::index::Index;
 
 pub type Document = IndexMap<String, Value>;
 type MatchesInfo = BTreeMap<String, Vec<MatchInfo>>;
 
-#[derive(Serialize, Debug, Clone)]
+#[derive(Serialize, Debug, Clone, PartialEq)]
 pub struct MatchInfo {
     start: usize,
     length: usize,
@@ -36,7 +35,7 @@ pub const fn default_crop_length() -> usize {
     DEFAULT_CROP_LENGTH
 }
 
-#[derive(Deserialize, Debug)]
+#[derive(Deserialize, Debug, Clone, PartialEq)]
 #[serde(rename_all = "camelCase", deny_unknown_fields)]
 pub struct SearchQuery {
     pub q: Option<String>,
@@ -56,7 +55,7 @@ pub struct SearchQuery {
     pub facets_distribution: Option<Vec<String>>,
 }
 
-#[derive(Debug, Clone, Serialize)]
+#[derive(Debug, Clone, Serialize, PartialEq)]
 pub struct SearchHit {
     #[serde(flatten)]
     pub document: Document,
@@ -66,7 +65,7 @@ pub struct SearchHit {
     pub matches_info: Option<MatchesInfo>,
 }
 
-#[derive(Serialize, Debug)]
+#[derive(Serialize, Debug, Clone, PartialEq)]
 #[serde(rename_all = "camelCase")]
 pub struct SearchResult {
     pub hits: Vec<SearchHit>,

View File

@@ -12,7 +12,7 @@ use crate::index_controller::updates::status::{Failed, Processed, Processing, Up
 use crate::Update;
 
 use super::error::{IndexError, Result};
-use super::{Index, IndexMeta};
+use super::index::{Index, IndexMeta};
 
 fn serialize_with_wildcard<S>(
     field: &Setting<Vec<String>>,

View File

@@ -10,14 +10,16 @@ use tokio::sync::{mpsc, oneshot, RwLock};
 
 use super::error::{DumpActorError, Result};
 use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask};
-use crate::index_controller::index_resolver::HardStateIndexResolver;
+use crate::index_controller::index_resolver::index_store::IndexStore;
+use crate::index_controller::index_resolver::uuid_store::UuidStore;
+use crate::index_controller::index_resolver::IndexResolver;
 use crate::index_controller::updates::UpdateSender;
 
 pub const CONCURRENT_DUMP_MSG: usize = 10;
 
-pub struct DumpActor {
+pub struct DumpActor<U, I> {
     inbox: Option<mpsc::Receiver<DumpMsg>>,
-    index_resolver: Arc<HardStateIndexResolver>,
+    index_resolver: Arc<IndexResolver<U, I>>,
     update: UpdateSender,
     dump_path: PathBuf,
     lock: Arc<Mutex<()>>,
@@ -31,10 +33,14 @@ fn generate_uid() -> String {
     Utc::now().format("%Y%m%d-%H%M%S%3f").to_string()
 }
 
-impl DumpActor {
+impl<U, I> DumpActor<U, I>
+where
+    U: UuidStore + Sync + Send + 'static,
+    I: IndexStore + Sync + Send + 'static,
+{
     pub fn new(
         inbox: mpsc::Receiver<DumpMsg>,
-        index_resolver: Arc<HardStateIndexResolver>,
+        index_resolver: Arc<IndexResolver<U, I>>,
         update: UpdateSender,
         dump_path: impl AsRef<Path>,
         index_db_size: usize,
@@ -114,7 +120,7 @@ impl DumpActor {
         let task = DumpTask {
             path: self.dump_path.clone(),
             index_resolver: self.index_resolver.clone(),
-            update_handle: self.update.clone(),
+            update_sender: self.update.clone(),
             uid: uid.clone(),
             update_db_size: self.update_db_size,
             index_db_size: self.index_db_size,

View File

@@ -4,7 +4,6 @@ use std::path::{Path, PathBuf};
 
 use serde_json::{Deserializer, Value};
 use tempfile::NamedTempFile;
-use uuid::Uuid;
 
 use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule};
 use crate::index_controller::dump_actor::Metadata;
@@ -200,7 +199,7 @@ impl From<compat::Enqueued> for Enqueued {
                     method,
                     // Just ignore if the uuid is not present. If it is needed later, an error will
                     // be thrown.
-                    content_uuid: content.unwrap_or_else(Uuid::default),
+                    content_uuid: content.unwrap_or_default(),
                 }
             }
             compat::UpdateMeta::ClearDocuments => Update::ClearDocuments,

View File

@@ -13,7 +13,9 @@ pub use actor::DumpActor;
 pub use handle_impl::*;
 pub use message::DumpMsg;
 
-use super::index_resolver::HardStateIndexResolver;
+use super::index_resolver::index_store::IndexStore;
+use super::index_resolver::uuid_store::UuidStore;
+use super::index_resolver::IndexResolver;
 use super::updates::UpdateSender;
 use crate::compression::{from_tar_gz, to_tar_gz};
 use crate::index_controller::dump_actor::error::DumpActorError;
@@ -51,6 +53,7 @@ impl Metadata {
 }
 
 #[async_trait::async_trait]
+#[cfg_attr(test, mockall::automock)]
 pub trait DumpActorHandle {
     /// Start the creation of a dump
    /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
@@ -218,16 +221,20 @@ pub fn load_dump(
     Ok(())
 }
 
-struct DumpTask {
+struct DumpTask<U, I> {
     path: PathBuf,
-    index_resolver: Arc<HardStateIndexResolver>,
-    update_handle: UpdateSender,
+    index_resolver: Arc<IndexResolver<U, I>>,
+    update_sender: UpdateSender,
     uid: String,
     update_db_size: usize,
     index_db_size: usize,
 }
 
-impl DumpTask {
+impl<U, I> DumpTask<U, I>
+where
+    U: UuidStore + Sync + Send + 'static,
+    I: IndexStore + Sync + Send + 'static,
+{
     async fn run(self) -> Result<()> {
         trace!("Performing dump.");
 
@@ -243,7 +250,7 @@ impl DumpTask {
 
         let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
 
-        UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
+        UpdateMsg::dump(&self.update_sender, uuids, temp_dump_path.clone()).await?;
 
         let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
             let temp_dump_file = tempfile::NamedTempFile::new()?;
@@ -262,3 +269,110 @@ impl DumpTask {
         Ok(())
     }
 }
#[cfg(test)]
mod test {
use std::collections::HashSet;
use futures::future::{err, ok};
use once_cell::sync::Lazy;
use uuid::Uuid;
use super::*;
use crate::index::error::Result as IndexResult;
use crate::index::test::Mocker;
use crate::index::Index;
use crate::index_controller::index_resolver::error::IndexResolverError;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::updates::create_update_handler;
fn setup() {
static SETUP: Lazy<()> = Lazy::new(|| {
if cfg!(windows) {
std::env::set_var("TMP", ".");
} else {
std::env::set_var("TMPDIR", ".");
}
});
// just deref to make sure the env is set up
*SETUP
}
#[actix_rt::test]
async fn test_dump_normal() {
setup();
let tmp = tempfile::tempdir().unwrap();
let uuids = std::iter::repeat_with(Uuid::new_v4)
.take(4)
.collect::<HashSet<_>>();
let mut uuid_store = MockUuidStore::new();
let uuids_cloned = uuids.clone();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(ok(uuids_cloned.clone())));
let mut index_store = MockIndexStore::new();
index_store.expect_get().times(4).returning(move |uuid| {
let mocker = Mocker::default();
let uuids_clone = uuids.clone();
mocker.when::<(), Uuid>("uuid").once().then(move |_| {
assert!(uuids_clone.contains(&uuid));
uuid
});
mocker
.when::<&Path, IndexResult<()>>("dump")
.once()
.then(move |_| Ok(()));
Box::pin(ok(Some(Index::faux(mocker))))
});
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_sender =
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask {
path: tmp.path().to_owned(),
index_resolver,
update_sender,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
};
task.run().await.unwrap();
}
#[actix_rt::test]
async fn error_performing_dump() {
let tmp = tempfile::tempdir().unwrap();
let mut uuid_store = MockUuidStore::new();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey)));
let index_store = MockIndexStore::new();
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_sender =
create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
let task = DumpTask {
path: tmp.path().to_owned(),
index_resolver,
update_sender,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
};
assert!(task.run().await.is_err());
}
}
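The `setup()` helper here (and its twin in the snapshot tests) leans on `once_cell::sync::Lazy` for exactly-once initialisation: every test derefs the static, but the closure that points the temp directory at `.` runs a single time. A stripped-down sketch of the same trick:

    use once_cell::sync::Lazy;

    static SETUP: Lazy<()> = Lazy::new(|| {
        // Runs at most once, on the first deref, however many tests call setup().
        std::env::set_var("TMPDIR", ".");
    });

    fn setup() {
        // just deref to force the one-time initialisation
        *SETUP
    }

    fn main() {
        setup();
        setup(); // second call is a no-op
        assert_eq!(std::env::var("TMPDIR").unwrap(), ".");
    }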

View File

@ -17,6 +17,7 @@ use crate::options::IndexerOpts;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait IndexStore { pub trait IndexStore {
async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>; async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
async fn get(&self, uuid: Uuid) -> Result<Option<Index>>; async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
@ -72,9 +73,10 @@ impl IndexStore for MapIndexStore {
let index = spawn_blocking(move || -> Result<Index> { let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, file_store, uuid, update_handler)?; let index = Index::open(path, index_size, file_store, uuid, update_handler)?;
if let Some(primary_key) = primary_key { if let Some(primary_key) = primary_key {
let mut txn = index.write_txn()?; let inner = index.inner();
let mut txn = inner.write_txn()?;
let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); let mut builder = UpdateBuilder::new(0).settings(&mut txn, index.inner());
builder.set_primary_key(primary_key); builder.set_primary_key(primary_key);
builder.execute(|_, _| ())?; builder.execute(|_, _| ())?;

View File

@ -1,11 +1,12 @@
pub mod error; pub mod error;
mod index_store; pub mod index_store;
pub mod uuid_store; pub mod uuid_store;
use std::path::Path; use std::path::Path;
use error::{IndexResolverError, Result}; use error::{IndexResolverError, Result};
use index_store::{IndexStore, MapIndexStore}; use index_store::{IndexStore, MapIndexStore};
use log::error;
use uuid::Uuid; use uuid::Uuid;
use uuid_store::{HeedUuidStore, UuidStore}; use uuid_store::{HeedUuidStore, UuidStore};
@ -98,8 +99,19 @@ where
} }
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let index = self.index_store.create(uuid, primary_key).await?; let index = self.index_store.create(uuid, primary_key).await?;
self.index_uuid_store.insert(uid, uuid).await?; match self.index_uuid_store.insert(uid, uuid).await {
Ok(index) Err(e) => {
match self.index_store.delete(uuid).await {
Ok(Some(index)) => {
index.inner().clone().prepare_for_closing();
}
Ok(None) => (),
Err(e) => error!("Error while deleting index: {:?}", e),
}
Err(e)
}
Ok(()) => Ok(index),
}
} }
pub async fn list(&self) -> Result<Vec<(String, Index)>> { pub async fn list(&self) -> Result<Vec<(String, Index)>> {
@ -121,7 +133,13 @@ where
pub async fn delete_index(&self, uid: String) -> Result<Uuid> { pub async fn delete_index(&self, uid: String) -> Result<Uuid> {
match self.index_uuid_store.delete(uid.clone()).await? { match self.index_uuid_store.delete(uid.clone()).await? {
Some(uuid) => { Some(uuid) => {
let _ = self.index_store.delete(uuid).await; match self.index_store.delete(uuid).await {
Ok(Some(index)) => {
index.inner().clone().prepare_for_closing();
}
Ok(None) => (),
Err(e) => error!("Error while deleting index: {:?}", e),
}
Ok(uuid) Ok(uuid)
} }
None => Err(IndexResolverError::UnexistingIndex(uid)), None => Err(IndexResolverError::UnexistingIndex(uid)),
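Both branches above replace the old fire-and-forget `let _ = self.index_store.delete(uuid).await;`: a successfully deleted index is now explicitly closed via `prepare_for_closing`, and deletion failures are at least logged. In `create_index` this doubles as a rollback, undoing the store creation when the uuid insertion fails. A synchronous sketch of that compensation pattern, with hypothetical in-memory stores:

    use std::cell::RefCell;
    use std::collections::HashMap;

    struct Stores {
        indexes: RefCell<HashMap<u32, String>>,
        names: RefCell<HashMap<String, u32>>,
    }

    impl Stores {
        fn create_index(&self, uid: &str, id: u32) -> Result<u32, String> {
            // step 1: create the index
            self.indexes.borrow_mut().insert(id, uid.to_owned());
            // step 2: reserve the name; fail if it is taken
            if self.names.borrow().contains_key(uid) {
                // compensate: drop the index created in step 1 before erroring
                self.indexes.borrow_mut().remove(&id);
                return Err(format!("index {} already exists", uid));
            }
            self.names.borrow_mut().insert(uid.to_owned(), id);
            Ok(id)
        }
    }

    fn main() {
        let stores = Stores {
            indexes: RefCell::new(HashMap::new()),
            names: RefCell::new(HashMap::new()),
        };
        assert!(stores.create_index("movies", 1).is_ok());
        assert!(stores.create_index("movies", 2).is_err());
        // the failed attempt left no orphan index behind
        assert_eq!(stores.indexes.borrow().len(), 1);
    }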

View File

@ -22,6 +22,7 @@ struct DumpEntry {
const UUIDS_DB_PATH: &str = "index_uuids"; const UUIDS_DB_PATH: &str = "index_uuids";
#[async_trait::async_trait] #[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait UuidStore: Sized { pub trait UuidStore: Sized {
// Create a new entry for `name`. Return an error if `err` is set and the entry already exists; // Create a new entry for `name`. Return an error if `err` is set and the entry already exists;
// return the uuid otherwise. // return the uuid otherwise.

View File

@ -30,7 +30,9 @@ use error::Result;
use self::dump_actor::load_dump; use self::dump_actor::load_dump;
use self::index_resolver::error::IndexResolverError; use self::index_resolver::error::IndexResolverError;
use self::index_resolver::HardStateIndexResolver; use self::index_resolver::index_store::{IndexStore, MapIndexStore};
use self::index_resolver::uuid_store::{HeedUuidStore, UuidStore};
use self::index_resolver::IndexResolver;
use self::updates::status::UpdateStatus; use self::updates::status::UpdateStatus;
use self::updates::UpdateMsg; use self::updates::UpdateMsg;
@ -41,6 +43,10 @@ mod snapshot;
pub mod update_file_store; pub mod update_file_store;
pub mod updates; pub mod updates;
/// Concrete implementation of the IndexController, exposed by meilisearch-lib
pub type MeiliSearch =
IndexController<HeedUuidStore, MapIndexStore, dump_actor::DumpActorHandleImpl>;
pub type Payload = Box< pub type Payload = Box<
dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin, dyn Stream<Item = std::result::Result<Bytes, PayloadError>> + Send + Sync + 'static + Unpin,
>; >;
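With `IndexController` now generic, `MeiliSearch` becomes a type alias pinning the three parameters to their production implementations, so downstream code keeps a single concrete name while tests substitute mocks. A small sketch of the alias-over-generics pattern, with hypothetical types:

    struct Controller<U, I> {
        uuid_store: U,
        index_store: I,
    }

    // Production implementations.
    struct HeedUuids;
    struct MapIndexes;

    // The only name most callers ever see.
    type Concrete = Controller<HeedUuids, MapIndexes>;

    fn main() {
        let _c: Concrete = Controller {
            uuid_store: HeedUuids,
            index_store: MapIndexes,
        };
    }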
@ -62,13 +68,6 @@ pub struct IndexSettings {
pub primary_key: Option<String>, pub primary_key: Option<String>,
} }
#[derive(Clone)]
pub struct IndexController {
index_resolver: Arc<HardStateIndexResolver>,
update_sender: updates::UpdateSender,
dump_handle: dump_actor::DumpActorHandleImpl,
}
#[derive(Debug)] #[derive(Debug)]
pub enum DocumentAdditionFormat { pub enum DocumentAdditionFormat {
Json, Json,
@ -129,7 +128,7 @@ impl IndexControllerBuilder {
self, self,
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
indexer_options: IndexerOpts, indexer_options: IndexerOpts,
) -> anyhow::Result<IndexController> { ) -> anyhow::Result<MeiliSearch> {
let index_size = self let index_size = self
.max_index_size .max_index_size
.ok_or_else(|| anyhow::anyhow!("Missing index size"))?; .ok_or_else(|| anyhow::anyhow!("Missing index size"))?;
@ -178,6 +177,8 @@ impl IndexControllerBuilder {
update_store_size, update_store_size,
)?; )?;
let dump_handle = Arc::new(dump_handle);
if self.schedule_snapshot { if self.schedule_snapshot {
let snapshot_service = SnapshotService::new( let snapshot_service = SnapshotService::new(
index_resolver.clone(), index_resolver.clone(),
@ -266,7 +267,22 @@ impl IndexControllerBuilder {
} }
} }
impl IndexController { // We are using derivative here to derive Clone, because U, I and D do not necessarily implement
// Clone themselves.
#[derive(derivative::Derivative)]
#[derivative(Clone(bound = ""))]
pub struct IndexController<U, I, D> {
index_resolver: Arc<IndexResolver<U, I>>,
update_sender: updates::UpdateSender,
dump_handle: Arc<D>,
}
impl<U, I, D> IndexController<U, I, D>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
D: DumpActorHandle + Send + Sync,
{
pub fn builder() -> IndexControllerBuilder { pub fn builder() -> IndexControllerBuilder {
IndexControllerBuilder::default() IndexControllerBuilder::default()
} }
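The comment explains the trade-off: a plain `#[derive(Clone)]` on `IndexController<U, I, D>` would demand `U: Clone, I: Clone, D: Clone` even though only `Arc`s are cloned, so `derivative` is used with an empty bound list. A minimal sketch of the difference, assuming the `derivative` crate is a dependency:

    use std::sync::Arc;

    #[derive(derivative::Derivative)]
    #[derivative(Clone(bound = ""))] // emits impl<S> Clone with no `S: Clone` bound
    struct Holder<S> {
        inner: Arc<S>,
    }

    struct NotClone; // deliberately not Clone

    fn main() {
        let a = Holder { inner: Arc::new(NotClone) };
        // Works because Arc<S> is Clone for any S; a plain #[derive(Clone)]
        // on Holder would have required NotClone: Clone here.
        let _b = a.clone();
    }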
@ -286,7 +302,7 @@ impl IndexController {
if create_index { if create_index {
let index = self.index_resolver.create_index(name, None).await?; let index = self.index_resolver.create_index(name, None).await?;
let update_result = let update_result =
UpdateMsg::update(&self.update_sender, index.uuid, update).await?; UpdateMsg::update(&self.update_sender, index.uuid(), update).await?;
Ok(update_result) Ok(update_result)
} else { } else {
Err(IndexResolverError::UnexistingIndex(name).into()) Err(IndexResolverError::UnexistingIndex(name).into())
@ -314,7 +330,7 @@ impl IndexController {
for (uid, index) in indexes { for (uid, index) in indexes {
let meta = index.meta()?; let meta = index.meta()?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid: index.uuid, uuid: index.uuid(),
name: uid.clone(), name: uid.clone(),
uid, uid,
meta, meta,
@ -366,7 +382,7 @@ impl IndexController {
index_settings.uid.take(); index_settings.uid.take();
let index = self.index_resolver.get_index(uid.clone()).await?; let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid; let uuid = index.uuid();
let meta = let meta =
spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??;
let meta = IndexMetadata { let meta = IndexMetadata {
@ -386,7 +402,7 @@ impl IndexController {
pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> { pub async fn get_index(&self, uid: String) -> Result<IndexMetadata> {
let index = self.index_resolver.get_index(uid.clone()).await?; let index = self.index_resolver.get_index(uid.clone()).await?;
let uuid = index.uuid; let uuid = index.uuid();
let meta = spawn_blocking(move || index.meta()).await??; let meta = spawn_blocking(move || index.meta()).await??;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid, uuid,
@ -400,7 +416,7 @@ impl IndexController {
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> { pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let update_infos = UpdateMsg::get_info(&self.update_sender).await?; let update_infos = UpdateMsg::get_info(&self.update_sender).await?;
let index = self.index_resolver.get_index(uid).await?; let index = self.index_resolver.get_index(uid).await?;
let uuid = index.uuid; let uuid = index.uuid();
let mut stats = spawn_blocking(move || index.stats()).await??; let mut stats = spawn_blocking(move || index.stats()).await??;
// Check if the currently indexing update is from our index. // Check if the currently indexing update is from our index.
stats.is_indexing = Some(Some(uuid) == update_infos.processing); stats.is_indexing = Some(Some(uuid) == update_infos.processing);
@ -414,7 +430,7 @@ impl IndexController {
let mut indexes = BTreeMap::new(); let mut indexes = BTreeMap::new();
for (index_uid, index) in self.index_resolver.list().await? { for (index_uid, index) in self.index_resolver.list().await? {
let uuid = index.uuid; let uuid = index.uuid();
let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || { let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || {
let stats = index.stats()?; let stats = index.stats()?;
let meta = index.meta()?; let meta = index.meta()?;
@ -461,7 +477,7 @@ impl IndexController {
let meta = spawn_blocking(move || -> IndexResult<_> { let meta = spawn_blocking(move || -> IndexResult<_> {
let meta = index.meta()?; let meta = index.meta()?;
let meta = IndexMetadata { let meta = IndexMetadata {
uuid: index.uuid, uuid: index.uuid(),
uid: uid.clone(), uid: uid.clone(),
name: uid, name: uid,
meta, meta,
@ -497,3 +513,103 @@ pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
} }
} }
} }
#[cfg(test)]
mod test {
use futures::future::ok;
use mockall::predicate::eq;
use tokio::sync::mpsc;
use crate::index::error::Result as IndexResult;
use crate::index::test::Mocker;
use crate::index::Index;
use crate::index_controller::dump_actor::MockDumpActorHandle;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use super::updates::UpdateSender;
use super::*;
impl<D: DumpActorHandle> IndexController<MockUuidStore, MockIndexStore, D> {
pub fn mock(
index_resolver: IndexResolver<MockUuidStore, MockIndexStore>,
update_sender: UpdateSender,
dump_handle: D,
) -> Self {
IndexController {
index_resolver: Arc::new(index_resolver),
update_sender,
dump_handle: Arc::new(dump_handle),
}
}
}
#[actix_rt::test]
async fn test_search_simple() {
let index_uid = "test";
let index_uuid = Uuid::new_v4();
let query = SearchQuery {
q: Some(String::from("hello world")),
offset: Some(10),
limit: 0,
attributes_to_retrieve: Some(vec!["string".to_owned()].into_iter().collect()),
attributes_to_crop: None,
crop_length: 18,
attributes_to_highlight: None,
matches: true,
filter: None,
sort: None,
facets_distribution: None,
};
let result = SearchResult {
hits: vec![],
nb_hits: 29,
exhaustive_nb_hits: true,
query: "hello world".to_string(),
limit: 24,
offset: 0,
processing_time_ms: 50,
facets_distribution: None,
exhaustive_facets_count: Some(true),
};
let mut uuid_store = MockUuidStore::new();
uuid_store
.expect_get_uuid()
.with(eq(index_uid.to_owned()))
.returning(move |s| Box::pin(ok((s, Some(index_uuid)))));
let mut index_store = MockIndexStore::new();
let result_clone = result.clone();
let query_clone = query.clone();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(move |_uuid| {
let result = result_clone.clone();
let query = query_clone.clone();
let mocker = Mocker::default();
mocker
.when::<SearchQuery, IndexResult<SearchResult>>("perform_search")
.once()
.then(move |q| {
assert_eq!(&q, &query);
Ok(result.clone())
});
let index = Index::faux(mocker);
Box::pin(ok(Some(index)))
});
let index_resolver = IndexResolver::new(uuid_store, index_store);
let (update_sender, _) = mpsc::channel(1);
let dump_actor = MockDumpActorHandle::new();
let index_controller = IndexController::mock(index_resolver, update_sender, dump_actor);
let r = index_controller
.search(index_uid.to_owned(), query.clone())
.await
.unwrap();
assert_eq!(r, result);
}
}
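The `mock` constructor above lives inside the test module, so tests can assemble an `IndexController` directly from mocks without going through the builder. A bare sketch of that test-only-constructor pattern, on a hypothetical service:

    pub struct Service<S> {
        store: S,
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        struct StubStore;

        // A constructor that exists only for tests; production code keeps
        // going through whatever builder the crate exposes.
        impl Service<StubStore> {
            fn mock(store: StubStore) -> Self {
                Service { store }
            }
        }

        #[test]
        fn builds_from_stub() {
            let svc = Service::mock(StubStore);
            let _ = &svc.store;
        }
    }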

View File

@ -11,20 +11,26 @@ use tokio::time::sleep;
use crate::compression::from_tar_gz; use crate::compression::from_tar_gz;
use crate::index_controller::updates::UpdateMsg; use crate::index_controller::updates::UpdateMsg;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::index_store::IndexStore;
use super::index_resolver::uuid_store::UuidStore;
use super::index_resolver::IndexResolver;
use super::updates::UpdateSender; use super::updates::UpdateSender;
pub struct SnapshotService { pub struct SnapshotService<U, I> {
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
db_name: String, db_name: String,
} }
impl SnapshotService { impl<U, I> SnapshotService<U, I>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new( pub fn new(
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
update_sender: UpdateSender, update_sender: UpdateSender,
snapshot_period: Duration, snapshot_period: Duration,
snapshot_path: PathBuf, snapshot_path: PathBuf,
@ -125,133 +131,169 @@ pub fn load_snapshot(
} }
} }
//#[cfg(test)] #[cfg(test)]
//mod test { mod test {
//use std::iter::FromIterator; use std::{collections::HashSet, sync::Arc};
//use std::{collections::HashSet, sync::Arc};
//use futures::future::{err, ok}; use futures::future::{err, ok};
//use rand::Rng; use once_cell::sync::Lazy;
//use tokio::time::timeout; use rand::Rng;
//use uuid::Uuid; use uuid::Uuid;
//use super::*; use crate::index::error::IndexError;
use crate::index::test::Mocker;
use crate::index::{error::Result as IndexResult, Index};
use crate::index_controller::index_resolver::error::IndexResolverError;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::index_resolver::IndexResolver;
use crate::index_controller::updates::create_update_handler;
//#[actix_rt::test] use super::*;
//async fn test_normal() {
//let mut rng = rand::thread_rng();
//let uuids_num: usize = rng.gen_range(5..10);
//let uuids = (0..uuids_num)
//.map(|_| Uuid::new_v4())
//.collect::<HashSet<_>>();
//let mut uuid_resolver = MockUuidResolverHandle::new(); fn setup() {
//let uuids_clone = uuids.clone(); static SETUP: Lazy<()> = Lazy::new(|| {
//uuid_resolver if cfg!(windows) {
//.expect_snapshot() std::env::set_var("TMP", ".");
//.times(1) } else {
//.returning(move |_| Box::pin(ok(uuids_clone.clone()))); std::env::set_var("TMPDIR", ".");
}
});
//let uuids_clone = uuids.clone(); // just deref to make sure the env is set up
//let mut index_handle = MockIndexActorHandle::new(); *SETUP
//index_handle }
//.expect_snapshot()
//.withf(move |uuid, _path| uuids_clone.contains(uuid))
//.times(uuids_num)
//.returning(move |_, _| Box::pin(ok(())));
//let dir = tempfile::tempdir_in(".").unwrap(); #[actix_rt::test]
//let handle = Arc::new(index_handle); async fn test_normal() {
//let update_handle = setup();
//UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let mut rng = rand::thread_rng();
//let snapshot_service = SnapshotService::new( let uuids_num: usize = rng.gen_range(5..10);
//uuid_resolver, let uuids = (0..uuids_num)
//update_handle, .map(|_| Uuid::new_v4())
//Duration::from_millis(100), .collect::<HashSet<_>>();
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//snapshot_service.perform_snapshot().await.unwrap(); let mut uuid_store = MockUuidStore::new();
//} let uuids_clone = uuids.clone();
uuid_store
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
//#[actix_rt::test] let mut indexes = uuids.clone().into_iter().map(|uuid| {
//async fn error_performing_uuid_snapshot() { let mocker = Mocker::default();
//let mut uuid_resolver = MockUuidResolverHandle::new(); mocker
//uuid_resolver .when("snapshot")
//.expect_snapshot() .times(1)
//.times(1) .then(|_: &Path| -> IndexResult<()> { Ok(()) });
////arbitrary error mocker.when("uuid").then(move |_: ()| uuid);
//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); Index::faux(mocker)
});
//let update_handle = MockUpdateActorHandle::new(); let uuids_clone = uuids.clone();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.withf(move |uuid| uuids_clone.contains(uuid))
.times(uuids_num)
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//assert!(snapshot_service.perform_snapshot().await.is_err()); let dir = tempfile::tempdir().unwrap();
////Nothing was written to the file let update_sender =
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
//}
//#[actix_rt::test] let snapshot_path = tempfile::tempdir().unwrap();
//async fn error_performing_index_snapshot() { let snapshot_service = SnapshotService::new(
//let uuid = Uuid::new_v4(); index_resolver,
//let mut uuid_resolver = MockUuidResolverHandle::new(); update_sender,
//uuid_resolver Duration::from_millis(100),
//.expect_snapshot() snapshot_path.path().to_owned(),
//.times(1) "data.ms".to_string(),
//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); );
//let mut update_handle = MockUpdateActorHandle::new(); snapshot_service.perform_snapshot().await.unwrap();
//update_handle }
//.expect_snapshot()
////arbitrary error
//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); #[actix_rt::test]
//let snapshot_service = SnapshotService::new( async fn error_performing_uuid_snapshot() {
//uuid_resolver, setup();
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//assert!(snapshot_service.perform_snapshot().await.is_err()); let mut uuid_store = MockUuidStore::new();
////Nothing was written to the file uuid_store
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); .expect_snapshot()
//} .once()
.returning(move |_| Box::pin(err(IndexResolverError::IndexAlreadyExists)));
//#[actix_rt::test] let mut index_store = MockIndexStore::new();
//async fn test_loop() { index_store.expect_get().never();
//let mut uuid_resolver = MockUuidResolverHandle::new();
//uuid_resolver
//.expect_snapshot()
////we expect the function to be called between 2 and 3 times in the given interval.
//.times(2..4)
////arbitrary error, to short-circuit the function
//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
//let update_handle = MockUpdateActorHandle::new(); let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let snapshot_path = tempfile::tempdir_in(".").unwrap(); let dir = tempfile::tempdir().unwrap();
//let snapshot_service = SnapshotService::new( let update_sender =
//uuid_resolver, create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; let snapshot_path = tempfile::tempdir().unwrap();
//} let snapshot_service = SnapshotService::new(
//} index_resolver,
update_sender,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
}
#[actix_rt::test]
async fn error_performing_index_snapshot() {
setup();
let uuids: HashSet<Uuid> = vec![Uuid::new_v4()].into_iter().collect();
let mut uuid_store = MockUuidStore::new();
let uuids_clone = uuids.clone();
uuid_store
.expect_snapshot()
.once()
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
let mut indexes = uuids.clone().into_iter().map(|uuid| {
let mocker = Mocker::default();
// the index returns an arbitrary error
mocker
.when("snapshot")
.then(|_: &Path| -> IndexResult<()> { Err(IndexError::ExistingPrimaryKey) });
mocker.when("uuid").then(move |_: ()| uuid);
Index::faux(mocker)
});
let uuids_clone = uuids.clone();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.withf(move |uuid| uuids_clone.contains(uuid))
.once()
.returning(move |_| Box::pin(ok(Some(indexes.next().unwrap()))));
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let dir = tempfile::tempdir().unwrap();
let update_sender =
create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap();
let snapshot_path = tempfile::tempdir().unwrap();
let snapshot_service = SnapshotService::new(
index_resolver,
update_sender,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
assert!(snapshot_service.perform_snapshot().await.is_err());
}
}

View File

@ -5,6 +5,7 @@ use meilisearch_error::{Code, ErrorCode};
use crate::{ use crate::{
document_formats::DocumentFormatError, document_formats::DocumentFormatError,
index::error::IndexError,
index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat}, index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat},
}; };
@ -28,6 +29,8 @@ pub enum UpdateLoopError {
PayloadError(#[from] actix_web::error::PayloadError), PayloadError(#[from] actix_web::error::PayloadError),
#[error("A {0} payload is missing.")] #[error("A {0} payload is missing.")]
MissingPayload(DocumentAdditionFormat), MissingPayload(DocumentAdditionFormat),
#[error("{0}")]
IndexError(#[from] IndexError),
} }
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateLoopError
@ -58,7 +61,6 @@ impl ErrorCode for UpdateLoopError {
match self { match self {
Self::UnexistingUpdate(_) => Code::NotFound, Self::UnexistingUpdate(_) => Code::NotFound,
Self::Internal(_) => Code::Internal, Self::Internal(_) => Code::Internal,
//Self::IndexActor(e) => e.error_code(),
Self::FatalUpdateStoreError => Code::Internal, Self::FatalUpdateStoreError => Code::Internal,
Self::DocumentFormatError(error) => error.error_code(), Self::DocumentFormatError(error) => error.error_code(),
Self::PayloadError(error) => match error { Self::PayloadError(error) => match error {
@ -66,6 +68,7 @@ impl ErrorCode for UpdateLoopError {
_ => Code::Internal, _ => Code::Internal,
}, },
Self::MissingPayload(_) => Code::MissingPayload, Self::MissingPayload(_) => Code::MissingPayload,
Self::IndexError(e) => e.error_code(),
} }
} }
} }
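The new `IndexError` variant uses thiserror's `#[from]`, so the update loop can propagate index errors with `?`, and `error_code` simply delegates to the wrapped error. A stripped-down sketch of that delegation, reusing the names from this diff:

    use thiserror::Error;

    #[derive(Debug, Error)]
    enum IndexError {
        #[error("primary key already exists")]
        ExistingPrimaryKey,
    }

    #[derive(Debug, Error)]
    enum UpdateLoopError {
        // #[from] generates From<IndexError>, so `?` converts automatically,
        // and "{0}" forwards the wrapped error's Display message unchanged.
        #[error("{0}")]
        IndexError(#[from] IndexError),
    }

    fn fails() -> Result<(), UpdateLoopError> {
        let res: Result<(), IndexError> = Err(IndexError::ExistingPrimaryKey);
        res?; // auto-converted by the generated From impl
        Ok(())
    }

    fn main() {
        assert_eq!(fails().unwrap_err().to_string(), "primary key already exists");
    }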

View File

@ -26,16 +26,22 @@ use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore; use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus; use status::UpdateStatus;
use super::index_resolver::HardStateIndexResolver; use super::index_resolver::index_store::IndexStore;
use super::index_resolver::uuid_store::UuidStore;
use super::index_resolver::IndexResolver;
use super::{DocumentAdditionFormat, Update}; use super::{DocumentAdditionFormat, Update};
pub type UpdateSender = mpsc::Sender<UpdateMsg>; pub type UpdateSender = mpsc::Sender<UpdateMsg>;
pub fn create_update_handler( pub fn create_update_handler<U, I>(
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
update_store_size: usize, update_store_size: usize,
) -> anyhow::Result<UpdateSender> { ) -> anyhow::Result<UpdateSender>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let path = db_path.as_ref().to_owned(); let path = db_path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
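`create_update_handler` keeps the usual handle/actor split, now generic: callers get back only the `UpdateSender`, while the spawned `UpdateLoop` owns the receiving end. A bare-bones sketch of that split, with a hypothetical message type:

    use tokio::sync::mpsc;

    #[derive(Debug)]
    enum Msg {
        Ping,
    }

    fn spawn_actor() -> mpsc::Sender<Msg> {
        let (sender, mut receiver) = mpsc::channel(100);
        // The loop owns the receiver; it stops once every sender is dropped.
        tokio::spawn(async move {
            while let Some(msg) = receiver.recv().await {
                match msg {
                    Msg::Ping => println!("pong"),
                }
            }
        });
        sender
    }

    #[tokio::main]
    async fn main() {
        let sender = spawn_actor();
        sender.send(Msg::Ping).await.unwrap();
    }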
@ -95,12 +101,16 @@ pub struct UpdateLoop {
} }
impl UpdateLoop { impl UpdateLoop {
pub fn new( pub fn new<U, I>(
update_db_size: usize, update_db_size: usize,
inbox: mpsc::Receiver<UpdateMsg>, inbox: mpsc::Receiver<UpdateMsg>,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
std::fs::create_dir_all(&path)?; std::fs::create_dir_all(&path)?;

View File

@ -34,7 +34,7 @@ impl UpdateStore {
// txn must *always* be acquired after state lock, or it will deadlock. // txn must *always* be acquired after state lock, or it will deadlock.
let txn = self.env.write_txn()?; let txn = self.env.write_txn()?;
let uuids = indexes.iter().map(|i| i.uuid).collect(); let uuids = indexes.iter().map(|i| i.uuid()).collect();
self.dump_updates(&txn, &uuids, &path)?; self.dump_updates(&txn, &uuids, &path)?;

View File

@ -29,6 +29,8 @@ use codec::*;
use super::error::Result; use super::error::Result;
use super::status::{Enqueued, Processing}; use super::status::{Enqueued, Processing};
use crate::index::Index; use crate::index::Index;
use crate::index_controller::index_resolver::index_store::IndexStore;
use crate::index_controller::index_resolver::uuid_store::UuidStore;
use crate::index_controller::updates::*; use crate::index_controller::updates::*;
use crate::EnvSizer; use crate::EnvSizer;
@ -157,13 +159,17 @@ impl UpdateStore {
)) ))
} }
pub fn open( pub fn open<U, I>(
options: EnvOpenOptions, options: EnvOpenOptions,
path: impl AsRef<Path>, path: impl AsRef<Path>,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
must_exit: Arc<AtomicBool>, must_exit: Arc<AtomicBool>,
update_file_store: UpdateFileStore, update_file_store: UpdateFileStore,
) -> anyhow::Result<Arc<Self>> { ) -> anyhow::Result<Arc<Self>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
let (update_store, mut notification_receiver) = let (update_store, mut notification_receiver) =
Self::new(options, path, update_file_store)?; Self::new(options, path, update_file_store)?;
let update_store = Arc::new(update_store); let update_store = Arc::new(update_store);
@ -296,10 +302,14 @@ impl UpdateStore {
/// Executes the user-provided function on the next pending update (the one with the lowest id). /// Executes the user-provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it lets the user process the update with a read-only txn, /// This is asynchronous as it lets the user process the update with a read-only txn,
/// writing the result meta to the processed-meta store only *after* it has been processed. /// writing the result meta to the processed-meta store only *after* it has been processed.
fn process_pending_update( fn process_pending_update<U, I>(
&self, &self,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
) -> Result<Option<()>> { ) -> Result<Option<()>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
// Create a read transaction to be able to retrieve the pending update in order. // Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let first_meta = self.pending_queue.first(&rtxn)?; let first_meta = self.pending_queue.first(&rtxn)?;
@ -325,13 +335,17 @@ impl UpdateStore {
} }
} }
fn perform_update( fn perform_update<U, I>(
&self, &self,
processing: Processing, processing: Processing,
index_resolver: Arc<HardStateIndexResolver>, index_resolver: Arc<IndexResolver<U, I>>,
index_uuid: Uuid, index_uuid: Uuid,
global_id: u64, global_id: u64,
) -> Result<Option<()>> { ) -> Result<Option<()>>
where
U: UuidStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let handle = Handle::current(); let handle = Handle::current();
let update_id = processing.id(); let update_id = processing.id();
@ -509,7 +523,7 @@ impl UpdateStore {
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect(); let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid()).collect();
for entry in pendings { for entry in pendings {
let ((_, uuid, _), pending) = entry?; let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) { if uuids.contains(&uuid) {
@ -518,9 +532,7 @@ impl UpdateStore {
.. ..
} = pending.decode()? } = pending.decode()?
{ {
self.update_file_store self.update_file_store.snapshot(content_uuid, &path)?;
.snapshot(content_uuid, &path)
.unwrap();
} }
} }
} }
@ -528,8 +540,7 @@ impl UpdateStore {
let path = path.as_ref().to_owned(); let path = path.as_ref().to_owned();
indexes indexes
.par_iter() .par_iter()
.try_for_each(|index| index.snapshot(path.clone())) .try_for_each(|index| index.snapshot(&path))?;
.unwrap();
Ok(()) Ok(())
} }
@ -557,149 +568,217 @@ impl UpdateStore {
} }
} }
//#[cfg(test)] #[cfg(test)]
//mod test { mod test {
//use super::*; use futures::future::ok;
//use crate::index_controller::{ use mockall::predicate::eq;
//index_actor::{error::IndexActorError, MockIndexActorHandle},
//UpdateResult,
//};
//use futures::future::ok; use crate::index::error::IndexError;
use crate::index::test::Mocker;
use crate::index_controller::index_resolver::index_store::MockIndexStore;
use crate::index_controller::index_resolver::uuid_store::MockUuidStore;
use crate::index_controller::updates::status::{Failed, Processed};
//#[actix_rt::test] use super::*;
//async fn test_next_id() {
//let dir = tempfile::tempdir_in(".").unwrap();
//let mut options = EnvOpenOptions::new();
//let handle = Arc::new(MockIndexActorHandle::new());
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let index1_uuid = Uuid::new_v4(); #[actix_rt::test]
//let index2_uuid = Uuid::new_v4(); async fn test_next_id() {
let dir = tempfile::tempdir_in(".").unwrap();
let mut options = EnvOpenOptions::new();
let index_store = MockIndexStore::new();
let uuid_store = MockUuidStore::new();
let index_resolver = IndexResolver::new(uuid_store, index_store);
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(
options,
dir.path(),
Arc::new(index_resolver),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
//let mut txn = update_store.env.write_txn().unwrap(); let index1_uuid = Uuid::new_v4();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); let index2_uuid = Uuid::new_v4();
//txn.commit().unwrap();
//assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap(); let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap(); let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//assert_eq!((1, 0), ids); assert_eq!((0, 0), ids);
//let mut txn = update_store.env.write_txn().unwrap(); let mut txn = update_store.env.write_txn().unwrap();
//let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap(); let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//assert_eq!((2, 1), ids); assert_eq!((1, 0), ids);
//}
//#[actix_rt::test] let mut txn = update_store.env.write_txn().unwrap();
//async fn test_register_update() { let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
//let dir = tempfile::tempdir_in(".").unwrap(); txn.commit().unwrap();
//let mut options = EnvOpenOptions::new(); assert_eq!((2, 1), ids);
//let handle = Arc::new(MockIndexActorHandle::new()); }
//options.map_size(4096 * 100);
//let update_store = UpdateStore::open(
//options,
//dir.path(),
//handle,
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//let meta = UpdateMeta::ClearDocuments;
//let uuid = Uuid::new_v4();
//let store_clone = update_store.clone();
//tokio::task::spawn_blocking(move || {
//store_clone.register_update(meta, None, uuid).unwrap();
//})
//.await
//.unwrap();
//let txn = update_store.env.read_txn().unwrap(); #[actix_rt::test]
//assert!(update_store async fn test_register_update() {
//.pending_queue let dir = tempfile::tempdir_in(".").unwrap();
//.get(&txn, &(0, uuid, 0)) let index_store = MockIndexStore::new();
//.unwrap() let uuid_store = MockUuidStore::new();
//.is_some()); let index_resolver = IndexResolver::new(uuid_store, index_store);
//} let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(
options,
dir.path(),
Arc::new(index_resolver),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
let update = Update::ClearDocuments;
let uuid = Uuid::new_v4();
let store_clone = update_store.clone();
tokio::task::spawn_blocking(move || {
store_clone.register_update(uuid, update).unwrap();
})
.await
.unwrap();
//#[actix_rt::test] let txn = update_store.env.read_txn().unwrap();
//async fn test_process_update() { assert!(update_store
//let dir = tempfile::tempdir_in(".").unwrap(); .pending_queue
//let mut handle = MockIndexActorHandle::new(); .get(&txn, &(0, uuid, 0))
.unwrap()
.is_some());
}
//handle #[actix_rt::test]
//.expect_update() async fn test_process_update_success() {
//.times(2) let dir = tempfile::tempdir_in(".").unwrap();
//.returning(|_index_uuid, processing, _file| { let index_uuid = Uuid::new_v4();
//if processing.id() == 0 {
//Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
//} else {
//Box::pin(ok(Err(
//processing.fail(IndexActorError::ExistingPrimaryKey.into())
//)))
//}
//});
//let handle = Arc::new(handle); let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(|_uuid| {
let mocker = Mocker::default();
mocker
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
.once()
.then(|update| Ok(update.process(status::UpdateResult::Other)));
//let mut options = EnvOpenOptions::new(); Box::pin(ok(Some(Index::faux(mocker))))
//options.map_size(4096 * 100); });
//let store = UpdateStore::open(
//options,
//dir.path(),
//handle.clone(),
//Arc::new(AtomicBool::new(false)),
//)
//.unwrap();
//// wait a bit for the event loop to exit. let uuid_store = MockUuidStore::new();
//tokio::time::sleep(std::time::Duration::from_millis(50)).await; let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
//let mut txn = store.env.write_txn().unwrap(); let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let store = UpdateStore::open(
options,
dir.path(),
index_resolver.clone(),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None); // wait a bit for the event loop to exit.
//let uuid = Uuid::new_v4(); tokio::time::sleep(std::time::Duration::from_millis(50)).await;
//store let mut txn = store.env.write_txn().unwrap();
//.pending_queue
//.put(&mut txn, &(0, uuid, 0), &update)
//.unwrap();
//let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None); let update = Enqueued::new(Update::ClearDocuments, 0);
//store store
//.pending_queue .pending_queue
//.put(&mut txn, &(1, uuid, 1), &update) .put(&mut txn, &(0, index_uuid, 0), &update)
//.unwrap(); .unwrap();
//txn.commit().unwrap(); txn.commit().unwrap();
//// Process the pending update, and check that it has been moved to the update databases, and // Process the pending update, and check that it has been moved to the update databases, and
//// removed from the pending database. // removed from the pending database.
//let store_clone = store.clone(); let store_clone = store.clone();
//tokio::task::spawn_blocking(move || { tokio::task::spawn_blocking(move || {
//store_clone.process_pending_update(handle.clone()).unwrap(); store_clone.process_pending_update(index_resolver).unwrap();
//store_clone.process_pending_update(handle).unwrap(); })
//}) .await
//.await .unwrap();
//.unwrap();
//let txn = store.env.read_txn().unwrap(); let txn = store.env.read_txn().unwrap();
//assert!(store.pending_queue.first(&txn).unwrap().is_none()); assert!(store.pending_queue.first(&txn).unwrap().is_none());
//let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap(); let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
//assert!(matches!(update, UpdateStatus::Processed(_))); assert!(matches!(update, UpdateStatus::Processed(_)));
//let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap(); }
//assert!(matches!(update, UpdateStatus::Failed(_))); #[actix_rt::test]
//} async fn test_process_update_failure() {
//} let dir = tempfile::tempdir_in(".").unwrap();
let index_uuid = Uuid::new_v4();
let mut index_store = MockIndexStore::new();
index_store
.expect_get()
.with(eq(index_uuid))
.returning(|_uuid| {
let mocker = Mocker::default();
mocker
.when::<Processing, std::result::Result<Processed, Failed>>("handle_update")
.once()
.then(|update| Err(update.fail(IndexError::ExistingPrimaryKey)));
Box::pin(ok(Some(Index::faux(mocker))))
});
let uuid_store = MockUuidStore::new();
let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store));
let update_file_store = UpdateFileStore::new(dir.path()).unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let store = UpdateStore::open(
options,
dir.path(),
index_resolver.clone(),
Arc::new(AtomicBool::new(false)),
update_file_store,
)
.unwrap();
// wait a bit for the event loop to exit.
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
let mut txn = store.env.write_txn().unwrap();
let update = Enqueued::new(Update::ClearDocuments, 0);
store
.pending_queue
.put(&mut txn, &(0, index_uuid, 0), &update)
.unwrap();
txn.commit().unwrap();
// Process the pending update, and check that it has been moved to the update databases, and
// removed from the pending database.
let store_clone = store.clone();
tokio::task::spawn_blocking(move || {
store_clone.process_pending_update(index_resolver).unwrap();
})
.await
.unwrap();
let txn = store.env.read_txn().unwrap();
assert!(store.pending_queue.first(&txn).unwrap().is_none());
let update = store.updates.get(&txn, &(index_uuid, 0)).unwrap().unwrap();
assert!(matches!(update, UpdateStatus::Failed(_)));
}
}

View File

@ -5,7 +5,8 @@ pub mod options;
pub mod index; pub mod index;
pub mod index_controller; pub mod index_controller;
pub use index_controller::{updates::store::Update, IndexController as MeiliSearch}; pub use index_controller::updates::store::Update;
pub use index_controller::MeiliSearch;
pub use milli; pub use milli;