Merge branch 'main' of github.com:cisco877/meilisearch into add-index-name-to-sort-error-message

Francesco 2024-02-29 01:27:04 +01:00
commit b853de9584
28 changed files with 1428 additions and 558 deletions

View File

@ -110,6 +110,25 @@ jobs:
--milestone $MILESTONE_VERSION \ --milestone $MILESTONE_VERSION \
--assignee curquiza --assignee curquiza
create-update-version-issue:
needs: get-release-version
# Create the update-version issue when a milestone is created
if: github.event.action == 'created'
runs-on: ubuntu-latest
env:
ISSUE_TEMPLATE: issue-template.md
steps:
- uses: actions/checkout@v3
- name: Download the issue template
run: curl -s https://raw.githubusercontent.com/meilisearch/engine-team/main/issue-templates/update-version-issue.md > $ISSUE_TEMPLATE
- name: Create the issue
run: |
gh issue create \
--title "Update version in Cargo.toml for $MILESTONE_VERSION" \
--label 'maintenance' \
--body-file $ISSUE_TEMPLATE \
--milestone $MILESTONE_VERSION
# ---------------- # ----------------
# MILESTONE CLOSED # MILESTONE CLOSED
# ---------------- # ----------------

Cargo.lock (generated), 182 changed lines
View File

@ -36,16 +36,16 @@ dependencies = [
[[package]] [[package]]
name = "actix-http" name = "actix-http"
version = "3.5.1" version = "3.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "129d4c88e98860e1758c5de288d1632b07970a16d59bdf7b8d66053d582bb71f" checksum = "d223b13fd481fc0d1f83bb12659ae774d9e3601814c68a0bc539731698cca743"
dependencies = [ dependencies = [
"actix-codec", "actix-codec",
"actix-rt", "actix-rt",
"actix-service", "actix-service",
"actix-tls", "actix-tls",
"actix-utils", "actix-utils",
"ahash 0.8.3", "ahash 0.8.8",
"base64 0.21.7", "base64 0.21.7",
"bitflags 2.4.1", "bitflags 2.4.1",
"brotli", "brotli",
@ -138,9 +138,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-tls" name = "actix-tls"
version = "3.1.1" version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72616e7fbec0aa99c6f3164677fa48ff5a60036d0799c98cab894a44f3e0efc3" checksum = "d4cce60a2f2b477bc72e5cde0af1812a6e82d8fd85b5570a5dcf2a5bf2c5be5f"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"actix-service", "actix-service",
@ -148,13 +148,11 @@ dependencies = [
"futures-core", "futures-core",
"impl-more", "impl-more",
"pin-project-lite", "pin-project-lite",
"rustls 0.21.6",
"rustls-webpki",
"tokio", "tokio",
"tokio-rustls 0.23.4", "tokio-rustls",
"tokio-util", "tokio-util",
"tracing", "tracing",
"webpki-roots 0.22.6", "webpki-roots",
] ]
[[package]] [[package]]
@ -169,9 +167,9 @@ dependencies = [
[[package]] [[package]]
name = "actix-web" name = "actix-web"
version = "4.4.1" version = "4.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e43428f3bf11dee6d166b00ec2df4e3aa8cc1606aaa0b7433c146852e2f4e03b" checksum = "43a6556ddebb638c2358714d853257ed226ece6023ef9364f23f0c70737ea984"
dependencies = [ dependencies = [
"actix-codec", "actix-codec",
"actix-http", "actix-http",
@ -183,7 +181,7 @@ dependencies = [
"actix-tls", "actix-tls",
"actix-utils", "actix-utils",
"actix-web-codegen", "actix-web-codegen",
"ahash 0.8.3", "ahash 0.8.8",
"bytes", "bytes",
"bytestring", "bytestring",
"cfg-if", "cfg-if",
@ -270,14 +268,15 @@ dependencies = [
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.8.3" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c99f64d1e06488f620f932677e24bc6e2897582980441ae90a671415bd7ec2f" checksum = "42cd52102d3df161c77a887b608d7a4897d7cc112886a9537b738a887a03aaff"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"getrandom", "getrandom",
"once_cell", "once_cell",
"version_check", "version_check",
"zerocopy",
] ]
[[package]] [[package]]
@ -834,9 +833,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.82" version = "1.0.83"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "305fe645edc1442a0fa8b6726ba61d422798d37a52e12eaecf4b022ebbb88f01" checksum = "f1174fb0b6ec23863f8b971027804a42614e347eafb0a95bf0b12cdae21fc4d0"
dependencies = [ dependencies = [
"jobserver", "jobserver",
"libc", "libc",
@ -2126,8 +2125,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427" checksum = "be4136b2a15dd319360be1c07d9933517ccf0be8f16bf62a3bee4f0d618df427"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"js-sys",
"libc", "libc",
"wasi", "wasi",
"wasm-bindgen",
] ]
[[package]] [[package]]
@ -2251,7 +2252,7 @@ dependencies = [
"atomic-polyfill", "atomic-polyfill",
"hash32", "hash32",
"rustc_version", "rustc_version",
"spin 0.9.8", "spin",
"stable_deref_trait", "stable_deref_trait",
] ]
@ -2420,9 +2421,9 @@ dependencies = [
"futures-util", "futures-util",
"http 0.2.11", "http 0.2.11",
"hyper", "hyper",
"rustls 0.21.6", "rustls",
"tokio", "tokio",
"tokio-rustls 0.24.1", "tokio-rustls",
] ]
[[package]] [[package]]
@ -3124,13 +3125,14 @@ dependencies = [
[[package]] [[package]]
name = "jsonwebtoken" name = "jsonwebtoken"
version = "8.3.0" version = "9.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" checksum = "5c7ea04a7c5c055c175f189b6dc6ba036fd62306b58c66c9f6389036c503a3f4"
dependencies = [ dependencies = [
"base64 0.21.7", "base64 0.21.7",
"js-sys",
"pem", "pem",
"ring 0.16.20", "ring",
"serde", "serde",
"serde_json", "serde_json",
"simple_asn1", "simple_asn1",
@ -3721,7 +3723,7 @@ dependencies = [
"rayon", "rayon",
"regex", "regex",
"reqwest", "reqwest",
"rustls 0.20.9", "rustls",
"rustls-pemfile", "rustls-pemfile",
"segment", "segment",
"serde", "serde",
@ -4257,11 +4259,12 @@ checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]] [[package]]
name = "pem" name = "pem"
version = "1.1.1" version = "3.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8835c273a76a90455d7344889b0964598e3316e2a79ede8e36f16bdcf2228b8" checksum = "1b8fcc794035347fb64beda2d3b462595dd2753e3f268d89c5aae77e8cf2c310"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.21.7",
"serde",
] ]
[[package]] [[package]]
@ -4792,20 +4795,20 @@ dependencies = [
"once_cell", "once_cell",
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustls 0.21.6", "rustls",
"rustls-pemfile", "rustls-pemfile",
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
"system-configuration", "system-configuration",
"tokio", "tokio",
"tokio-rustls 0.24.1", "tokio-rustls",
"tower-service", "tower-service",
"url", "url",
"wasm-bindgen", "wasm-bindgen",
"wasm-bindgen-futures", "wasm-bindgen-futures",
"web-sys", "web-sys",
"webpki-roots 0.25.3", "webpki-roots",
"winreg", "winreg",
] ]
@ -4823,30 +4826,15 @@ checksum = "b9b1a3d5f46d53f4a3478e2be4a5a5ce5108ea58b100dcd139830eae7f79a3a1"
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.16.20" version = "0.17.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" checksum = "688c63d65483050968b2a8937f7995f443e27041a0f7700aa59b0822aedebb74"
dependencies = [
"cc",
"libc",
"once_cell",
"spin 0.5.2",
"untrusted 0.7.1",
"web-sys",
"winapi",
]
[[package]]
name = "ring"
version = "0.17.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babe80d5c16becf6594aa32ad2be8fe08498e7ae60b77de8df700e67f191d7e"
dependencies = [ dependencies = [
"cc", "cc",
"getrandom", "getrandom",
"libc", "libc",
"spin 0.9.8", "spin",
"untrusted 0.9.0", "untrusted",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@ -4924,24 +4912,12 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.20.9" version = "0.21.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b80e3dec595989ea8510028f30c408a4630db12c9cbb8de34203b89d6577e99" checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
dependencies = [ dependencies = [
"log", "log",
"ring 0.16.20", "ring",
"sct",
"webpki",
]
[[package]]
name = "rustls"
version = "0.21.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb"
dependencies = [
"log",
"ring 0.16.20",
"rustls-webpki", "rustls-webpki",
"sct", "sct",
] ]
@ -4961,8 +4937,8 @@ version = "0.101.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765" checksum = "8b6275d1ee7a1cd780b64aca7726599a1dbc893b1e64144529e55c3c2f745765"
dependencies = [ dependencies = [
"ring 0.17.3", "ring",
"untrusted 0.9.0", "untrusted",
] ]
[[package]] [[package]]
@ -5004,12 +4980,12 @@ checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]] [[package]]
name = "sct" name = "sct"
version = "0.7.0" version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" checksum = "da046153aa2352493d6cb7da4b6e5c0c057d8a1d0a9aa8560baffdd945acd414"
dependencies = [ dependencies = [
"ring 0.16.20", "ring",
"untrusted 0.7.1", "untrusted",
] ]
[[package]] [[package]]
@ -5275,12 +5251,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]] [[package]]
name = "spin" name = "spin"
version = "0.9.8" version = "0.9.8"
@ -5642,24 +5612,13 @@ dependencies = [
"syn 2.0.48", "syn 2.0.48",
] ]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.9",
"tokio",
"webpki",
]
[[package]] [[package]]
name = "tokio-rustls" name = "tokio-rustls"
version = "0.24.1" version = "0.24.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081"
dependencies = [ dependencies = [
"rustls 0.21.6", "rustls",
"tokio", "tokio",
] ]
@ -5915,12 +5874,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]] [[package]]
name = "untrusted" name = "untrusted"
version = "0.9.0" version = "0.9.0"
@ -5937,13 +5890,13 @@ dependencies = [
"flate2", "flate2",
"log", "log",
"once_cell", "once_cell",
"rustls 0.21.6", "rustls",
"rustls-webpki", "rustls-webpki",
"serde", "serde",
"serde_json", "serde_json",
"socks", "socks",
"url", "url",
"webpki-roots 0.25.3", "webpki-roots",
] ]
[[package]] [[package]]
@ -6153,25 +6106,6 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "webpki"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07ecc0cd7cac091bf682ec5efa18b1cff79d617b84181f38b3951dbe135f607f"
dependencies = [
"ring 0.16.20",
"untrusted 0.7.1",
]
[[package]]
name = "webpki-roots"
version = "0.22.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c71e40d7d2c34a5106301fb632274ca37242cd0c9d3e64dbece371a40a2d87"
dependencies = [
"webpki",
]
[[package]] [[package]]
name = "webpki-roots" name = "webpki-roots"
version = "0.25.3" version = "0.25.3"
@ -6533,6 +6467,26 @@ dependencies = [
"synstructure", "synstructure",
] ]
[[package]]
name = "zerocopy"
version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be"
dependencies = [
"zerocopy-derive",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]] [[package]]
name = "zerofrom" name = "zerofrom"
version = "0.1.3" version = "0.1.3"

View File

@ -1,5 +1,5 @@
use std::fs::File as StdFile; use std::fs::File as StdFile;
use std::ops::{Deref, DerefMut}; use std::io::Write;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::str::FromStr; use std::str::FromStr;
@ -22,20 +22,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>; pub type Result<T> = std::result::Result<T, Error>;
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct FileStore { pub struct FileStore {
path: PathBuf, path: PathBuf,
@ -56,7 +42,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?; let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
let update_file = File { file, path }; let update_file = File { file: Some(file), path };
Ok((uuid, update_file)) Ok((uuid, update_file))
} }
@ -67,7 +53,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?; let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::from_u128(uuid); let uuid = Uuid::from_u128(uuid);
let path = self.path.join(uuid.to_string()); let path = self.path.join(uuid.to_string());
let update_file = File { file, path }; let update_file = File { file: Some(file), path };
Ok((uuid, update_file)) Ok((uuid, update_file))
} }
@ -136,16 +122,40 @@ impl FileStore {
pub struct File { pub struct File {
path: PathBuf, path: PathBuf,
file: NamedTempFile, file: Option<NamedTempFile>,
} }
impl File { impl File {
pub fn dry_file() -> Result<Self> {
Ok(Self { path: PathBuf::new(), file: None })
}
pub fn persist(self) -> Result<()> { pub fn persist(self) -> Result<()> {
self.file.persist(&self.path)?; if let Some(file) = self.file {
file.persist(&self.path)?;
}
Ok(()) Ok(())
} }
} }
impl Write for File {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if let Some(file) = self.file.as_mut() {
file.write(buf)
} else {
Ok(buf.len())
}
}
fn flush(&mut self) -> std::io::Result<()> {
if let Some(file) = self.file.as_mut() {
file.flush()
} else {
Ok(())
}
}
}
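
The `Option<NamedTempFile>` introduced above turns `File` into a no-op sink for dry runs: writes report success without touching disk, and `persist` silently does nothing. A self-contained sketch of the same pattern, assuming nothing from the crate (the `MaybeFile` name is illustrative):

use std::io::Write;

// Wraps an optional writer; `None` models the dry-run case.
struct MaybeFile<W: Write> {
    inner: Option<W>,
}

impl<W: Write> Write for MaybeFile<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self.inner.as_mut() {
            Some(w) => w.write(buf), // real run: forward to the underlying file
            None => Ok(buf.len()),   // dry run: claim everything was written
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match self.inner.as_mut() {
            Some(w) => w.flush(),
            None => Ok(()),
        }
    }
}

fn main() -> std::io::Result<()> {
    let mut dry: MaybeFile<Vec<u8>> = MaybeFile { inner: None };
    dry.write_all(b"discarded")?; // succeeds, but nothing is stored anywhere
    Ok(())
}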
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::io::Write; use std::io::Write;

View File

@ -48,6 +48,8 @@ impl From<DateField> for Code {
pub enum Error { pub enum Error {
#[error("{1}")] #[error("{1}")]
WithCustomErrorCode(Code, Box<Self>), WithCustomErrorCode(Code, Box<Self>),
#[error("Received bad task id: {received} should be >= to {expected}.")]
BadTaskId { received: TaskId, expected: TaskId },
#[error("Index `{0}` not found.")] #[error("Index `{0}` not found.")]
IndexNotFound(String), IndexNotFound(String),
#[error("Index `{0}` already exists.")] #[error("Index `{0}` already exists.")]
@ -161,6 +163,7 @@ impl Error {
match self { match self {
Error::IndexNotFound(_) Error::IndexNotFound(_)
| Error::WithCustomErrorCode(_, _) | Error::WithCustomErrorCode(_, _)
| Error::BadTaskId { .. }
| Error::IndexAlreadyExists(_) | Error::IndexAlreadyExists(_)
| Error::SwapDuplicateIndexFound(_) | Error::SwapDuplicateIndexFound(_)
| Error::SwapDuplicateIndexesFound(_) | Error::SwapDuplicateIndexesFound(_)
@ -205,6 +208,7 @@ impl ErrorCode for Error {
fn error_code(&self) -> Code { fn error_code(&self) -> Code {
match self { match self {
Error::WithCustomErrorCode(code, _) => *code, Error::WithCustomErrorCode(code, _) => *code,
Error::BadTaskId { .. } => Code::BadRequest,
Error::IndexNotFound(_) => Code::IndexNotFound, Error::IndexNotFound(_) => Code::IndexNotFound,
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists, Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound, Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
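
The new `BadTaskId` variant backs the replication parameters introduced later in this diff: when a client supplies its own task id, it must be at least the next id the scheduler would assign. A toy model of the assumed registration semantics (names, types, and the counter logic are illustrative, not the scheduler's actual code):

// Toy scheduler: custom ids must be >= the next auto-assigned id,
// and a dry run answers without consuming anything.
struct Scheduler {
    next_id: u32,
}

impl Scheduler {
    fn register(&mut self, task_id: Option<u32>, dry_run: bool) -> Result<u32, String> {
        let id = match task_id {
            Some(id) if id < self.next_id => {
                return Err(format!("Received bad task id: {id} should be >= {}.", self.next_id));
            }
            Some(id) => id,
            None => self.next_id,
        };
        if !dry_run {
            self.next_id = id + 1; // only a real registration advances the counter
        }
        Ok(id)
    }
}

fn main() {
    let mut s = Scheduler { next_id: 3 };
    assert_eq!(s.register(None, true), Ok(3));       // dry run answers with the would-be id...
    assert_eq!(s.register(None, false), Ok(3));      // ...without having consumed it
    assert!(s.register(Some(1), false).is_err());    // stale custom id is rejected
    assert_eq!(s.register(Some(10), false), Ok(10)); // ids may jump forward
}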

View File

@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler { let IndexScheduler {
autobatching_enabled, autobatching_enabled,
cleanup_enabled: _,
must_stop_processing: _, must_stop_processing: _,
processing_tasks, processing_tasks,
file_store, file_store,

File diff suppressed because it is too large

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -0,0 +1,90 @@
---
source: index-scheduler/src/lib.rs
---
[
{
"uid": 0,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "succeeded",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 1,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": {
"message": "Index `doggo` already exists.",
"code": "index_already_exists",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#index_already_exists"
},
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "failed",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 2,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
},
{
"uid": 3,
"enqueuedAt": "[date]",
"startedAt": "[date]",
"finishedAt": "[date]",
"error": null,
"canceledBy": null,
"details": {
"IndexInfo": {
"primary_key": null
}
},
"status": "enqueued",
"kind": {
"indexCreation": {
"index_uid": "doggo",
"primary_key": null
}
}
}
]

View File

@ -11,7 +11,7 @@ edition.workspace = true
license.workspace = true license.workspace = true
[dependencies] [dependencies]
actix-web = { version = "4.4.1", default-features = false } actix-web = { version = "4.5.1", default-features = false }
anyhow = "1.0.79" anyhow = "1.0.79"
convert_case = "0.6.0" convert_case = "0.6.0"
csv = "1.3.0" csv = "1.3.0"

View File

@ -1,6 +1,6 @@
use std::fmt::{self, Debug, Display}; use std::fmt::{self, Debug, Display};
use std::fs::File; use std::fs::File;
use std::io::{self, Seek, Write}; use std::io::{self, BufWriter, Write};
use std::marker::PhantomData; use std::marker::PhantomData;
use memmap2::MmapOptions; use memmap2::MmapOptions;
@ -104,8 +104,8 @@ impl ErrorCode for DocumentFormatError {
} }
/// Reads CSV from input and write an obkv batch to writer. /// Reads CSV from input and write an obkv batch to writer.
pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result<u64> { pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
@ -116,9 +116,9 @@ pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result
Ok(count as u64) Ok(count as u64)
} }
/// Reads JSON from temporary file and write an obkv batch to writer. /// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> { pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap); let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
@ -151,8 +151,8 @@ pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
} }
/// Reads JSON from temporary file and write an obkv batch to writer. /// Reads JSON from temporary file and write an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> { pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer); let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? }; let mmap = unsafe { MmapOptions::new().map(file)? };
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() { for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
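
Dropping the `Seek` bound works because the batch builder's output is now funneled through a `BufWriter`, so any plain `impl Write` sink is acceptable, including the dry-run update file above. A minimal sketch of the idea (the `write_batch` helper is hypothetical, not the crate's API):

use std::io::{BufWriter, Write};

// Accepts any Write sink; buffering happens internally, no seeking required.
fn write_batch(writer: impl Write, docs: &[&str]) -> std::io::Result<u64> {
    let mut out = BufWriter::new(writer); // coalesce many small writes
    let mut count = 0;
    for doc in docs {
        out.write_all(doc.as_bytes())?;
        out.write_all(b"\n")?;
        count += 1;
    }
    out.flush()?;
    Ok(count)
}

fn main() -> std::io::Result<()> {
    let mut sink = Vec::new(); // Vec<u8> implements Write but not Seek
    let n = write_batch(&mut sink, &[r#"{"id":1}"#, r#"{"id":2}"#])?;
    assert_eq!(n, 2);
    Ok(())
}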

View File

@ -14,18 +14,18 @@ default-run = "meilisearch"
[dependencies] [dependencies]
actix-cors = "0.7.0" actix-cors = "0.7.0"
actix-http = { version = "3.5.1", default-features = false, features = [ actix-http = { version = "3.6.0", default-features = false, features = [
"compress-brotli", "compress-brotli",
"compress-gzip", "compress-gzip",
"rustls", "rustls-0_21",
] } ] }
actix-utils = "3.0.1" actix-utils = "3.0.1"
actix-web = { version = "4.4.1", default-features = false, features = [ actix-web = { version = "4.5.1", default-features = false, features = [
"macros", "macros",
"compress-brotli", "compress-brotli",
"compress-gzip", "compress-gzip",
"cookies", "cookies",
"rustls", "rustls-0_21",
] } ] }
actix-web-static-files = { git = "https://github.com/kilork/actix-web-static-files.git", rev = "2d3b6160", optional = true } actix-web-static-files = { git = "https://github.com/kilork/actix-web-static-files.git", rev = "2d3b6160", optional = true }
anyhow = { version = "1.0.79", features = ["backtrace"] } anyhow = { version = "1.0.79", features = ["backtrace"] }
@ -52,7 +52,7 @@ index-scheduler = { path = "../index-scheduler" }
indexmap = { version = "2.1.0", features = ["serde"] } indexmap = { version = "2.1.0", features = ["serde"] }
is-terminal = "0.4.10" is-terminal = "0.4.10"
itertools = "0.11.0" itertools = "0.11.0"
jsonwebtoken = "8.3.0" jsonwebtoken = "9.2.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
@ -75,7 +75,7 @@ reqwest = { version = "0.11.23", features = [
"rustls-tls", "rustls-tls",
"json", "json",
], default-features = false } ], default-features = false }
rustls = "0.20.8" rustls = "0.21.6"
rustls-pemfile = "1.0.2" rustls-pemfile = "1.0.2"
segment = { version = "0.2.3", optional = true } segment = { version = "0.2.3", optional = true }
serde = { version = "1.0.195", features = ["derive"] } serde = { version = "1.0.195", features = ["derive"] }

View File

@ -250,6 +250,7 @@ impl super::Analytics for SegmentAnalytics {
struct Infos { struct Infos {
env: String, env: String,
experimental_enable_metrics: bool, experimental_enable_metrics: bool,
experimental_replication_parameters: bool,
experimental_enable_logs_route: bool, experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool, experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize, experimental_max_number_of_batched_tasks: usize,
@ -288,6 +289,7 @@ impl From<Opt> for Infos {
let Opt { let Opt {
db_path, db_path,
experimental_enable_metrics, experimental_enable_metrics,
experimental_replication_parameters,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks, experimental_max_number_of_batched_tasks,
@ -335,6 +337,7 @@ impl From<Opt> for Infos {
Self { Self {
env, env,
experimental_enable_metrics, experimental_enable_metrics,
experimental_replication_parameters,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
db_path: db_path != PathBuf::from("./data.ms"), db_path: db_path != PathBuf::from("./data.ms"),

View File

@ -131,6 +131,7 @@ gen_seq! { SeqFromRequestFut3; A B C }
gen_seq! { SeqFromRequestFut4; A B C D } gen_seq! { SeqFromRequestFut4; A B C D }
gen_seq! { SeqFromRequestFut5; A B C D E } gen_seq! { SeqFromRequestFut5; A B C D E }
gen_seq! { SeqFromRequestFut6; A B C D E F } gen_seq! { SeqFromRequestFut6; A B C D E F }
gen_seq! { SeqFromRequestFut7; A B C D E F G } // the new web::Data<Opt> extractor pushes several handlers to seven arguments
pin_project! { pin_project! {
#[project = ExtractProj] #[project = ExtractProj]

View File

@ -251,7 +251,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
.name(String::from("register-snapshot-tasks")) .name(String::from("register-snapshot-tasks"))
.spawn(move || loop { .spawn(move || loop {
thread::sleep(snapshot_delay); thread::sleep(snapshot_delay);
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) { if let Err(e) =
index_scheduler.register(KindWithContent::SnapshotCreation, None, false)
{
error!("Error while registering snapshot: {}", e); error!("Error while registering snapshot: {}", e);
} }
}) })
@ -286,6 +288,7 @@ fn open_or_create_database_unchecked(
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage, enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
indexer_config: (&opt.indexer_options).try_into()?, indexer_config: (&opt.indexer_options).try_into()?,
autobatching_enabled: true, autobatching_enabled: true,
cleanup_enabled: !opt.experimental_replication_parameters,
max_number_of_tasks: 1_000_000, max_number_of_tasks: 1_000_000,
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks, max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
@ -453,6 +456,7 @@ pub fn configure_data(
.app_data(auth) .app_data(auth)
.app_data(web::Data::from(analytics)) .app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs)) .app_data(web::Data::new(logs))
.app_data(web::Data::new(opt.clone()))
.app_data( .app_data(
web::JsonConfig::default() web::JsonConfig::default()
.limit(http_payload_size_limit) .limit(http_payload_size_limit)

View File

@ -133,7 +133,7 @@ async fn run_http(
.keep_alive(KeepAlive::Os); .keep_alive(KeepAlive::Os);
if let Some(config) = opt_clone.get_ssl_config()? { if let Some(config) = opt_clone.get_ssl_config()? {
http_server.bind_rustls(opt_clone.http_addr, config)?.run().await?; http_server.bind_rustls_021(opt_clone.http_addr, config)?.run().await?;
} else { } else {
http_server.bind(&opt_clone.http_addr)?.run().await?; http_server.bind(&opt_clone.http_addr)?.run().await?;
} }
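
actix-web 4.5 versions its TLS helpers by rustls release; with the `rustls-0_21` feature enabled the binder is `bind_rustls_021`. A minimal sketch, assuming a rustls 0.21 `ServerConfig` is built elsewhere and passed in:

use actix_web::{App, HttpServer};

// Serve an empty app over TLS; certificate loading is out of scope here.
async fn serve(config: rustls::ServerConfig) -> std::io::Result<()> {
    HttpServer::new(|| App::new())
        .bind_rustls_021("127.0.0.1:7700", config)? // was `bind_rustls` with rustls 0.20
        .run()
        .await
}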

View File

@ -51,6 +51,7 @@ const MEILI_IGNORE_MISSING_DUMP: &str = "MEILI_IGNORE_MISSING_DUMP";
const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS"; const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR"; const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL"; const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE"; const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS"; const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str = const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
@ -317,6 +318,16 @@ pub struct Opt {
#[serde(default)] #[serde(default)]
pub experimental_enable_logs_route: bool, pub experimental_enable_logs_route: bool,
/// Enable multiple features that help you run Meilisearch in a replicated context.
/// For more information, see: <https://github.com/orgs/meilisearch/discussions/725>
///
/// - /!\ Disables the automatic cleanup of old processed tasks; you are in charge of that now
/// - Lets you specify a custom task ID upon registering a task
/// - Lets you dry-register a task (the route answers as usual, but nothing is actually registered in Meilisearch and the task will never be processed)
#[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)]
#[serde(default)]
pub experimental_replication_parameters: bool,
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652> /// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)] #[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
#[serde(default)] #[serde(default)]
@ -423,6 +434,7 @@ impl Opt {
no_analytics, no_analytics,
experimental_enable_metrics, experimental_enable_metrics,
experimental_enable_logs_route, experimental_enable_logs_route,
experimental_replication_parameters,
experimental_reduce_indexing_memory_usage, experimental_reduce_indexing_memory_usage,
} = self; } = self;
export_to_env_if_not_present(MEILI_DB_PATH, db_path); export_to_env_if_not_present(MEILI_DB_PATH, db_path);
@ -479,6 +491,10 @@ impl Opt {
MEILI_EXPERIMENTAL_ENABLE_METRICS, MEILI_EXPERIMENTAL_ENABLE_METRICS,
experimental_enable_metrics.to_string(), experimental_enable_metrics.to_string(),
); );
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS,
experimental_replication_parameters.to_string(),
);
export_to_env_if_not_present( export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE, MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
experimental_enable_logs_route.to_string(), experimental_enable_logs_route.to_string(),
@ -503,11 +519,11 @@ impl Opt {
} }
if self.ssl_require_auth { if self.ssl_require_auth {
let verifier = AllowAnyAuthenticatedClient::new(client_auth_roots); let verifier = AllowAnyAuthenticatedClient::new(client_auth_roots);
config.with_client_cert_verifier(verifier) config.with_client_cert_verifier(Arc::from(verifier))
} else { } else {
let verifier = let verifier =
AllowAnyAnonymousOrAuthenticatedClient::new(client_auth_roots); AllowAnyAnonymousOrAuthenticatedClient::new(client_auth_roots);
config.with_client_cert_verifier(verifier) config.with_client_cert_verifier(Arc::from(verifier))
} }
} }
None => config.with_no_client_auth(), None => config.with_no_client_auth(),
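
The `Arc::from(verifier)` change tracks rustls 0.21, where `AllowAnyAuthenticatedClient::new` returns the verifier by value and `with_client_cert_verifier` expects an `Arc<dyn ClientCertVerifier>`. A sketch of the builder chain, assuming the roots, certificate chain, and key are supplied by the caller:

use std::sync::Arc;

use rustls::server::AllowAnyAuthenticatedClient;
use rustls::{Certificate, PrivateKey, RootCertStore, ServerConfig};

fn tls_config_with_client_auth(
    roots: RootCertStore,
    cert_chain: Vec<Certificate>,
    key: PrivateKey,
) -> Result<ServerConfig, rustls::Error> {
    let verifier = AllowAnyAuthenticatedClient::new(roots);
    ServerConfig::builder()
        .with_safe_defaults()
        .with_client_cert_verifier(Arc::from(verifier)) // rustls 0.21 wants an Arc here
        .with_single_cert(cert_chain, key)
}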

View File

@ -11,7 +11,8 @@ use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::SummarizedTaskView; use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::Opt;
pub fn configure(cfg: &mut web::ServiceConfig) { pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))); cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
@ -21,6 +22,7 @@ pub async fn create_dump(
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>, auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req)); analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
@ -29,8 +31,12 @@ pub async fn create_dump(
keys: auth_controller.list_keys()?, keys: auth_controller.list_keys()?,
instance_uid: analytics.instance_uid().cloned(), instance_uid: analytics.instance_uid().cloned(),
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Create dump"); debug!(returns = ?task, "Create dump");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -7,7 +7,7 @@ use bstr::ByteSlice as _;
use deserr::actix_web::{AwebJson, AwebQueryParameter}; use deserr::actix_web::{AwebJson, AwebQueryParameter};
use deserr::Deserr; use deserr::Deserr;
use futures::StreamExt; use futures::StreamExt;
use index_scheduler::IndexScheduler; use index_scheduler::{IndexScheduler, TaskId};
use meilisearch_types::deserr::query_params::Param; use meilisearch_types::deserr::query_params::Param;
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError}; use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType}; use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
@ -36,8 +36,11 @@ use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData; use crate::extractors::authentication::GuardedData;
use crate::extractors::payload::Payload; use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use crate::routes::{
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
};
use crate::search::parse_filter; use crate::search::parse_filter;
use crate::Opt;
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| { static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()] vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@ -119,6 +122,7 @@ pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>, path: web::Path<DocumentParam>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner(); let DocumentParam { index_uid, document_id } = path.into_inner();
@ -130,9 +134,13 @@ pub async fn delete_document(
index_uid: index_uid.to_string(), index_uid: index_uid.to_string(),
documents_ids: vec![document_id], documents_ids: vec![document_id],
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
debug!(returns = ?task, "Delete document"); .await??
.into();
debug!("returns: {:?}", task);
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
} }
@ -267,6 +275,7 @@ pub async fn replace_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>, params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload, body: Payload,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -277,6 +286,8 @@ pub async fn replace_documents(
analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); analytics.add_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition( let task = document_addition(
extract_mime_type(&req)?, extract_mime_type(&req)?,
index_scheduler, index_scheduler,
@ -285,6 +296,8 @@ pub async fn replace_documents(
params.csv_delimiter, params.csv_delimiter,
body, body,
IndexDocumentsMethod::ReplaceDocuments, IndexDocumentsMethod::ReplaceDocuments,
uid,
dry_run,
allow_index_creation, allow_index_creation,
) )
.await?; .await?;
@ -299,6 +312,7 @@ pub async fn update_documents(
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>, params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
body: Payload, body: Payload,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -309,6 +323,8 @@ pub async fn update_documents(
analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req); analytics.update_documents(&params, index_scheduler.index(&index_uid).is_err(), &req);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid); let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task = document_addition( let task = document_addition(
extract_mime_type(&req)?, extract_mime_type(&req)?,
index_scheduler, index_scheduler,
@ -317,6 +333,8 @@ pub async fn update_documents(
params.csv_delimiter, params.csv_delimiter,
body, body,
IndexDocumentsMethod::UpdateDocuments, IndexDocumentsMethod::UpdateDocuments,
uid,
dry_run,
allow_index_creation, allow_index_creation,
) )
.await?; .await?;
@ -334,6 +352,8 @@ async fn document_addition(
csv_delimiter: Option<u8>, csv_delimiter: Option<u8>,
mut body: Payload, mut body: Payload,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
task_id: Option<TaskId>,
dry_run: bool,
allow_index_creation: bool, allow_index_creation: bool,
) -> Result<SummarizedTaskView, MeilisearchHttpError> { ) -> Result<SummarizedTaskView, MeilisearchHttpError> {
let format = match ( let format = match (
@ -366,7 +386,7 @@ async fn document_addition(
} }
}; };
let (uuid, mut update_file) = index_scheduler.create_update_file()?; let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
let temp_file = match tempfile() { let temp_file = match tempfile() {
Ok(file) => file, Ok(file) => file,
@ -405,11 +425,9 @@ async fn document_addition(
let read_file = buffer.into_inner().into_std().await; let read_file = buffer.into_inner().into_std().await;
let documents_count = tokio::task::spawn_blocking(move || { let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format { let documents_count = match format {
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, PayloadType::Json => read_json(&read_file, &mut update_file)?,
PayloadType::Csv { delimiter } => { PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
read_csv(&read_file, update_file.as_file_mut(), delimiter)? PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
}
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
}; };
// we NEED to persist the file here because we moved the `update_file` into another task. // we NEED to persist the file here because we moved the `update_file` into another task.
update_file.persist()?; update_file.persist()?;
@ -450,7 +468,9 @@ async fn document_addition(
}; };
let scheduler = index_scheduler.clone(); let scheduler = index_scheduler.clone();
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? { let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
.await?
{
Ok(task) => task, Ok(task) => task,
Err(e) => { Err(e) => {
index_scheduler.delete_update_file(uuid)?; index_scheduler.delete_update_file(uuid)?;
@ -466,6 +486,7 @@ pub async fn delete_documents_batch(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: web::Json<Vec<Value>>, body: web::Json<Vec<Value>>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch"); debug!(parameters = ?body, "Delete documents by batch");
@ -480,8 +501,12 @@ pub async fn delete_documents_batch(
let task = let task =
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids }; KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by batch"); debug!(returns = ?task, "Delete documents by batch");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -499,6 +524,7 @@ pub async fn delete_documents_by_filter(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>, body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by filter"); debug!(parameters = ?body, "Delete documents by filter");
@ -516,8 +542,12 @@ pub async fn delete_documents_by_filter(
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?; .map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter }; let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete documents by filter"); debug!(returns = ?task, "Delete documents by filter");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -527,14 +557,19 @@ pub async fn clear_all_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req); analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() }; let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete all documents"); debug!(returns = ?task, "Delete all documents");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -17,11 +17,13 @@ use serde_json::json;
use time::OffsetDateTime; use time::OffsetDateTime;
use tracing::debug; use tracing::debug;
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT}; use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData}; use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler; use crate::extractors::sequential_extractor::SeqHandler;
use crate::routes::is_dry_run;
use crate::Opt;
pub mod documents; pub mod documents;
pub mod facet_search; pub mod facet_search;
@ -123,6 +125,7 @@ pub async fn create_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
body: AwebJson<IndexCreateRequest, DeserrJsonError>, body: AwebJson<IndexCreateRequest, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Create index"); debug!(parameters = ?body, "Create index");
@ -137,8 +140,12 @@ pub async fn create_index(
); );
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key }; let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Create index"); debug!(returns = ?task, "Create index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -190,6 +197,7 @@ pub async fn update_index(
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<UpdateIndexRequest, DeserrJsonError>, body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Update index"); debug!(parameters = ?body, "Update index");
@ -206,8 +214,12 @@ pub async fn update_index(
primary_key: body.primary_key, primary_key: body.primary_key,
}; };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Update index"); debug!(returns = ?task, "Update index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
@ -216,11 +228,17 @@ pub async fn update_index(
pub async fn delete_index( pub async fn delete_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() }; let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
let uid = get_task_id(&req, &opt)?;
let dry_run = is_dry_run(&req, &opt)?;
let task: SummarizedTaskView = let task: SummarizedTaskView =
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into(); tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
.await??
.into();
debug!(returns = ?task, "Delete index"); debug!(returns = ?task, "Delete index");
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))

View File

@ -15,7 +15,8 @@ use tracing::debug;
use crate::analytics::Analytics; use crate::analytics::Analytics;
use crate::extractors::authentication::policies::*; use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
-use crate::routes::SummarizedTaskView;
+use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
+use crate::Opt;

 #[macro_export]
 macro_rules! make_setting_route {
@@ -34,7 +35,8 @@ macro_rules! make_setting_route {
         use $crate::extractors::authentication::policies::*;
         use $crate::extractors::authentication::GuardedData;
         use $crate::extractors::sequential_extractor::SeqHandler;
-        use $crate::routes::SummarizedTaskView;
+        use $crate::Opt;
+        use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView};

         pub async fn delete(
             index_scheduler: GuardedData<
@@ -42,6 +44,8 @@ macro_rules! make_setting_route {
                 Data<IndexScheduler>,
             >,
             index_uid: web::Path<String>,
+            req: HttpRequest,
+            opt: web::Data<Opt>,
         ) -> Result<HttpResponse, ResponseError> {
             let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@@ -56,8 +60,10 @@ macro_rules! make_setting_route {
                 is_deletion: true,
                 allow_index_creation,
             };
+            let uid = get_task_id(&req, &opt)?;
+            let dry_run = is_dry_run(&req, &opt)?;
             let task: SummarizedTaskView =
-                tokio::task::spawn_blocking(move || index_scheduler.register(task))
+                tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
                     .await??
                     .into();
@@ -73,6 +79,7 @@ macro_rules! make_setting_route {
             index_uid: actix_web::web::Path<String>,
             body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>,
             req: HttpRequest,
+            opt: web::Data<Opt>,
             $analytics_var: web::Data<dyn Analytics>,
         ) -> std::result::Result<HttpResponse, ResponseError> {
             let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@@ -105,8 +112,10 @@ macro_rules! make_setting_route {
                 is_deletion: false,
                 allow_index_creation,
             };
+            let uid = get_task_id(&req, &opt)?;
+            let dry_run = is_dry_run(&req, &opt)?;
             let task: SummarizedTaskView =
-                tokio::task::spawn_blocking(move || index_scheduler.register(task))
+                tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
                     .await??
                     .into();
@@ -652,6 +661,7 @@ pub async fn update_all(
     index_uid: web::Path<String>,
     body: AwebJson<Settings<Unchecked>, DeserrJsonError>,
     req: HttpRequest,
+    opt: web::Data<Opt>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@@ -767,8 +777,12 @@ pub async fn update_all(
         is_deletion: false,
         allow_index_creation,
     };
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();

     debug!(returns = ?task, "Update all settings");
     Ok(HttpResponse::Accepted().json(task))
@@ -790,6 +804,8 @@ pub async fn get_all(
 pub async fn delete_all(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
+    req: HttpRequest,
+    opt: web::Data<Opt>,
 ) -> Result<HttpResponse, ResponseError> {
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@@ -803,8 +819,12 @@ pub async fn delete_all(
         is_deletion: true,
         allow_index_creation,
     };
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();

     debug!(returns = ?task, "Delete all settings");
     Ok(HttpResponse::Accepted().json(task))

View File

@@ -4,7 +4,7 @@ use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
 use index_scheduler::IndexScheduler;
 use meilisearch_auth::AuthController;
-use meilisearch_types::error::ResponseError;
+use meilisearch_types::error::{Code, ResponseError};
 use meilisearch_types::settings::{Settings, Unchecked};
 use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
 use serde::{Deserialize, Serialize};
@@ -15,6 +15,7 @@ use tracing::debug;
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
+use crate::Opt;

 const PAGINATION_DEFAULT_LIMIT: usize = 20;
@@ -45,6 +46,56 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
         .service(web::scope("/experimental-features").configure(features::configure));
 }

+pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, ResponseError> {
+    if !opt.experimental_replication_parameters {
+        return Ok(None);
+    }
+    let task_id = req
+        .headers()
+        .get("TaskId")
+        .map(|header| {
+            header.to_str().map_err(|e| {
+                ResponseError::from_msg(
+                    format!("TaskId is not a valid utf-8 string: {e}"),
+                    Code::BadRequest,
+                )
+            })
+        })
+        .transpose()?
+        .map(|s| {
+            s.parse::<TaskId>().map_err(|e| {
+                ResponseError::from_msg(
+                    format!(
+                        "Could not parse the TaskId as a {}: {e}",
+                        std::any::type_name::<TaskId>(),
+                    ),
+                    Code::BadRequest,
+                )
+            })
+        })
+        .transpose()?;
+    Ok(task_id)
+}
+
+pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
+    if !opt.experimental_replication_parameters {
+        return Ok(false);
+    }
+    Ok(req
+        .headers()
+        .get("DryRun")
+        .map(|header| {
+            header.to_str().map_err(|e| {
+                ResponseError::from_msg(
+                    format!("DryRun is not a valid utf-8 string: {e}"),
+                    Code::BadRequest,
+                )
+            })
+        })
+        .transpose()?
+        .map_or(false, |s| s.to_lowercase() == "true"))
+}
+
 #[derive(Debug, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SummarizedTaskView {
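
Aside for readers of this diff: both helpers are gated on opt.experimental_replication_parameters and otherwise fall back to None / false. Below is a minimal standalone sketch of just the parsing rules, with the feature-flag gate omitted and plain String errors standing in for ResponseError; the TaskId = u32 alias matches meilisearch_types, everything else is illustrative.

// Standalone sketch of the header-parsing rules introduced above.
// Assumes TaskId = u32 (the alias used by meilisearch_types); the
// experimental-flag gate and ResponseError plumbing are omitted.
type TaskId = u32;

fn parse_task_id(header: Option<&str>) -> Result<Option<TaskId>, String> {
    header
        .map(|s| s.parse::<TaskId>().map_err(|e| format!("Could not parse the TaskId: {e}")))
        .transpose()
}

fn parse_dry_run(header: Option<&str>) -> bool {
    // Any casing of "true" enables the dry run; absent or anything else is false.
    header.map_or(false, |s| s.to_lowercase() == "true")
}

fn main() {
    assert_eq!(parse_task_id(Some("25")), Ok(Some(25)));
    assert_eq!(parse_task_id(None), Ok(None));
    assert!(parse_task_id(Some("doggo")).is_err());
    assert!(parse_dry_run(Some("TRUE")));
    assert!(!parse_dry_run(Some("1")));
}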

View File

@@ -10,7 +10,8 @@
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::sequential_extractor::SeqHandler;
-use crate::routes::SummarizedTaskView;
+use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
+use crate::Opt;

 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
@@ -19,13 +20,18 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
 pub async fn create_snapshot(
     index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
     req: HttpRequest,
+    opt: web::Data<Opt>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));

     let task = KindWithContent::SnapshotCreation;
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();

     debug!(returns = ?task, "Create snapshot");
     Ok(HttpResponse::Accepted().json(task))

View File

@@ -10,12 +10,13 @@
 use meilisearch_types::index_uid::IndexUid;
 use meilisearch_types::tasks::{IndexSwap, KindWithContent};
 use serde_json::json;

-use super::SummarizedTaskView;
+use super::{get_task_id, is_dry_run, SummarizedTaskView};
 use crate::analytics::Analytics;
 use crate::error::MeilisearchHttpError;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::{AuthenticationError, GuardedData};
 use crate::extractors::sequential_extractor::SeqHandler;
+use crate::Opt;

 pub fn configure(cfg: &mut web::ServiceConfig) {
     cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes))));
@@ -32,6 +33,7 @@ pub async fn swap_indexes(
     index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
     params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
     req: HttpRequest,
+    opt: web::Data<Opt>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let params = params.into_inner();
@@ -60,7 +62,11 @@ pub async fn swap_indexes(
     }

     let task = KindWithContent::IndexSwap { swaps };
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
     let task: SummarizedTaskView =
-        tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
+        tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
+            .await??
+            .into();
     Ok(HttpResponse::Accepted().json(task))
 }

View File

@@ -18,11 +18,12 @@ use time::macros::format_description;
 use time::{Date, Duration, OffsetDateTime, Time};
 use tokio::task;

-use super::SummarizedTaskView;
+use super::{get_task_id, is_dry_run, SummarizedTaskView};
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
 use crate::extractors::sequential_extractor::SeqHandler;
+use crate::Opt;

 const DEFAULT_LIMIT: u32 = 20;
@@ -161,6 +162,7 @@ async fn cancel_tasks(
     index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
     params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
     req: HttpRequest,
+    opt: web::Data<Opt>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let params = params.into_inner();
@@ -197,7 +199,11 @@ async fn cancel_tasks(
     let task_cancelation =
         KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };

-    let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
+    let task =
+        task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run))
+            .await??;
     let task: SummarizedTaskView = task.into();

     Ok(HttpResponse::Ok().json(task))
@@ -207,6 +213,7 @@ async fn delete_tasks(
     index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
     params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
     req: HttpRequest,
+    opt: web::Data<Opt>,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let params = params.into_inner();
@@ -242,7 +249,10 @@ async fn delete_tasks(
     let task_deletion =
         KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };

-    let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
+    let uid = get_task_id(&req, &opt)?;
+    let dry_run = is_dry_run(&req, &opt)?;
+    let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run))
+        .await??;
     let task: SummarizedTaskView = task.into();

     Ok(HttpResponse::Ok().json(task))

View File

@@ -100,16 +100,11 @@ impl Index<'_> {
     pub async fn raw_add_documents(
         &self,
         payload: &str,
-        content_type: Option<&str>,
+        headers: Vec<(&str, &str)>,
         query_parameter: &str,
     ) -> (Value, StatusCode) {
         let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
-
-        if let Some(content_type) = content_type {
-            self.service.post_str(url, payload, vec![("Content-Type", content_type)]).await
-        } else {
-            self.service.post_str(url, payload, Vec::new()).await
-        }
+        self.service.post_str(url, payload, headers).await
     }

     pub async fn update_documents(
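
With the headers: Vec<(&str, &str)> signature, call sites spell out the Content-Type header themselves, and replication headers such as DryRun can ride along in the same list. A hypothetical fragment under the new signature (assumes the crate's test harness with an index in scope, mirroring the updated call sites in the test files below):

// Hypothetical call site: Content-Type is now passed explicitly, and
// extra headers travel in the same Vec. Requires the test harness's
// `index` handle; shown only to illustrate the new calling convention.
let (response, code) = index
    .raw_add_documents(
        r#"{ "id": 1, "doggo": "bork" }"#,
        vec![("Content-Type", "application/json"), ("DryRun", "true")],
        "",
    )
    .await;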

View File

@@ -1,10 +1,11 @@
 use actix_web::test;
 use meili_snap::{json_string, snapshot};
+use meilisearch::Opt;
 use time::format_description::well_known::Rfc3339;
 use time::OffsetDateTime;

 use crate::common::encoder::Encoder;
-use crate::common::{GetAllDocumentsOptions, Server, Value};
+use crate::common::{default_settings, GetAllDocumentsOptions, Server, Value};
 use crate::json;

 /// This is the basic usage of our API and every other tests uses the content-type application/json
@@ -2157,3 +2158,49 @@ async fn batch_several_documents_addition() {
     assert_eq!(code, 200, "failed with `{}`", response);
     assert_eq!(response["results"].as_array().unwrap().len(), 120);
 }
+
+#[actix_rt::test]
+async fn dry_register_file() {
+    let temp = tempfile::tempdir().unwrap();
+
+    let options =
+        Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
+    let server = Server::new_with_options(options).await.unwrap();
+    let index = server.index("tamo");
+
+    let documents = r#"
+        {
+            "id": "12",
+            "doggo": "kefir"
+        }
+    "#;
+
+    let (response, code) = index
+        .raw_add_documents(
+            documents,
+            vec![("Content-Type", "application/json"), ("DryRun", "true")],
+            "",
+        )
+        .await;
+    snapshot!(response, @r###"
+    {
+      "taskUid": 0,
+      "indexUid": "tamo",
+      "status": "enqueued",
+      "type": "documentAdditionOrUpdate",
+      "enqueuedAt": "[date]"
+    }
+    "###);
+    snapshot!(code, @"202 Accepted");
+
+    let (response, code) = index.get_task(response.uid()).await;
+    snapshot!(response, @r###"
+    {
+      "message": "Task `0` not found.",
+      "code": "task_not_found",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#task_not_found"
+    }
+    "###);
+    snapshot!(code, @"404 Not Found");
+}

View File

@@ -209,7 +209,8 @@ async fn replace_documents_missing_payload() {
     let server = Server::new().await;
     let index = server.index("test");

-    let (response, code) = index.raw_add_documents("", Some("application/json"), "").await;
+    let (response, code) =
+        index.raw_add_documents("", vec![("Content-Type", "application/json")], "").await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
@@ -220,7 +221,8 @@ async fn replace_documents_missing_payload() {
     }
     "###);

-    let (response, code) = index.raw_add_documents("", Some("application/x-ndjson"), "").await;
+    let (response, code) =
+        index.raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "").await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
@@ -231,7 +233,8 @@ async fn replace_documents_missing_payload() {
     }
     "###);

-    let (response, code) = index.raw_add_documents("", Some("text/csv"), "").await;
+    let (response, code) =
+        index.raw_add_documents("", vec![("Content-Type", "text/csv")], "").await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
@@ -287,7 +290,7 @@ async fn replace_documents_missing_content_type() {
     let server = Server::new().await;
     let index = server.index("test");

-    let (response, code) = index.raw_add_documents("", None, "").await;
+    let (response, code) = index.raw_add_documents("", Vec::new(), "").await;
     snapshot!(code, @"415 Unsupported Media Type");
     snapshot!(json_string!(response), @r###"
     {
@@ -299,7 +302,7 @@ async fn replace_documents_missing_content_type() {
     "###);

     // even with a csv delimiter specified this error is triggered first
-    let (response, code) = index.raw_add_documents("", None, "?csvDelimiter=;").await;
+    let (response, code) = index.raw_add_documents("", Vec::new(), "?csvDelimiter=;").await;
     snapshot!(code, @"415 Unsupported Media Type");
     snapshot!(json_string!(response), @r###"
     {
@@ -345,7 +348,7 @@ async fn replace_documents_bad_content_type() {
     let server = Server::new().await;
     let index = server.index("test");

-    let (response, code) = index.raw_add_documents("", Some("doggo"), "").await;
+    let (response, code) = index.raw_add_documents("", vec![("Content-Type", "doggo")], "").await;
     snapshot!(code, @"415 Unsupported Media Type");
     snapshot!(json_string!(response), @r###"
     {
@@ -379,8 +382,9 @@ async fn replace_documents_bad_csv_delimiter() {
     let server = Server::new().await;
     let index = server.index("test");

-    let (response, code) =
-        index.raw_add_documents("", Some("application/json"), "?csvDelimiter").await;
+    let (response, code) = index
+        .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter")
+        .await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
@@ -391,8 +395,9 @@ async fn replace_documents_bad_csv_delimiter() {
     }
     "###);

-    let (response, code) =
-        index.raw_add_documents("", Some("application/json"), "?csvDelimiter=doggo").await;
+    let (response, code) = index
+        .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=doggo")
+        .await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
     {
@@ -404,7 +409,11 @@ async fn replace_documents_bad_csv_delimiter() {
     "###);

     let (response, code) = index
-        .raw_add_documents("", Some("application/json"), &format!("?csvDelimiter={}", encode("🍰")))
+        .raw_add_documents(
+            "",
+            vec![("Content-Type", "application/json")],
+            &format!("?csvDelimiter={}", encode("🍰")),
+        )
         .await;
     snapshot!(code, @"400 Bad Request");
     snapshot!(json_string!(response), @r###"
@@ -469,8 +478,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
     let server = Server::new().await;
     let index = server.index("test");

-    let (response, code) =
-        index.raw_add_documents("", Some("application/json"), "?csvDelimiter=a").await;
+    let (response, code) = index
+        .raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=a")
+        .await;
     snapshot!(code, @"415 Unsupported Media Type");
     snapshot!(json_string!(response), @r###"
     {
@@ -481,8 +491,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
     }
     "###);

-    let (response, code) =
-        index.raw_add_documents("", Some("application/x-ndjson"), "?csvDelimiter=a").await;
+    let (response, code) = index
+        .raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "?csvDelimiter=a")
+        .await;
     snapshot!(code, @"415 Unsupported Media Type");
     snapshot!(json_string!(response), @r###"
     {

View File

@@ -2,9 +2,10 @@ use actix_web::http::header::ContentType;
 use actix_web::test;
 use http::header::ACCEPT_ENCODING;
 use meili_snap::{json_string, snapshot};
+use meilisearch::Opt;

 use crate::common::encoder::Encoder;
-use crate::common::{Server, Value};
+use crate::common::{default_settings, Server, Value};
 use crate::json;

 #[actix_rt::test]
@@ -199,3 +200,79 @@ async fn error_create_with_invalid_index_uid() {
     }
     "###);
 }
+
+#[actix_rt::test]
+async fn send_task_id() {
+    let temp = tempfile::tempdir().unwrap();
+
+    let options =
+        Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
+    let server = Server::new_with_options(options).await.unwrap();
+
+    let app = server.init_web_app().await;
+    let index = server.index("catto");
+    let (response, code) = index.create(None).await;
+    snapshot!(code, @"202 Accepted");
+    snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
+    {
+      "taskUid": 0,
+      "indexUid": "catto",
+      "status": "enqueued",
+      "type": "indexCreation",
+      "enqueuedAt": "[date]"
+    }
+    "###);
+
+    let body = serde_json::to_string(&json!({
+        "uid": "doggo",
+        "primaryKey": None::<&str>,
+    }))
+    .unwrap();
+    let req = test::TestRequest::post()
+        .uri("/indexes")
+        .insert_header(("TaskId", "25"))
+        .insert_header(ContentType::json())
+        .set_payload(body)
+        .to_request();
+
+    let res = test::call_service(&app, req).await;
+    snapshot!(res.status(), @"202 Accepted");
+    let bytes = test::read_body(res).await;
+    let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
+    snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
+    {
+      "taskUid": 25,
+      "indexUid": "doggo",
+      "status": "enqueued",
+      "type": "indexCreation",
+      "enqueuedAt": "[date]"
+    }
+    "###);
+
+    let body = serde_json::to_string(&json!({
+        "uid": "girafo",
+        "primaryKey": None::<&str>,
+    }))
+    .unwrap();
+    let req = test::TestRequest::post()
+        .uri("/indexes")
+        .insert_header(("TaskId", "12"))
+        .insert_header(ContentType::json())
+        .set_payload(body)
+        .to_request();
+
+    let res = test::call_service(&app, req).await;
+    snapshot!(res.status(), @"400 Bad Request");
+    let bytes = test::read_body(res).await;
+    let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
+    snapshot!(json_string!(response), @r###"
+    {
+      "message": "Received bad task id: 12 should be >= to 26.",
+      "code": "bad_request",
+      "type": "invalid_request",
+      "link": "https://docs.meilisearch.com/errors#bad_request"
+    }
+    "###);
+}