mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-12-23 13:10:06 +01:00
Merge #4042
4042: Implements the new replication parameters r=ManyTheFish a=irevoire ### This PR implements the necessary parameters for the High Availability - [ ] Update the spec Introduce a new CLI flag called `--experimental-replication-parameters` that changes a few behaviors in the engine: - [The auto-deletion of tasks is disabled](https://specs.meilisearch.com/specifications/text/0060-tasks-api.html#_2-technical-details) - Upon registering a task, you can choose its task ID by sending a new header: `TaskId: 456645`. It must be a valid number, which must be superior to the last task id ever seen. - Add the ability to « dry-register » a task. That means meilisearch will answer to you with a valid task ID like everything went well, but won’t actually write anything in the database. To do that, you need to use the `DryRun: true` header. ---- Old prototype `prototype-custom-task-id-0`: - Adds the capability to specify your own task ID via the `TaskId` http header - Make the task IDs a u64 instead of a u32 Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
938149f814
@ -1,5 +1,5 @@
|
|||||||
use std::fs::File as StdFile;
|
use std::fs::File as StdFile;
|
||||||
use std::ops::{Deref, DerefMut};
|
use std::io::Write;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
@ -22,20 +22,6 @@ pub enum Error {
|
|||||||
|
|
||||||
pub type Result<T> = std::result::Result<T, Error>;
|
pub type Result<T> = std::result::Result<T, Error>;
|
||||||
|
|
||||||
impl Deref for File {
|
|
||||||
type Target = NamedTempFile;
|
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target {
|
|
||||||
&self.file
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DerefMut for File {
|
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target {
|
|
||||||
&mut self.file
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct FileStore {
|
pub struct FileStore {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
@ -56,7 +42,7 @@ impl FileStore {
|
|||||||
let file = NamedTempFile::new_in(&self.path)?;
|
let file = NamedTempFile::new_in(&self.path)?;
|
||||||
let uuid = Uuid::new_v4();
|
let uuid = Uuid::new_v4();
|
||||||
let path = self.path.join(uuid.to_string());
|
let path = self.path.join(uuid.to_string());
|
||||||
let update_file = File { file, path };
|
let update_file = File { file: Some(file), path };
|
||||||
|
|
||||||
Ok((uuid, update_file))
|
Ok((uuid, update_file))
|
||||||
}
|
}
|
||||||
@ -67,7 +53,7 @@ impl FileStore {
|
|||||||
let file = NamedTempFile::new_in(&self.path)?;
|
let file = NamedTempFile::new_in(&self.path)?;
|
||||||
let uuid = Uuid::from_u128(uuid);
|
let uuid = Uuid::from_u128(uuid);
|
||||||
let path = self.path.join(uuid.to_string());
|
let path = self.path.join(uuid.to_string());
|
||||||
let update_file = File { file, path };
|
let update_file = File { file: Some(file), path };
|
||||||
|
|
||||||
Ok((uuid, update_file))
|
Ok((uuid, update_file))
|
||||||
}
|
}
|
||||||
@ -136,16 +122,40 @@ impl FileStore {
|
|||||||
|
|
||||||
pub struct File {
|
pub struct File {
|
||||||
path: PathBuf,
|
path: PathBuf,
|
||||||
file: NamedTempFile,
|
file: Option<NamedTempFile>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl File {
|
impl File {
|
||||||
|
pub fn dry_file() -> Result<Self> {
|
||||||
|
Ok(Self { path: PathBuf::new(), file: None })
|
||||||
|
}
|
||||||
|
|
||||||
pub fn persist(self) -> Result<()> {
|
pub fn persist(self) -> Result<()> {
|
||||||
self.file.persist(&self.path)?;
|
if let Some(file) = self.file {
|
||||||
|
file.persist(&self.path)?;
|
||||||
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Write for File {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
if let Some(file) = self.file.as_mut() {
|
||||||
|
file.write(buf)
|
||||||
|
} else {
|
||||||
|
Ok(buf.len())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
if let Some(file) = self.file.as_mut() {
|
||||||
|
file.flush()
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
|
@ -48,6 +48,8 @@ impl From<DateField> for Code {
|
|||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("{1}")]
|
#[error("{1}")]
|
||||||
WithCustomErrorCode(Code, Box<Self>),
|
WithCustomErrorCode(Code, Box<Self>),
|
||||||
|
#[error("Received bad task id: {received} should be >= to {expected}.")]
|
||||||
|
BadTaskId { received: TaskId, expected: TaskId },
|
||||||
#[error("Index `{0}` not found.")]
|
#[error("Index `{0}` not found.")]
|
||||||
IndexNotFound(String),
|
IndexNotFound(String),
|
||||||
#[error("Index `{0}` already exists.")]
|
#[error("Index `{0}` already exists.")]
|
||||||
@ -161,6 +163,7 @@ impl Error {
|
|||||||
match self {
|
match self {
|
||||||
Error::IndexNotFound(_)
|
Error::IndexNotFound(_)
|
||||||
| Error::WithCustomErrorCode(_, _)
|
| Error::WithCustomErrorCode(_, _)
|
||||||
|
| Error::BadTaskId { .. }
|
||||||
| Error::IndexAlreadyExists(_)
|
| Error::IndexAlreadyExists(_)
|
||||||
| Error::SwapDuplicateIndexFound(_)
|
| Error::SwapDuplicateIndexFound(_)
|
||||||
| Error::SwapDuplicateIndexesFound(_)
|
| Error::SwapDuplicateIndexesFound(_)
|
||||||
@ -205,6 +208,7 @@ impl ErrorCode for Error {
|
|||||||
fn error_code(&self) -> Code {
|
fn error_code(&self) -> Code {
|
||||||
match self {
|
match self {
|
||||||
Error::WithCustomErrorCode(code, _) => *code,
|
Error::WithCustomErrorCode(code, _) => *code,
|
||||||
|
Error::BadTaskId { .. } => Code::BadRequest,
|
||||||
Error::IndexNotFound(_) => Code::IndexNotFound,
|
Error::IndexNotFound(_) => Code::IndexNotFound,
|
||||||
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
|
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
|
||||||
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
|
Error::SwapDuplicateIndexesFound(_) => Code::InvalidSwapDuplicateIndexFound,
|
||||||
|
@ -15,6 +15,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
|
|
||||||
let IndexScheduler {
|
let IndexScheduler {
|
||||||
autobatching_enabled,
|
autobatching_enabled,
|
||||||
|
cleanup_enabled: _,
|
||||||
must_stop_processing: _,
|
must_stop_processing: _,
|
||||||
processing_tasks,
|
processing_tasks,
|
||||||
file_store,
|
file_store,
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,90 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"uid": 0,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "succeeded",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 1,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": {
|
||||||
|
"message": "Index `doggo` already exists.",
|
||||||
|
"code": "index_already_exists",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#index_already_exists"
|
||||||
|
},
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "failed",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 2,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "enqueued",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 3,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "enqueued",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
@ -0,0 +1,90 @@
|
|||||||
|
---
|
||||||
|
source: index-scheduler/src/lib.rs
|
||||||
|
---
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"uid": 0,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "succeeded",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 1,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": {
|
||||||
|
"message": "Index `doggo` already exists.",
|
||||||
|
"code": "index_already_exists",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#index_already_exists"
|
||||||
|
},
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "failed",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 2,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "enqueued",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uid": 3,
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]",
|
||||||
|
"error": null,
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"IndexInfo": {
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": "enqueued",
|
||||||
|
"kind": {
|
||||||
|
"indexCreation": {
|
||||||
|
"index_uid": "doggo",
|
||||||
|
"primary_key": null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
@ -1,6 +1,6 @@
|
|||||||
use std::fmt::{self, Debug, Display};
|
use std::fmt::{self, Debug, Display};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
use std::io::{self, Seek, Write};
|
use std::io::{self, BufWriter, Write};
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
use memmap2::MmapOptions;
|
use memmap2::MmapOptions;
|
||||||
@ -104,8 +104,8 @@ impl ErrorCode for DocumentFormatError {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads CSV from input and write an obkv batch to writer.
|
/// Reads CSV from input and write an obkv batch to writer.
|
||||||
pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result<u64> {
|
pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
let mmap = unsafe { MmapOptions::new().map(file)? };
|
||||||
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
|
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
|
||||||
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
|
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
|
||||||
@ -117,8 +117,8 @@ pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads JSON from temporary file and write an obkv batch to writer.
|
/// Reads JSON from temporary file and write an obkv batch to writer.
|
||||||
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
|
pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
let mmap = unsafe { MmapOptions::new().map(file)? };
|
||||||
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
|
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
|
||||||
|
|
||||||
@ -151,8 +151,8 @@ pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Reads JSON from temporary file and write an obkv batch to writer.
|
/// Reads JSON from temporary file and write an obkv batch to writer.
|
||||||
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
|
pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
|
||||||
let mut builder = DocumentsBatchBuilder::new(writer);
|
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
|
||||||
let mmap = unsafe { MmapOptions::new().map(file)? };
|
let mmap = unsafe { MmapOptions::new().map(file)? };
|
||||||
|
|
||||||
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
|
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
|
||||||
|
@ -250,6 +250,7 @@ impl super::Analytics for SegmentAnalytics {
|
|||||||
struct Infos {
|
struct Infos {
|
||||||
env: String,
|
env: String,
|
||||||
experimental_enable_metrics: bool,
|
experimental_enable_metrics: bool,
|
||||||
|
experimental_replication_parameters: bool,
|
||||||
experimental_enable_logs_route: bool,
|
experimental_enable_logs_route: bool,
|
||||||
experimental_reduce_indexing_memory_usage: bool,
|
experimental_reduce_indexing_memory_usage: bool,
|
||||||
experimental_max_number_of_batched_tasks: usize,
|
experimental_max_number_of_batched_tasks: usize,
|
||||||
@ -288,6 +289,7 @@ impl From<Opt> for Infos {
|
|||||||
let Opt {
|
let Opt {
|
||||||
db_path,
|
db_path,
|
||||||
experimental_enable_metrics,
|
experimental_enable_metrics,
|
||||||
|
experimental_replication_parameters,
|
||||||
experimental_enable_logs_route,
|
experimental_enable_logs_route,
|
||||||
experimental_reduce_indexing_memory_usage,
|
experimental_reduce_indexing_memory_usage,
|
||||||
experimental_max_number_of_batched_tasks,
|
experimental_max_number_of_batched_tasks,
|
||||||
@ -335,6 +337,7 @@ impl From<Opt> for Infos {
|
|||||||
Self {
|
Self {
|
||||||
env,
|
env,
|
||||||
experimental_enable_metrics,
|
experimental_enable_metrics,
|
||||||
|
experimental_replication_parameters,
|
||||||
experimental_enable_logs_route,
|
experimental_enable_logs_route,
|
||||||
experimental_reduce_indexing_memory_usage,
|
experimental_reduce_indexing_memory_usage,
|
||||||
db_path: db_path != PathBuf::from("./data.ms"),
|
db_path: db_path != PathBuf::from("./data.ms"),
|
||||||
|
@ -131,6 +131,7 @@ gen_seq! { SeqFromRequestFut3; A B C }
|
|||||||
gen_seq! { SeqFromRequestFut4; A B C D }
|
gen_seq! { SeqFromRequestFut4; A B C D }
|
||||||
gen_seq! { SeqFromRequestFut5; A B C D E }
|
gen_seq! { SeqFromRequestFut5; A B C D E }
|
||||||
gen_seq! { SeqFromRequestFut6; A B C D E F }
|
gen_seq! { SeqFromRequestFut6; A B C D E F }
|
||||||
|
gen_seq! { SeqFromRequestFut7; A B C D E F G }
|
||||||
|
|
||||||
pin_project! {
|
pin_project! {
|
||||||
#[project = ExtractProj]
|
#[project = ExtractProj]
|
||||||
|
@ -251,7 +251,9 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
|
|||||||
.name(String::from("register-snapshot-tasks"))
|
.name(String::from("register-snapshot-tasks"))
|
||||||
.spawn(move || loop {
|
.spawn(move || loop {
|
||||||
thread::sleep(snapshot_delay);
|
thread::sleep(snapshot_delay);
|
||||||
if let Err(e) = index_scheduler.register(KindWithContent::SnapshotCreation) {
|
if let Err(e) =
|
||||||
|
index_scheduler.register(KindWithContent::SnapshotCreation, None, false)
|
||||||
|
{
|
||||||
error!("Error while registering snapshot: {}", e);
|
error!("Error while registering snapshot: {}", e);
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -286,6 +288,7 @@ fn open_or_create_database_unchecked(
|
|||||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||||
indexer_config: (&opt.indexer_options).try_into()?,
|
indexer_config: (&opt.indexer_options).try_into()?,
|
||||||
autobatching_enabled: true,
|
autobatching_enabled: true,
|
||||||
|
cleanup_enabled: !opt.experimental_replication_parameters,
|
||||||
max_number_of_tasks: 1_000_000,
|
max_number_of_tasks: 1_000_000,
|
||||||
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
|
max_number_of_batched_tasks: opt.experimental_max_number_of_batched_tasks,
|
||||||
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||||
@ -453,6 +456,7 @@ pub fn configure_data(
|
|||||||
.app_data(auth)
|
.app_data(auth)
|
||||||
.app_data(web::Data::from(analytics))
|
.app_data(web::Data::from(analytics))
|
||||||
.app_data(web::Data::new(logs))
|
.app_data(web::Data::new(logs))
|
||||||
|
.app_data(web::Data::new(opt.clone()))
|
||||||
.app_data(
|
.app_data(
|
||||||
web::JsonConfig::default()
|
web::JsonConfig::default()
|
||||||
.limit(http_payload_size_limit)
|
.limit(http_payload_size_limit)
|
||||||
|
@ -51,6 +51,7 @@ const MEILI_IGNORE_MISSING_DUMP: &str = "MEILI_IGNORE_MISSING_DUMP";
|
|||||||
const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
|
const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
|
||||||
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
|
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
|
||||||
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
|
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
|
||||||
|
const MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS: &str = "MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS";
|
||||||
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
|
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
|
||||||
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
|
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
|
||||||
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
|
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
|
||||||
@ -317,6 +318,16 @@ pub struct Opt {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub experimental_enable_logs_route: bool,
|
pub experimental_enable_logs_route: bool,
|
||||||
|
|
||||||
|
/// Enable multiple features that helps you to run meilisearch in a replicated context.
|
||||||
|
/// For more information, see: <https://github.com/orgs/meilisearch/discussions/725>
|
||||||
|
///
|
||||||
|
/// - /!\ Disable the automatic clean up of old processed tasks, you're in charge of that now
|
||||||
|
/// - Lets you specify a custom task ID upon registering a task
|
||||||
|
/// - Lets you execute dry-register a task (get an answer from the route but nothing is actually registered in meilisearch and it won't be processed)
|
||||||
|
#[clap(long, env = MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS)]
|
||||||
|
#[serde(default)]
|
||||||
|
pub experimental_replication_parameters: bool,
|
||||||
|
|
||||||
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
|
/// Experimental RAM reduction during indexing, do not use in production, see: <https://github.com/meilisearch/product/discussions/652>
|
||||||
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
|
#[clap(long, env = MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE)]
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
@ -423,6 +434,7 @@ impl Opt {
|
|||||||
no_analytics,
|
no_analytics,
|
||||||
experimental_enable_metrics,
|
experimental_enable_metrics,
|
||||||
experimental_enable_logs_route,
|
experimental_enable_logs_route,
|
||||||
|
experimental_replication_parameters,
|
||||||
experimental_reduce_indexing_memory_usage,
|
experimental_reduce_indexing_memory_usage,
|
||||||
} = self;
|
} = self;
|
||||||
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
|
export_to_env_if_not_present(MEILI_DB_PATH, db_path);
|
||||||
@ -479,6 +491,10 @@ impl Opt {
|
|||||||
MEILI_EXPERIMENTAL_ENABLE_METRICS,
|
MEILI_EXPERIMENTAL_ENABLE_METRICS,
|
||||||
experimental_enable_metrics.to_string(),
|
experimental_enable_metrics.to_string(),
|
||||||
);
|
);
|
||||||
|
export_to_env_if_not_present(
|
||||||
|
MEILI_EXPERIMENTAL_REPLICATION_PARAMETERS,
|
||||||
|
experimental_replication_parameters.to_string(),
|
||||||
|
);
|
||||||
export_to_env_if_not_present(
|
export_to_env_if_not_present(
|
||||||
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
|
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
|
||||||
experimental_enable_logs_route.to_string(),
|
experimental_enable_logs_route.to_string(),
|
||||||
|
@ -11,7 +11,8 @@ use crate::analytics::Analytics;
|
|||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
|
||||||
@ -21,6 +22,7 @@ pub async fn create_dump(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
|
||||||
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
|
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
|
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
|
||||||
@ -29,8 +31,12 @@ pub async fn create_dump(
|
|||||||
keys: auth_controller.list_keys()?,
|
keys: auth_controller.list_keys()?,
|
||||||
instance_uid: analytics.instance_uid().cloned(),
|
instance_uid: analytics.instance_uid().cloned(),
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Create dump");
|
debug!(returns = ?task, "Create dump");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -7,7 +7,7 @@ use bstr::ByteSlice as _;
|
|||||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||||
use deserr::Deserr;
|
use deserr::Deserr;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::{IndexScheduler, TaskId};
|
||||||
use meilisearch_types::deserr::query_params::Param;
|
use meilisearch_types::deserr::query_params::Param;
|
||||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||||
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
use meilisearch_types::document_formats::{read_csv, read_json, read_ndjson, PayloadType};
|
||||||
@ -36,8 +36,11 @@ use crate::extractors::authentication::policies::*;
|
|||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::payload::Payload;
|
use crate::extractors::payload::Payload;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::{PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
use crate::routes::{
|
||||||
|
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
|
||||||
|
};
|
||||||
use crate::search::parse_filter;
|
use crate::search::parse_filter;
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
|
||||||
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
|
||||||
@ -119,6 +122,7 @@ pub async fn delete_document(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||||
path: web::Path<DocumentParam>,
|
path: web::Path<DocumentParam>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let DocumentParam { index_uid, document_id } = path.into_inner();
|
let DocumentParam { index_uid, document_id } = path.into_inner();
|
||||||
@ -130,9 +134,13 @@ pub async fn delete_document(
|
|||||||
index_uid: index_uid.to_string(),
|
index_uid: index_uid.to_string(),
|
||||||
documents_ids: vec![document_id],
|
documents_ids: vec![document_id],
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
debug!(returns = ?task, "Delete document");
|
.await??
|
||||||
|
.into();
|
||||||
|
debug!("returns: {:?}", task);
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -267,6 +275,7 @@ pub async fn replace_documents(
|
|||||||
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
||||||
body: Payload,
|
body: Payload,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
@ -277,6 +286,8 @@ pub async fn replace_documents(
|
|||||||
analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
analytics.add_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||||
|
|
||||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task = document_addition(
|
let task = document_addition(
|
||||||
extract_mime_type(&req)?,
|
extract_mime_type(&req)?,
|
||||||
index_scheduler,
|
index_scheduler,
|
||||||
@ -285,6 +296,8 @@ pub async fn replace_documents(
|
|||||||
params.csv_delimiter,
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
|
uid,
|
||||||
|
dry_run,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@ -299,6 +312,7 @@ pub async fn update_documents(
|
|||||||
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
params: AwebQueryParameter<UpdateDocumentsQuery, DeserrQueryParamError>,
|
||||||
body: Payload,
|
body: Payload,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
@ -309,6 +323,8 @@ pub async fn update_documents(
|
|||||||
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
analytics.update_documents(¶ms, index_scheduler.index(&index_uid).is_err(), &req);
|
||||||
|
|
||||||
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task = document_addition(
|
let task = document_addition(
|
||||||
extract_mime_type(&req)?,
|
extract_mime_type(&req)?,
|
||||||
index_scheduler,
|
index_scheduler,
|
||||||
@ -317,6 +333,8 @@ pub async fn update_documents(
|
|||||||
params.csv_delimiter,
|
params.csv_delimiter,
|
||||||
body,
|
body,
|
||||||
IndexDocumentsMethod::UpdateDocuments,
|
IndexDocumentsMethod::UpdateDocuments,
|
||||||
|
uid,
|
||||||
|
dry_run,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@ -334,6 +352,8 @@ async fn document_addition(
|
|||||||
csv_delimiter: Option<u8>,
|
csv_delimiter: Option<u8>,
|
||||||
mut body: Payload,
|
mut body: Payload,
|
||||||
method: IndexDocumentsMethod,
|
method: IndexDocumentsMethod,
|
||||||
|
task_id: Option<TaskId>,
|
||||||
|
dry_run: bool,
|
||||||
allow_index_creation: bool,
|
allow_index_creation: bool,
|
||||||
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
) -> Result<SummarizedTaskView, MeilisearchHttpError> {
|
||||||
let format = match (
|
let format = match (
|
||||||
@ -366,7 +386,7 @@ async fn document_addition(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let (uuid, mut update_file) = index_scheduler.create_update_file()?;
|
let (uuid, mut update_file) = index_scheduler.create_update_file(dry_run)?;
|
||||||
|
|
||||||
let temp_file = match tempfile() {
|
let temp_file = match tempfile() {
|
||||||
Ok(file) => file,
|
Ok(file) => file,
|
||||||
@ -405,11 +425,9 @@ async fn document_addition(
|
|||||||
let read_file = buffer.into_inner().into_std().await;
|
let read_file = buffer.into_inner().into_std().await;
|
||||||
let documents_count = tokio::task::spawn_blocking(move || {
|
let documents_count = tokio::task::spawn_blocking(move || {
|
||||||
let documents_count = match format {
|
let documents_count = match format {
|
||||||
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
|
PayloadType::Json => read_json(&read_file, &mut update_file)?,
|
||||||
PayloadType::Csv { delimiter } => {
|
PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
|
||||||
read_csv(&read_file, update_file.as_file_mut(), delimiter)?
|
PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
|
||||||
}
|
|
||||||
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
|
|
||||||
};
|
};
|
||||||
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
// we NEED to persist the file here because we moved the `udpate_file` in another task.
|
||||||
update_file.persist()?;
|
update_file.persist()?;
|
||||||
@ -450,7 +468,9 @@ async fn document_addition(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let scheduler = index_scheduler.clone();
|
let scheduler = index_scheduler.clone();
|
||||||
let task = match tokio::task::spawn_blocking(move || scheduler.register(task)).await? {
|
let task = match tokio::task::spawn_blocking(move || scheduler.register(task, task_id, dry_run))
|
||||||
|
.await?
|
||||||
|
{
|
||||||
Ok(task) => task,
|
Ok(task) => task,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
index_scheduler.delete_update_file(uuid)?;
|
index_scheduler.delete_update_file(uuid)?;
|
||||||
@ -466,6 +486,7 @@ pub async fn delete_documents_batch(
|
|||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
body: web::Json<Vec<Value>>,
|
body: web::Json<Vec<Value>>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
debug!(parameters = ?body, "Delete documents by batch");
|
debug!(parameters = ?body, "Delete documents by batch");
|
||||||
@ -480,8 +501,12 @@ pub async fn delete_documents_batch(
|
|||||||
|
|
||||||
let task =
|
let task =
|
||||||
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
KindWithContent::DocumentDeletion { index_uid: index_uid.to_string(), documents_ids: ids };
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete documents by batch");
|
debug!(returns = ?task, "Delete documents by batch");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -499,6 +524,7 @@ pub async fn delete_documents_by_filter(
|
|||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
|
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
debug!(parameters = ?body, "Delete documents by filter");
|
debug!(parameters = ?body, "Delete documents by filter");
|
||||||
@ -516,8 +542,12 @@ pub async fn delete_documents_by_filter(
|
|||||||
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
.map_err(|err| ResponseError::from_msg(err.message, Code::InvalidDocumentFilter))?;
|
||||||
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
let task = KindWithContent::DocumentDeletionByFilter { index_uid, filter_expr: filter };
|
||||||
|
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete documents by filter");
|
debug!(returns = ?task, "Delete documents by filter");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -527,14 +557,19 @@ pub async fn clear_all_documents(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
|
||||||
|
|
||||||
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete all documents");
|
debug!(returns = ?task, "Delete all documents");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -17,11 +17,13 @@ use serde_json::json;
|
|||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use tracing::debug;
|
use tracing::debug;
|
||||||
|
|
||||||
use super::{Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
|
use crate::routes::is_dry_run;
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
pub mod documents;
|
pub mod documents;
|
||||||
pub mod facet_search;
|
pub mod facet_search;
|
||||||
@ -123,6 +125,7 @@ pub async fn create_index(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
|
||||||
body: AwebJson<IndexCreateRequest, DeserrJsonError>,
|
body: AwebJson<IndexCreateRequest, DeserrJsonError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
debug!(parameters = ?body, "Create index");
|
debug!(parameters = ?body, "Create index");
|
||||||
@ -137,8 +140,12 @@ pub async fn create_index(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
let task = KindWithContent::IndexCreation { index_uid: uid.to_string(), primary_key };
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
debug!(returns = ?task, "Create index");
|
debug!(returns = ?task, "Create index");
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -190,6 +197,7 @@ pub async fn update_index(
|
|||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
|
body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
debug!(parameters = ?body, "Update index");
|
debug!(parameters = ?body, "Update index");
|
||||||
@ -206,8 +214,12 @@ pub async fn update_index(
|
|||||||
primary_key: body.primary_key,
|
primary_key: body.primary_key,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Update index");
|
debug!(returns = ?task, "Update index");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -216,11 +228,17 @@ pub async fn update_index(
|
|||||||
pub async fn delete_index(
|
pub async fn delete_index(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_DELETE }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
let task = KindWithContent::IndexDeletion { index_uid: index_uid.into_inner() };
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
debug!(returns = ?task, "Delete index");
|
debug!(returns = ?task, "Delete index");
|
||||||
|
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -15,7 +15,8 @@ use tracing::debug;
|
|||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! make_setting_route {
|
macro_rules! make_setting_route {
|
||||||
@ -34,7 +35,8 @@ macro_rules! make_setting_route {
|
|||||||
use $crate::extractors::authentication::policies::*;
|
use $crate::extractors::authentication::policies::*;
|
||||||
use $crate::extractors::authentication::GuardedData;
|
use $crate::extractors::authentication::GuardedData;
|
||||||
use $crate::extractors::sequential_extractor::SeqHandler;
|
use $crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use $crate::routes::SummarizedTaskView;
|
use $crate::Opt;
|
||||||
|
use $crate::routes::{is_dry_run, get_task_id, SummarizedTaskView};
|
||||||
|
|
||||||
pub async fn delete(
|
pub async fn delete(
|
||||||
index_scheduler: GuardedData<
|
index_scheduler: GuardedData<
|
||||||
@ -42,6 +44,8 @@ macro_rules! make_setting_route {
|
|||||||
Data<IndexScheduler>,
|
Data<IndexScheduler>,
|
||||||
>,
|
>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
|
|
||||||
@ -56,8 +60,10 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
.await??
|
.await??
|
||||||
.into();
|
.into();
|
||||||
|
|
||||||
@ -73,6 +79,7 @@ macro_rules! make_setting_route {
|
|||||||
index_uid: actix_web::web::Path<String>,
|
index_uid: actix_web::web::Path<String>,
|
||||||
body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>,
|
body: deserr::actix_web::AwebJson<Option<$type>, $err_ty>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
$analytics_var: web::Data<dyn Analytics>,
|
$analytics_var: web::Data<dyn Analytics>,
|
||||||
) -> std::result::Result<HttpResponse, ResponseError> {
|
) -> std::result::Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
@ -105,8 +112,10 @@ macro_rules! make_setting_route {
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task))
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
.await??
|
.await??
|
||||||
.into();
|
.into();
|
||||||
|
|
||||||
@ -652,6 +661,7 @@ pub async fn update_all(
|
|||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
body: AwebJson<Settings<Unchecked>, DeserrJsonError>,
|
body: AwebJson<Settings<Unchecked>, DeserrJsonError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
@ -767,8 +777,12 @@ pub async fn update_all(
|
|||||||
is_deletion: false,
|
is_deletion: false,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Update all settings");
|
debug!(returns = ?task, "Update all settings");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
@ -790,6 +804,8 @@ pub async fn get_all(
|
|||||||
pub async fn delete_all(
|
pub async fn delete_all(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::SETTINGS_UPDATE }>, Data<IndexScheduler>>,
|
||||||
index_uid: web::Path<String>,
|
index_uid: web::Path<String>,
|
||||||
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
|
||||||
|
|
||||||
@ -803,8 +819,12 @@ pub async fn delete_all(
|
|||||||
is_deletion: true,
|
is_deletion: true,
|
||||||
allow_index_creation,
|
allow_index_creation,
|
||||||
};
|
};
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Delete all settings");
|
debug!(returns = ?task, "Delete all settings");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -4,7 +4,7 @@ use actix_web::web::Data;
|
|||||||
use actix_web::{web, HttpRequest, HttpResponse};
|
use actix_web::{web, HttpRequest, HttpResponse};
|
||||||
use index_scheduler::IndexScheduler;
|
use index_scheduler::IndexScheduler;
|
||||||
use meilisearch_auth::AuthController;
|
use meilisearch_auth::AuthController;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::{Code, ResponseError};
|
||||||
use meilisearch_types::settings::{Settings, Unchecked};
|
use meilisearch_types::settings::{Settings, Unchecked};
|
||||||
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
|
use meilisearch_types::tasks::{Kind, Status, Task, TaskId};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -15,6 +15,7 @@ use tracing::debug;
|
|||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
const PAGINATION_DEFAULT_LIMIT: usize = 20;
|
const PAGINATION_DEFAULT_LIMIT: usize = 20;
|
||||||
|
|
||||||
@ -45,6 +46,56 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(web::scope("/experimental-features").configure(features::configure));
|
.service(web::scope("/experimental-features").configure(features::configure));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_task_id(req: &HttpRequest, opt: &Opt) -> Result<Option<TaskId>, ResponseError> {
|
||||||
|
if !opt.experimental_replication_parameters {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
let task_id = req
|
||||||
|
.headers()
|
||||||
|
.get("TaskId")
|
||||||
|
.map(|header| {
|
||||||
|
header.to_str().map_err(|e| {
|
||||||
|
ResponseError::from_msg(
|
||||||
|
format!("TaskId is not a valid utf-8 string: {e}"),
|
||||||
|
Code::BadRequest,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?
|
||||||
|
.map(|s| {
|
||||||
|
s.parse::<TaskId>().map_err(|e| {
|
||||||
|
ResponseError::from_msg(
|
||||||
|
format!(
|
||||||
|
"Could not parse the TaskId as a {}: {e}",
|
||||||
|
std::any::type_name::<TaskId>(),
|
||||||
|
),
|
||||||
|
Code::BadRequest,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?;
|
||||||
|
Ok(task_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_dry_run(req: &HttpRequest, opt: &Opt) -> Result<bool, ResponseError> {
|
||||||
|
if !opt.experimental_replication_parameters {
|
||||||
|
return Ok(false);
|
||||||
|
}
|
||||||
|
Ok(req
|
||||||
|
.headers()
|
||||||
|
.get("DryRun")
|
||||||
|
.map(|header| {
|
||||||
|
header.to_str().map_err(|e| {
|
||||||
|
ResponseError::from_msg(
|
||||||
|
format!("DryRun is not a valid utf-8 string: {e}"),
|
||||||
|
Code::BadRequest,
|
||||||
|
)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.transpose()?
|
||||||
|
.map_or(false, |s| s.to_lowercase() == "true"))
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
#[derive(Debug, Serialize)]
|
||||||
#[serde(rename_all = "camelCase")]
|
#[serde(rename_all = "camelCase")]
|
||||||
pub struct SummarizedTaskView {
|
pub struct SummarizedTaskView {
|
||||||
|
@ -10,7 +10,8 @@ use crate::analytics::Analytics;
|
|||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
use crate::routes::SummarizedTaskView;
|
use crate::routes::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_snapshot))));
|
||||||
@ -19,13 +20,18 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
pub async fn create_snapshot(
|
pub async fn create_snapshot(
|
||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::SNAPSHOTS_CREATE }>, Data<IndexScheduler>>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
|
analytics.publish("Snapshot Created".to_string(), json!({}), Some(&req));
|
||||||
|
|
||||||
let task = KindWithContent::SnapshotCreation;
|
let task = KindWithContent::SnapshotCreation;
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
|
|
||||||
debug!(returns = ?task, "Create snapshot");
|
debug!(returns = ?task, "Create snapshot");
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
|
@ -10,12 +10,13 @@ use meilisearch_types::index_uid::IndexUid;
|
|||||||
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
|
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::SummarizedTaskView;
|
use super::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::error::MeilisearchHttpError;
|
use crate::error::MeilisearchHttpError;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
use crate::extractors::authentication::{AuthenticationError, GuardedData};
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
pub fn configure(cfg: &mut web::ServiceConfig) {
|
pub fn configure(cfg: &mut web::ServiceConfig) {
|
||||||
cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes))));
|
cfg.service(web::resource("").route(web::post().to(SeqHandler(swap_indexes))));
|
||||||
@ -32,6 +33,7 @@ pub async fn swap_indexes(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_SWAP }>, Data<IndexScheduler>>,
|
||||||
params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
|
params: AwebJson<Vec<SwapIndexesPayload>, DeserrJsonError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let params = params.into_inner();
|
let params = params.into_inner();
|
||||||
@ -60,7 +62,11 @@ pub async fn swap_indexes(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let task = KindWithContent::IndexSwap { swaps };
|
let task = KindWithContent::IndexSwap { swaps };
|
||||||
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
let task: SummarizedTaskView =
|
let task: SummarizedTaskView =
|
||||||
tokio::task::spawn_blocking(move || index_scheduler.register(task)).await??.into();
|
tokio::task::spawn_blocking(move || index_scheduler.register(task, uid, dry_run))
|
||||||
|
.await??
|
||||||
|
.into();
|
||||||
Ok(HttpResponse::Accepted().json(task))
|
Ok(HttpResponse::Accepted().json(task))
|
||||||
}
|
}
|
||||||
|
@ -18,11 +18,12 @@ use time::macros::format_description;
|
|||||||
use time::{Date, Duration, OffsetDateTime, Time};
|
use time::{Date, Duration, OffsetDateTime, Time};
|
||||||
use tokio::task;
|
use tokio::task;
|
||||||
|
|
||||||
use super::SummarizedTaskView;
|
use super::{get_task_id, is_dry_run, SummarizedTaskView};
|
||||||
use crate::analytics::Analytics;
|
use crate::analytics::Analytics;
|
||||||
use crate::extractors::authentication::policies::*;
|
use crate::extractors::authentication::policies::*;
|
||||||
use crate::extractors::authentication::GuardedData;
|
use crate::extractors::authentication::GuardedData;
|
||||||
use crate::extractors::sequential_extractor::SeqHandler;
|
use crate::extractors::sequential_extractor::SeqHandler;
|
||||||
|
use crate::Opt;
|
||||||
|
|
||||||
const DEFAULT_LIMIT: u32 = 20;
|
const DEFAULT_LIMIT: u32 = 20;
|
||||||
|
|
||||||
@ -161,6 +162,7 @@ async fn cancel_tasks(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_CANCEL }>, Data<IndexScheduler>>,
|
||||||
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
|
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let params = params.into_inner();
|
let params = params.into_inner();
|
||||||
@ -197,7 +199,11 @@ async fn cancel_tasks(
|
|||||||
let task_cancelation =
|
let task_cancelation =
|
||||||
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskCancelation { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_cancelation)).await??;
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
|
let task =
|
||||||
|
task::spawn_blocking(move || index_scheduler.register(task_cancelation, uid, dry_run))
|
||||||
|
.await??;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
@ -207,6 +213,7 @@ async fn delete_tasks(
|
|||||||
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
|
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
|
||||||
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
|
params: AwebQueryParameter<TaskDeletionOrCancelationQuery, DeserrQueryParamError>,
|
||||||
req: HttpRequest,
|
req: HttpRequest,
|
||||||
|
opt: web::Data<Opt>,
|
||||||
analytics: web::Data<dyn Analytics>,
|
analytics: web::Data<dyn Analytics>,
|
||||||
) -> Result<HttpResponse, ResponseError> {
|
) -> Result<HttpResponse, ResponseError> {
|
||||||
let params = params.into_inner();
|
let params = params.into_inner();
|
||||||
@ -242,7 +249,10 @@ async fn delete_tasks(
|
|||||||
let task_deletion =
|
let task_deletion =
|
||||||
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
KindWithContent::TaskDeletion { query: format!("?{}", req.query_string()), tasks };
|
||||||
|
|
||||||
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion)).await??;
|
let uid = get_task_id(&req, &opt)?;
|
||||||
|
let dry_run = is_dry_run(&req, &opt)?;
|
||||||
|
let task = task::spawn_blocking(move || index_scheduler.register(task_deletion, uid, dry_run))
|
||||||
|
.await??;
|
||||||
let task: SummarizedTaskView = task.into();
|
let task: SummarizedTaskView = task.into();
|
||||||
|
|
||||||
Ok(HttpResponse::Ok().json(task))
|
Ok(HttpResponse::Ok().json(task))
|
||||||
|
@ -100,16 +100,11 @@ impl Index<'_> {
|
|||||||
pub async fn raw_add_documents(
|
pub async fn raw_add_documents(
|
||||||
&self,
|
&self,
|
||||||
payload: &str,
|
payload: &str,
|
||||||
content_type: Option<&str>,
|
headers: Vec<(&str, &str)>,
|
||||||
query_parameter: &str,
|
query_parameter: &str,
|
||||||
) -> (Value, StatusCode) {
|
) -> (Value, StatusCode) {
|
||||||
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
|
let url = format!("/indexes/{}/documents{}", urlencode(self.uid.as_ref()), query_parameter);
|
||||||
|
self.service.post_str(url, payload, headers).await
|
||||||
if let Some(content_type) = content_type {
|
|
||||||
self.service.post_str(url, payload, vec![("Content-Type", content_type)]).await
|
|
||||||
} else {
|
|
||||||
self.service.post_str(url, payload, Vec::new()).await
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn update_documents(
|
pub async fn update_documents(
|
||||||
|
@ -1,10 +1,11 @@
|
|||||||
use actix_web::test;
|
use actix_web::test;
|
||||||
use meili_snap::{json_string, snapshot};
|
use meili_snap::{json_string, snapshot};
|
||||||
|
use meilisearch::Opt;
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
|
|
||||||
use crate::common::encoder::Encoder;
|
use crate::common::encoder::Encoder;
|
||||||
use crate::common::{GetAllDocumentsOptions, Server, Value};
|
use crate::common::{default_settings, GetAllDocumentsOptions, Server, Value};
|
||||||
use crate::json;
|
use crate::json;
|
||||||
|
|
||||||
/// This is the basic usage of our API and every other tests uses the content-type application/json
|
/// This is the basic usage of our API and every other tests uses the content-type application/json
|
||||||
@ -2157,3 +2158,49 @@ async fn batch_several_documents_addition() {
|
|||||||
assert_eq!(code, 200, "failed with `{}`", response);
|
assert_eq!(code, 200, "failed with `{}`", response);
|
||||||
assert_eq!(response["results"].as_array().unwrap().len(), 120);
|
assert_eq!(response["results"].as_array().unwrap().len(), 120);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn dry_register_file() {
|
||||||
|
let temp = tempfile::tempdir().unwrap();
|
||||||
|
|
||||||
|
let options =
|
||||||
|
Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
|
||||||
|
let server = Server::new_with_options(options).await.unwrap();
|
||||||
|
let index = server.index("tamo");
|
||||||
|
|
||||||
|
let documents = r#"
|
||||||
|
{
|
||||||
|
"id": "12",
|
||||||
|
"doggo": "kefir"
|
||||||
|
}
|
||||||
|
"#;
|
||||||
|
|
||||||
|
let (response, code) = index
|
||||||
|
.raw_add_documents(
|
||||||
|
documents,
|
||||||
|
vec![("Content-Type", "application/json"), ("DryRun", "true")],
|
||||||
|
"",
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
snapshot!(response, @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 0,
|
||||||
|
"indexUid": "tamo",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "documentAdditionOrUpdate",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
snapshot!(code, @"202 Accepted");
|
||||||
|
|
||||||
|
let (response, code) = index.get_task(response.uid()).await;
|
||||||
|
snapshot!(response, @r###"
|
||||||
|
{
|
||||||
|
"message": "Task `0` not found.",
|
||||||
|
"code": "task_not_found",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#task_not_found"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
snapshot!(code, @"404 Not Found");
|
||||||
|
}
|
||||||
|
@ -209,7 +209,8 @@ async fn replace_documents_missing_payload() {
|
|||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("test");
|
||||||
|
|
||||||
let (response, code) = index.raw_add_documents("", Some("application/json"), "").await;
|
let (response, code) =
|
||||||
|
index.raw_add_documents("", vec![("Content-Type", "application/json")], "").await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -220,7 +221,8 @@ async fn replace_documents_missing_payload() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
let (response, code) = index.raw_add_documents("", Some("application/x-ndjson"), "").await;
|
let (response, code) =
|
||||||
|
index.raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "").await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -231,7 +233,8 @@ async fn replace_documents_missing_payload() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
let (response, code) = index.raw_add_documents("", Some("text/csv"), "").await;
|
let (response, code) =
|
||||||
|
index.raw_add_documents("", vec![("Content-Type", "text/csv")], "").await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -287,7 +290,7 @@ async fn replace_documents_missing_content_type() {
|
|||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("test");
|
||||||
|
|
||||||
let (response, code) = index.raw_add_documents("", None, "").await;
|
let (response, code) = index.raw_add_documents("", Vec::new(), "").await;
|
||||||
snapshot!(code, @"415 Unsupported Media Type");
|
snapshot!(code, @"415 Unsupported Media Type");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -299,7 +302,7 @@ async fn replace_documents_missing_content_type() {
|
|||||||
"###);
|
"###);
|
||||||
|
|
||||||
// even with a csv delimiter specified this error is triggered first
|
// even with a csv delimiter specified this error is triggered first
|
||||||
let (response, code) = index.raw_add_documents("", None, "?csvDelimiter=;").await;
|
let (response, code) = index.raw_add_documents("", Vec::new(), "?csvDelimiter=;").await;
|
||||||
snapshot!(code, @"415 Unsupported Media Type");
|
snapshot!(code, @"415 Unsupported Media Type");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -345,7 +348,7 @@ async fn replace_documents_bad_content_type() {
|
|||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("test");
|
||||||
|
|
||||||
let (response, code) = index.raw_add_documents("", Some("doggo"), "").await;
|
let (response, code) = index.raw_add_documents("", vec![("Content-Type", "doggo")], "").await;
|
||||||
snapshot!(code, @"415 Unsupported Media Type");
|
snapshot!(code, @"415 Unsupported Media Type");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -379,8 +382,9 @@ async fn replace_documents_bad_csv_delimiter() {
|
|||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("test");
|
||||||
|
|
||||||
let (response, code) =
|
let (response, code) = index
|
||||||
index.raw_add_documents("", Some("application/json"), "?csvDelimiter").await;
|
.raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter")
|
||||||
|
.await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -391,8 +395,9 @@ async fn replace_documents_bad_csv_delimiter() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
let (response, code) =
|
let (response, code) = index
|
||||||
index.raw_add_documents("", Some("application/json"), "?csvDelimiter=doggo").await;
|
.raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=doggo")
|
||||||
|
.await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -404,7 +409,11 @@ async fn replace_documents_bad_csv_delimiter() {
|
|||||||
"###);
|
"###);
|
||||||
|
|
||||||
let (response, code) = index
|
let (response, code) = index
|
||||||
.raw_add_documents("", Some("application/json"), &format!("?csvDelimiter={}", encode("🍰")))
|
.raw_add_documents(
|
||||||
|
"",
|
||||||
|
vec![("Content-Type", "application/json")],
|
||||||
|
&format!("?csvDelimiter={}", encode("🍰")),
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
snapshot!(code, @"400 Bad Request");
|
snapshot!(code, @"400 Bad Request");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
@ -469,8 +478,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
|
|||||||
let server = Server::new().await;
|
let server = Server::new().await;
|
||||||
let index = server.index("test");
|
let index = server.index("test");
|
||||||
|
|
||||||
let (response, code) =
|
let (response, code) = index
|
||||||
index.raw_add_documents("", Some("application/json"), "?csvDelimiter=a").await;
|
.raw_add_documents("", vec![("Content-Type", "application/json")], "?csvDelimiter=a")
|
||||||
|
.await;
|
||||||
snapshot!(code, @"415 Unsupported Media Type");
|
snapshot!(code, @"415 Unsupported Media Type");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
@ -481,8 +491,9 @@ async fn replace_documents_csv_delimiter_with_bad_content_type() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
|
|
||||||
let (response, code) =
|
let (response, code) = index
|
||||||
index.raw_add_documents("", Some("application/x-ndjson"), "?csvDelimiter=a").await;
|
.raw_add_documents("", vec![("Content-Type", "application/x-ndjson")], "?csvDelimiter=a")
|
||||||
|
.await;
|
||||||
snapshot!(code, @"415 Unsupported Media Type");
|
snapshot!(code, @"415 Unsupported Media Type");
|
||||||
snapshot!(json_string!(response), @r###"
|
snapshot!(json_string!(response), @r###"
|
||||||
{
|
{
|
||||||
|
@ -2,9 +2,10 @@ use actix_web::http::header::ContentType;
|
|||||||
use actix_web::test;
|
use actix_web::test;
|
||||||
use http::header::ACCEPT_ENCODING;
|
use http::header::ACCEPT_ENCODING;
|
||||||
use meili_snap::{json_string, snapshot};
|
use meili_snap::{json_string, snapshot};
|
||||||
|
use meilisearch::Opt;
|
||||||
|
|
||||||
use crate::common::encoder::Encoder;
|
use crate::common::encoder::Encoder;
|
||||||
use crate::common::{Server, Value};
|
use crate::common::{default_settings, Server, Value};
|
||||||
use crate::json;
|
use crate::json;
|
||||||
|
|
||||||
#[actix_rt::test]
|
#[actix_rt::test]
|
||||||
@ -199,3 +200,79 @@ async fn error_create_with_invalid_index_uid() {
|
|||||||
}
|
}
|
||||||
"###);
|
"###);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_rt::test]
|
||||||
|
async fn send_task_id() {
|
||||||
|
let temp = tempfile::tempdir().unwrap();
|
||||||
|
|
||||||
|
let options =
|
||||||
|
Opt { experimental_replication_parameters: true, ..default_settings(temp.path()) };
|
||||||
|
let server = Server::new_with_options(options).await.unwrap();
|
||||||
|
|
||||||
|
let app = server.init_web_app().await;
|
||||||
|
let index = server.index("catto");
|
||||||
|
let (response, code) = index.create(None).await;
|
||||||
|
snapshot!(code, @"202 Accepted");
|
||||||
|
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 0,
|
||||||
|
"indexUid": "catto",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "indexCreation",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
let body = serde_json::to_string(&json!({
|
||||||
|
"uid": "doggo",
|
||||||
|
"primaryKey": None::<&str>,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let req = test::TestRequest::post()
|
||||||
|
.uri("/indexes")
|
||||||
|
.insert_header(("TaskId", "25"))
|
||||||
|
.insert_header(ContentType::json())
|
||||||
|
.set_payload(body)
|
||||||
|
.to_request();
|
||||||
|
|
||||||
|
let res = test::call_service(&app, req).await;
|
||||||
|
snapshot!(res.status(), @"202 Accepted");
|
||||||
|
|
||||||
|
let bytes = test::read_body(res).await;
|
||||||
|
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||||
|
snapshot!(json_string!(response, { ".enqueuedAt" => "[date]" }), @r###"
|
||||||
|
{
|
||||||
|
"taskUid": 25,
|
||||||
|
"indexUid": "doggo",
|
||||||
|
"status": "enqueued",
|
||||||
|
"type": "indexCreation",
|
||||||
|
"enqueuedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
|
||||||
|
let body = serde_json::to_string(&json!({
|
||||||
|
"uid": "girafo",
|
||||||
|
"primaryKey": None::<&str>,
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let req = test::TestRequest::post()
|
||||||
|
.uri("/indexes")
|
||||||
|
.insert_header(("TaskId", "12"))
|
||||||
|
.insert_header(ContentType::json())
|
||||||
|
.set_payload(body)
|
||||||
|
.to_request();
|
||||||
|
|
||||||
|
let res = test::call_service(&app, req).await;
|
||||||
|
snapshot!(res.status(), @"400 Bad Request");
|
||||||
|
|
||||||
|
let bytes = test::read_body(res).await;
|
||||||
|
let response = serde_json::from_slice::<Value>(&bytes).expect("Expecting valid json");
|
||||||
|
snapshot!(json_string!(response), @r###"
|
||||||
|
{
|
||||||
|
"message": "Received bad task id: 12 should be >= to 26.",
|
||||||
|
"code": "bad_request",
|
||||||
|
"type": "invalid_request",
|
||||||
|
"link": "https://docs.meilisearch.com/errors#bad_request"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user