mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-29 08:14:26 +01:00
Merge #4238
4238: Task queue webhook r=dureuill a=irevoire # Prototype `prototype-task-queue-webhook-1` The prototype is available through Docker by using the following command: ```bash docker run -p 7700:7700 -v $(pwd)/meili_data:/meili_data getmeili/meilisearch:prototype-task-queue-webhook-1 ``` # Pull Request Implements the task queue webhook. ## Related issue Fixes https://github.com/meilisearch/meilisearch/issues/4236 ## What does this PR do? - Provide a new cli and env var for the webhook, respectively called `--task-webhook-url` and `MEILI_TASK_WEBHOOK_URL` - Also supports sending the requests with a custom `Authorization` header by specifying the optional `--task-webhook-authorization-header` CLI parameter or `MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER` env variable. - Throw an error if the specified URL is invalid - Every time a batch is processed, send all the finished tasks into the webhook with our public `TaskView` type as a JSON Line GZIPed body. - Add one test. ## PR checklist ### Before becoming ready to review - [x] Add a test - [x] Compress the data we send - [x] Chunk and stream the data we send - [x] Remove the unwrap in the index-scheduler when sending the data fails - [x] The analytics are missing ### Before merging - [x] Release a prototype Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Clément Renault <clement@meilisearch.com>
This commit is contained in:
commit
43e822e802
24
Cargo.lock
generated
24
Cargo.lock
generated
@ -1677,9 +1677,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "flate2"
|
name = "flate2"
|
||||||
version = "1.0.26"
|
version = "1.0.28"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743"
|
checksum = "46303f565772937ffe1d394a4fac6f411c6013172fadde9dcdb1e147a086940e"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crc32fast",
|
"crc32fast",
|
||||||
"miniz_oxide",
|
"miniz_oxide",
|
||||||
@ -1701,9 +1701,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "form_urlencoded"
|
name = "form_urlencoded"
|
||||||
version = "1.2.0"
|
version = "1.2.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a62bc1cf6f830c2ec14a513a9fb124d0a213a629668a4186f329db21fe045652"
|
checksum = "e13624c2627564efccf4934284bdd98cbaa14e79b0b5a141218e507b3a823456"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
@ -2753,9 +2753,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "idna"
|
name = "idna"
|
||||||
version = "0.4.0"
|
version = "0.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
|
checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"unicode-bidi",
|
"unicode-bidi",
|
||||||
"unicode-normalization",
|
"unicode-normalization",
|
||||||
@ -2774,6 +2774,7 @@ dependencies = [
|
|||||||
"dump",
|
"dump",
|
||||||
"enum-iterator",
|
"enum-iterator",
|
||||||
"file-store",
|
"file-store",
|
||||||
|
"flate2",
|
||||||
"insta",
|
"insta",
|
||||||
"log",
|
"log",
|
||||||
"meili-snap",
|
"meili-snap",
|
||||||
@ -2789,6 +2790,7 @@ dependencies = [
|
|||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"time",
|
"time",
|
||||||
|
"ureq",
|
||||||
"uuid 1.5.0",
|
"uuid 1.5.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
@ -3559,6 +3561,7 @@ dependencies = [
|
|||||||
"tokio",
|
"tokio",
|
||||||
"tokio-stream",
|
"tokio-stream",
|
||||||
"toml",
|
"toml",
|
||||||
|
"url",
|
||||||
"urlencoding",
|
"urlencoding",
|
||||||
"uuid 1.5.0",
|
"uuid 1.5.0",
|
||||||
"vergen",
|
"vergen",
|
||||||
@ -4067,9 +4070,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "percent-encoding"
|
name = "percent-encoding"
|
||||||
version = "2.3.0"
|
version = "2.3.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94"
|
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "permissive-json-pointer"
|
name = "permissive-json-pointer"
|
||||||
@ -5597,13 +5600,14 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "url"
|
name = "url"
|
||||||
version = "2.4.0"
|
version = "2.5.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "50bff7831e19200a85b17131d085c25d7811bc4e186efdaf54bbd132994a88cb"
|
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"form_urlencoded",
|
"form_urlencoded",
|
||||||
"idna",
|
"idna",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -18,6 +18,7 @@ derive_builder = "0.12.0"
|
|||||||
dump = { path = "../dump" }
|
dump = { path = "../dump" }
|
||||||
enum-iterator = "1.4.0"
|
enum-iterator = "1.4.0"
|
||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
|
flate2 = "1.0.28"
|
||||||
log = "0.4.17"
|
log = "0.4.17"
|
||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
@ -30,6 +31,7 @@ synchronoise = "1.0.1"
|
|||||||
tempfile = "3.5.0"
|
tempfile = "3.5.0"
|
||||||
thiserror = "1.0.40"
|
thiserror = "1.0.40"
|
||||||
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
|
||||||
|
ureq = "2.9.1"
|
||||||
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
uuid = { version = "1.3.1", features = ["serde", "v4"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
@ -37,6 +37,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
|
|||||||
snapshots_path: _,
|
snapshots_path: _,
|
||||||
auth_path: _,
|
auth_path: _,
|
||||||
version_file_path: _,
|
version_file_path: _,
|
||||||
|
webhook_url: _,
|
||||||
|
webhook_authorization_header: _,
|
||||||
test_breakpoint_sdr: _,
|
test_breakpoint_sdr: _,
|
||||||
planned_failures: _,
|
planned_failures: _,
|
||||||
run_loop_iteration: _,
|
run_loop_iteration: _,
|
||||||
|
@ -34,6 +34,7 @@ pub type TaskId = u32;
|
|||||||
|
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap};
|
||||||
use std::fs::File;
|
use std::fs::File;
|
||||||
|
use std::io::{self, BufReader, Read};
|
||||||
use std::ops::{Bound, RangeBounds};
|
use std::ops::{Bound, RangeBounds};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::AtomicBool;
|
use std::sync::atomic::AtomicBool;
|
||||||
@ -45,6 +46,8 @@ use dump::{KindDump, TaskDump, UpdateFile};
|
|||||||
pub use error::Error;
|
pub use error::Error;
|
||||||
pub use features::RoFeatures;
|
pub use features::RoFeatures;
|
||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
|
use flate2::bufread::GzEncoder;
|
||||||
|
use flate2::Compression;
|
||||||
use meilisearch_types::error::ResponseError;
|
use meilisearch_types::error::ResponseError;
|
||||||
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
|
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
|
||||||
use meilisearch_types::heed::byteorder::BE;
|
use meilisearch_types::heed::byteorder::BE;
|
||||||
@ -54,6 +57,7 @@ use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
|||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
||||||
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
|
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
|
||||||
|
use meilisearch_types::task_view::TaskView;
|
||||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||||
use puffin::FrameView;
|
use puffin::FrameView;
|
||||||
use roaring::RoaringBitmap;
|
use roaring::RoaringBitmap;
|
||||||
@ -170,8 +174,8 @@ impl ProcessingTasks {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Set the processing tasks to an empty list
|
/// Set the processing tasks to an empty list
|
||||||
fn stop_processing(&mut self) {
|
fn stop_processing(&mut self) -> RoaringBitmap {
|
||||||
self.processing = RoaringBitmap::new();
|
std::mem::take(&mut self.processing)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns `true` if there, at least, is one task that is currently processing that we must stop.
|
/// Returns `true` if there, at least, is one task that is currently processing that we must stop.
|
||||||
@ -241,6 +245,10 @@ pub struct IndexSchedulerOptions {
|
|||||||
pub snapshots_path: PathBuf,
|
pub snapshots_path: PathBuf,
|
||||||
/// The path to the folder containing the dumps.
|
/// The path to the folder containing the dumps.
|
||||||
pub dumps_path: PathBuf,
|
pub dumps_path: PathBuf,
|
||||||
|
/// The URL on which we must send the tasks statuses
|
||||||
|
pub webhook_url: Option<String>,
|
||||||
|
/// The value we will send into the Authorization HTTP header on the webhook URL
|
||||||
|
pub webhook_authorization_header: Option<String>,
|
||||||
/// The maximum size, in bytes, of the task index.
|
/// The maximum size, in bytes, of the task index.
|
||||||
pub task_db_size: usize,
|
pub task_db_size: usize,
|
||||||
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
||||||
@ -323,6 +331,11 @@ pub struct IndexScheduler {
|
|||||||
/// The maximum number of tasks that will be batched together.
|
/// The maximum number of tasks that will be batched together.
|
||||||
pub(crate) max_number_of_batched_tasks: usize,
|
pub(crate) max_number_of_batched_tasks: usize,
|
||||||
|
|
||||||
|
/// The webhook url we should send tasks to after processing every batches.
|
||||||
|
pub(crate) webhook_url: Option<String>,
|
||||||
|
/// The Authorization header to send to the webhook URL.
|
||||||
|
pub(crate) webhook_authorization_header: Option<String>,
|
||||||
|
|
||||||
/// A frame to output the indexation profiling files to disk.
|
/// A frame to output the indexation profiling files to disk.
|
||||||
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
|
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
|
||||||
|
|
||||||
@ -388,6 +401,8 @@ impl IndexScheduler {
|
|||||||
dumps_path: self.dumps_path.clone(),
|
dumps_path: self.dumps_path.clone(),
|
||||||
auth_path: self.auth_path.clone(),
|
auth_path: self.auth_path.clone(),
|
||||||
version_file_path: self.version_file_path.clone(),
|
version_file_path: self.version_file_path.clone(),
|
||||||
|
webhook_url: self.webhook_url.clone(),
|
||||||
|
webhook_authorization_header: self.webhook_authorization_header.clone(),
|
||||||
currently_updating_index: self.currently_updating_index.clone(),
|
currently_updating_index: self.currently_updating_index.clone(),
|
||||||
embedders: self.embedders.clone(),
|
embedders: self.embedders.clone(),
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@ -487,6 +502,8 @@ impl IndexScheduler {
|
|||||||
snapshots_path: options.snapshots_path,
|
snapshots_path: options.snapshots_path,
|
||||||
auth_path: options.auth_path,
|
auth_path: options.auth_path,
|
||||||
version_file_path: options.version_file_path,
|
version_file_path: options.version_file_path,
|
||||||
|
webhook_url: options.webhook_url,
|
||||||
|
webhook_authorization_header: options.webhook_authorization_header,
|
||||||
currently_updating_index: Arc::new(RwLock::new(None)),
|
currently_updating_index: Arc::new(RwLock::new(None)),
|
||||||
embedders: Default::default(),
|
embedders: Default::default(),
|
||||||
|
|
||||||
@ -1251,19 +1268,99 @@ impl IndexScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.processing_tasks.write().unwrap().stop_processing();
|
let processed = self.processing_tasks.write().unwrap().stop_processing();
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
|
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
|
||||||
|
|
||||||
wtxn.commit().map_err(Error::HeedTransaction)?;
|
wtxn.commit().map_err(Error::HeedTransaction)?;
|
||||||
|
|
||||||
|
// We shouldn't crash the tick function if we can't send data to the webhook.
|
||||||
|
let _ = self.notify_webhook(&processed);
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
self.breakpoint(Breakpoint::AfterProcessing);
|
self.breakpoint(Breakpoint::AfterProcessing);
|
||||||
|
|
||||||
Ok(TickOutcome::TickAgain(processed_tasks))
|
Ok(TickOutcome::TickAgain(processed_tasks))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Once the tasks changes have been commited we must send all the tasks that were updated to our webhook if there is one.
|
||||||
|
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
|
||||||
|
if let Some(ref url) = self.webhook_url {
|
||||||
|
struct TaskReader<'a, 'b> {
|
||||||
|
rtxn: &'a RoTxn<'a>,
|
||||||
|
index_scheduler: &'a IndexScheduler,
|
||||||
|
tasks: &'b mut roaring::bitmap::Iter<'b>,
|
||||||
|
buffer: Vec<u8>,
|
||||||
|
written: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, 'b> Read for TaskReader<'a, 'b> {
|
||||||
|
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
if self.buffer.is_empty() {
|
||||||
|
match self.tasks.next() {
|
||||||
|
None => return Ok(0),
|
||||||
|
Some(task_id) => {
|
||||||
|
let task = self
|
||||||
|
.index_scheduler
|
||||||
|
.get_task(self.rtxn, task_id)
|
||||||
|
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?
|
||||||
|
.ok_or_else(|| {
|
||||||
|
io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
Error::CorruptedTaskQueue,
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
serde_json::to_writer(
|
||||||
|
&mut self.buffer,
|
||||||
|
&TaskView::from_task(&task),
|
||||||
|
)?;
|
||||||
|
self.buffer.push(b'\n');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut to_write = &self.buffer[self.written..];
|
||||||
|
let wrote = io::copy(&mut to_write, &mut buf)?;
|
||||||
|
self.written += wrote as usize;
|
||||||
|
|
||||||
|
// we wrote everything and must refresh our buffer on the next call
|
||||||
|
if self.written == self.buffer.len() {
|
||||||
|
self.written = 0;
|
||||||
|
self.buffer.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(wrote as usize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let rtxn = self.env.read_txn()?;
|
||||||
|
|
||||||
|
let task_reader = TaskReader {
|
||||||
|
rtxn: &rtxn,
|
||||||
|
index_scheduler: self,
|
||||||
|
tasks: &mut updated.into_iter(),
|
||||||
|
buffer: Vec::with_capacity(50), // on average a task is around ~100 bytes
|
||||||
|
written: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
// let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
|
||||||
|
let reader = GzEncoder::new(BufReader::new(task_reader), Compression::default());
|
||||||
|
let request = ureq::post(url).set("Content-Encoding", "gzip");
|
||||||
|
let request = match &self.webhook_authorization_header {
|
||||||
|
Some(header) => request.set("Authorization", header),
|
||||||
|
None => request,
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Err(e) = request.send(reader) {
|
||||||
|
log::error!("While sending data to the webhook: {e}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Register a task to cleanup the task queue if needed
|
/// Register a task to cleanup the task queue if needed
|
||||||
fn cleanup_task_queue(&self) -> Result<()> {
|
fn cleanup_task_queue(&self) -> Result<()> {
|
||||||
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
|
||||||
@ -1677,6 +1774,8 @@ mod tests {
|
|||||||
indexes_path: tempdir.path().join("indexes"),
|
indexes_path: tempdir.path().join("indexes"),
|
||||||
snapshots_path: tempdir.path().join("snapshots"),
|
snapshots_path: tempdir.path().join("snapshots"),
|
||||||
dumps_path: tempdir.path().join("dumps"),
|
dumps_path: tempdir.path().join("dumps"),
|
||||||
|
webhook_url: None,
|
||||||
|
webhook_authorization_header: None,
|
||||||
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||||
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||||
enable_mdb_writemap: false,
|
enable_mdb_writemap: false,
|
||||||
|
@ -9,6 +9,7 @@ pub mod index_uid_pattern;
|
|||||||
pub mod keys;
|
pub mod keys;
|
||||||
pub mod settings;
|
pub mod settings;
|
||||||
pub mod star_or;
|
pub mod star_or;
|
||||||
|
pub mod task_view;
|
||||||
pub mod tasks;
|
pub mod tasks;
|
||||||
pub mod versioning;
|
pub mod versioning;
|
||||||
pub use milli::{heed, Index};
|
pub use milli::{heed, Index};
|
||||||
|
139
meilisearch-types/src/task_view.rs
Normal file
139
meilisearch-types/src/task_view.rs
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
use serde::Serialize;
|
||||||
|
use time::{Duration, OffsetDateTime};
|
||||||
|
|
||||||
|
use crate::error::ResponseError;
|
||||||
|
use crate::settings::{Settings, Unchecked};
|
||||||
|
use crate::tasks::{serialize_duration, Details, IndexSwap, Kind, Status, Task, TaskId};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct TaskView {
|
||||||
|
pub uid: TaskId,
|
||||||
|
#[serde(default)]
|
||||||
|
pub index_uid: Option<String>,
|
||||||
|
pub status: Status,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub kind: Kind,
|
||||||
|
pub canceled_by: Option<TaskId>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub details: Option<DetailsView>,
|
||||||
|
pub error: Option<ResponseError>,
|
||||||
|
#[serde(serialize_with = "serialize_duration", default)]
|
||||||
|
pub duration: Option<Duration>,
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub enqueued_at: OffsetDateTime,
|
||||||
|
#[serde(with = "time::serde::rfc3339::option", default)]
|
||||||
|
pub started_at: Option<OffsetDateTime>,
|
||||||
|
#[serde(with = "time::serde::rfc3339::option", default)]
|
||||||
|
pub finished_at: Option<OffsetDateTime>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TaskView {
|
||||||
|
pub fn from_task(task: &Task) -> TaskView {
|
||||||
|
TaskView {
|
||||||
|
uid: task.uid,
|
||||||
|
index_uid: task.index_uid().map(ToOwned::to_owned),
|
||||||
|
status: task.status,
|
||||||
|
kind: task.kind.as_kind(),
|
||||||
|
canceled_by: task.canceled_by,
|
||||||
|
details: task.details.clone().map(DetailsView::from),
|
||||||
|
error: task.error.clone(),
|
||||||
|
duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start),
|
||||||
|
enqueued_at: task.enqueued_at,
|
||||||
|
started_at: task.started_at,
|
||||||
|
finished_at: task.finished_at,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct DetailsView {
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub received_documents: Option<u64>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub indexed_documents: Option<Option<u64>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub primary_key: Option<Option<String>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub provided_ids: Option<usize>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub deleted_documents: Option<Option<u64>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub matched_tasks: Option<u64>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub canceled_tasks: Option<Option<u64>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub deleted_tasks: Option<Option<u64>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub original_filter: Option<Option<String>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub dump_uid: Option<Option<String>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
#[serde(flatten)]
|
||||||
|
pub settings: Option<Box<Settings<Unchecked>>>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub swaps: Option<Vec<IndexSwap>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<Details> for DetailsView {
|
||||||
|
fn from(details: Details) -> Self {
|
||||||
|
match details {
|
||||||
|
Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
|
||||||
|
DetailsView {
|
||||||
|
received_documents: Some(received_documents),
|
||||||
|
indexed_documents: Some(indexed_documents),
|
||||||
|
..DetailsView::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Details::SettingsUpdate { settings } => {
|
||||||
|
DetailsView { settings: Some(settings), ..DetailsView::default() }
|
||||||
|
}
|
||||||
|
Details::IndexInfo { primary_key } => {
|
||||||
|
DetailsView { primary_key: Some(primary_key), ..DetailsView::default() }
|
||||||
|
}
|
||||||
|
Details::DocumentDeletion {
|
||||||
|
provided_ids: received_document_ids,
|
||||||
|
deleted_documents,
|
||||||
|
} => DetailsView {
|
||||||
|
provided_ids: Some(received_document_ids),
|
||||||
|
deleted_documents: Some(deleted_documents),
|
||||||
|
original_filter: Some(None),
|
||||||
|
..DetailsView::default()
|
||||||
|
},
|
||||||
|
Details::DocumentDeletionByFilter { original_filter, deleted_documents } => {
|
||||||
|
DetailsView {
|
||||||
|
provided_ids: Some(0),
|
||||||
|
original_filter: Some(Some(original_filter)),
|
||||||
|
deleted_documents: Some(deleted_documents),
|
||||||
|
..DetailsView::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Details::ClearAll { deleted_documents } => {
|
||||||
|
DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }
|
||||||
|
}
|
||||||
|
Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => {
|
||||||
|
DetailsView {
|
||||||
|
matched_tasks: Some(matched_tasks),
|
||||||
|
canceled_tasks: Some(canceled_tasks),
|
||||||
|
original_filter: Some(Some(original_filter)),
|
||||||
|
..DetailsView::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => {
|
||||||
|
DetailsView {
|
||||||
|
matched_tasks: Some(matched_tasks),
|
||||||
|
deleted_tasks: Some(deleted_tasks),
|
||||||
|
original_filter: Some(Some(original_filter)),
|
||||||
|
..DetailsView::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Details::Dump { dump_uid } => {
|
||||||
|
DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() }
|
||||||
|
}
|
||||||
|
Details::IndexSwap { swaps } => {
|
||||||
|
DetailsView { swaps: Some(swaps), ..Default::default() }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -104,6 +104,7 @@ walkdir = "2.3.3"
|
|||||||
yaup = "0.2.1"
|
yaup = "0.2.1"
|
||||||
serde_urlencoded = "0.7.1"
|
serde_urlencoded = "0.7.1"
|
||||||
termcolor = "1.2.0"
|
termcolor = "1.2.0"
|
||||||
|
url = { version = "2.5.0", features = ["serde"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
actix-rt = "2.8.0"
|
actix-rt = "2.8.0"
|
||||||
|
@ -264,6 +264,8 @@ struct Infos {
|
|||||||
ignore_snapshot_if_db_exists: bool,
|
ignore_snapshot_if_db_exists: bool,
|
||||||
http_addr: bool,
|
http_addr: bool,
|
||||||
http_payload_size_limit: Byte,
|
http_payload_size_limit: Byte,
|
||||||
|
task_queue_webhook: bool,
|
||||||
|
task_webhook_authorization_header: bool,
|
||||||
log_level: String,
|
log_level: String,
|
||||||
max_indexing_memory: MaxMemory,
|
max_indexing_memory: MaxMemory,
|
||||||
max_indexing_threads: MaxThreads,
|
max_indexing_threads: MaxThreads,
|
||||||
@ -290,6 +292,8 @@ impl From<Opt> for Infos {
|
|||||||
http_addr,
|
http_addr,
|
||||||
master_key: _,
|
master_key: _,
|
||||||
env,
|
env,
|
||||||
|
task_webhook_url,
|
||||||
|
task_webhook_authorization_header,
|
||||||
max_index_size: _,
|
max_index_size: _,
|
||||||
max_task_db_size: _,
|
max_task_db_size: _,
|
||||||
http_payload_size_limit,
|
http_payload_size_limit,
|
||||||
@ -343,6 +347,8 @@ impl From<Opt> for Infos {
|
|||||||
http_addr: http_addr != default_http_addr(),
|
http_addr: http_addr != default_http_addr(),
|
||||||
http_payload_size_limit,
|
http_payload_size_limit,
|
||||||
experimental_max_number_of_batched_tasks,
|
experimental_max_number_of_batched_tasks,
|
||||||
|
task_queue_webhook: task_webhook_url.is_some(),
|
||||||
|
task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
|
||||||
log_level: log_level.to_string(),
|
log_level: log_level.to_string(),
|
||||||
max_indexing_memory,
|
max_indexing_memory,
|
||||||
max_indexing_threads,
|
max_indexing_threads,
|
||||||
|
@ -228,6 +228,8 @@ fn open_or_create_database_unchecked(
|
|||||||
indexes_path: opt.db_path.join("indexes"),
|
indexes_path: opt.db_path.join("indexes"),
|
||||||
snapshots_path: opt.snapshot_dir.clone(),
|
snapshots_path: opt.snapshot_dir.clone(),
|
||||||
dumps_path: opt.dump_dir.clone(),
|
dumps_path: opt.dump_dir.clone(),
|
||||||
|
webhook_url: opt.task_webhook_url.as_ref().map(|url| url.to_string()),
|
||||||
|
webhook_authorization_header: opt.task_webhook_authorization_header.clone(),
|
||||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||||
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||||
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
|
||||||
|
@ -21,6 +21,7 @@ use rustls::RootCertStore;
|
|||||||
use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
|
use rustls_pemfile::{certs, pkcs8_private_keys, rsa_private_keys};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sysinfo::{RefreshKind, System, SystemExt};
|
use sysinfo::{RefreshKind, System, SystemExt};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
|
const POSSIBLE_ENV: [&str; 2] = ["development", "production"];
|
||||||
|
|
||||||
@ -28,6 +29,8 @@ const MEILI_DB_PATH: &str = "MEILI_DB_PATH";
|
|||||||
const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
|
const MEILI_HTTP_ADDR: &str = "MEILI_HTTP_ADDR";
|
||||||
const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
|
const MEILI_MASTER_KEY: &str = "MEILI_MASTER_KEY";
|
||||||
const MEILI_ENV: &str = "MEILI_ENV";
|
const MEILI_ENV: &str = "MEILI_ENV";
|
||||||
|
const MEILI_TASK_WEBHOOK_URL: &str = "MEILI_TASK_WEBHOOK_URL";
|
||||||
|
const MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER: &str = "MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER";
|
||||||
#[cfg(feature = "analytics")]
|
#[cfg(feature = "analytics")]
|
||||||
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
|
const MEILI_NO_ANALYTICS: &str = "MEILI_NO_ANALYTICS";
|
||||||
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
|
const MEILI_HTTP_PAYLOAD_SIZE_LIMIT: &str = "MEILI_HTTP_PAYLOAD_SIZE_LIMIT";
|
||||||
@ -156,6 +159,14 @@ pub struct Opt {
|
|||||||
#[serde(default = "default_env")]
|
#[serde(default = "default_env")]
|
||||||
pub env: String,
|
pub env: String,
|
||||||
|
|
||||||
|
/// Called whenever a task finishes so a third party can be notified.
|
||||||
|
#[clap(long, env = MEILI_TASK_WEBHOOK_URL)]
|
||||||
|
pub task_webhook_url: Option<Url>,
|
||||||
|
|
||||||
|
/// The Authorization header to send on the webhook URL whenever a task finishes so a third party can be notified.
|
||||||
|
#[clap(long, env = MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER)]
|
||||||
|
pub task_webhook_authorization_header: Option<String>,
|
||||||
|
|
||||||
/// Deactivates Meilisearch's built-in telemetry when provided.
|
/// Deactivates Meilisearch's built-in telemetry when provided.
|
||||||
///
|
///
|
||||||
/// Meilisearch automatically collects data from all instances that do not opt out using this flag.
|
/// Meilisearch automatically collects data from all instances that do not opt out using this flag.
|
||||||
@ -375,6 +386,8 @@ impl Opt {
|
|||||||
http_addr,
|
http_addr,
|
||||||
master_key,
|
master_key,
|
||||||
env,
|
env,
|
||||||
|
task_webhook_url,
|
||||||
|
task_webhook_authorization_header,
|
||||||
max_index_size: _,
|
max_index_size: _,
|
||||||
max_task_db_size: _,
|
max_task_db_size: _,
|
||||||
http_payload_size_limit,
|
http_payload_size_limit,
|
||||||
@ -409,6 +422,16 @@ impl Opt {
|
|||||||
export_to_env_if_not_present(MEILI_MASTER_KEY, master_key);
|
export_to_env_if_not_present(MEILI_MASTER_KEY, master_key);
|
||||||
}
|
}
|
||||||
export_to_env_if_not_present(MEILI_ENV, env);
|
export_to_env_if_not_present(MEILI_ENV, env);
|
||||||
|
if let Some(task_webhook_url) = task_webhook_url {
|
||||||
|
export_to_env_if_not_present(MEILI_TASK_WEBHOOK_URL, task_webhook_url.to_string());
|
||||||
|
}
|
||||||
|
if let Some(task_webhook_authorization_header) = task_webhook_authorization_header {
|
||||||
|
export_to_env_if_not_present(
|
||||||
|
MEILI_TASK_WEBHOOK_AUTHORIZATION_HEADER,
|
||||||
|
task_webhook_authorization_header,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(feature = "analytics")]
|
#[cfg(feature = "analytics")]
|
||||||
{
|
{
|
||||||
export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());
|
export_to_env_if_not_present(MEILI_NO_ANALYTICS, no_analytics.to_string());
|
||||||
|
@ -8,11 +8,9 @@ use meilisearch_types::deserr::DeserrQueryParamError;
|
|||||||
use meilisearch_types::error::deserr_codes::*;
|
use meilisearch_types::error::deserr_codes::*;
|
||||||
use meilisearch_types::error::{InvalidTaskDateError, ResponseError};
|
use meilisearch_types::error::{InvalidTaskDateError, ResponseError};
|
||||||
use meilisearch_types::index_uid::IndexUid;
|
use meilisearch_types::index_uid::IndexUid;
|
||||||
use meilisearch_types::settings::{Settings, Unchecked};
|
|
||||||
use meilisearch_types::star_or::{OptionStarOr, OptionStarOrList};
|
use meilisearch_types::star_or::{OptionStarOr, OptionStarOrList};
|
||||||
use meilisearch_types::tasks::{
|
use meilisearch_types::task_view::TaskView;
|
||||||
serialize_duration, Details, IndexSwap, Kind, KindWithContent, Status, Task,
|
use meilisearch_types::tasks::{Kind, KindWithContent, Status};
|
||||||
};
|
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
@ -37,140 +35,6 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
|
|||||||
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
|
.service(web::resource("/cancel").route(web::post().to(SeqHandler(cancel_tasks))))
|
||||||
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
|
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct TaskView {
|
|
||||||
pub uid: TaskId,
|
|
||||||
#[serde(default)]
|
|
||||||
pub index_uid: Option<String>,
|
|
||||||
pub status: Status,
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub kind: Kind,
|
|
||||||
pub canceled_by: Option<TaskId>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub details: Option<DetailsView>,
|
|
||||||
pub error: Option<ResponseError>,
|
|
||||||
#[serde(serialize_with = "serialize_duration", default)]
|
|
||||||
pub duration: Option<Duration>,
|
|
||||||
#[serde(with = "time::serde::rfc3339")]
|
|
||||||
pub enqueued_at: OffsetDateTime,
|
|
||||||
#[serde(with = "time::serde::rfc3339::option", default)]
|
|
||||||
pub started_at: Option<OffsetDateTime>,
|
|
||||||
#[serde(with = "time::serde::rfc3339::option", default)]
|
|
||||||
pub finished_at: Option<OffsetDateTime>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl TaskView {
|
|
||||||
pub fn from_task(task: &Task) -> TaskView {
|
|
||||||
TaskView {
|
|
||||||
uid: task.uid,
|
|
||||||
index_uid: task.index_uid().map(ToOwned::to_owned),
|
|
||||||
status: task.status,
|
|
||||||
kind: task.kind.as_kind(),
|
|
||||||
canceled_by: task.canceled_by,
|
|
||||||
details: task.details.clone().map(DetailsView::from),
|
|
||||||
error: task.error.clone(),
|
|
||||||
duration: task.started_at.zip(task.finished_at).map(|(start, end)| end - start),
|
|
||||||
enqueued_at: task.enqueued_at,
|
|
||||||
started_at: task.started_at,
|
|
||||||
finished_at: task.finished_at,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Debug, PartialEq, Eq, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct DetailsView {
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub received_documents: Option<u64>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub indexed_documents: Option<Option<u64>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub primary_key: Option<Option<String>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub provided_ids: Option<usize>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub deleted_documents: Option<Option<u64>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub matched_tasks: Option<u64>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub canceled_tasks: Option<Option<u64>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub deleted_tasks: Option<Option<u64>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub original_filter: Option<Option<String>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub dump_uid: Option<Option<String>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
#[serde(flatten)]
|
|
||||||
pub settings: Option<Box<Settings<Unchecked>>>,
|
|
||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
|
||||||
pub swaps: Option<Vec<IndexSwap>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl From<Details> for DetailsView {
|
|
||||||
fn from(details: Details) -> Self {
|
|
||||||
match details {
|
|
||||||
Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
|
|
||||||
DetailsView {
|
|
||||||
received_documents: Some(received_documents),
|
|
||||||
indexed_documents: Some(indexed_documents),
|
|
||||||
..DetailsView::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Details::SettingsUpdate { settings } => {
|
|
||||||
DetailsView { settings: Some(settings), ..DetailsView::default() }
|
|
||||||
}
|
|
||||||
Details::IndexInfo { primary_key } => {
|
|
||||||
DetailsView { primary_key: Some(primary_key), ..DetailsView::default() }
|
|
||||||
}
|
|
||||||
Details::DocumentDeletion {
|
|
||||||
provided_ids: received_document_ids,
|
|
||||||
deleted_documents,
|
|
||||||
} => DetailsView {
|
|
||||||
provided_ids: Some(received_document_ids),
|
|
||||||
deleted_documents: Some(deleted_documents),
|
|
||||||
original_filter: Some(None),
|
|
||||||
..DetailsView::default()
|
|
||||||
},
|
|
||||||
Details::DocumentDeletionByFilter { original_filter, deleted_documents } => {
|
|
||||||
DetailsView {
|
|
||||||
provided_ids: Some(0),
|
|
||||||
original_filter: Some(Some(original_filter)),
|
|
||||||
deleted_documents: Some(deleted_documents),
|
|
||||||
..DetailsView::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Details::ClearAll { deleted_documents } => {
|
|
||||||
DetailsView { deleted_documents: Some(deleted_documents), ..DetailsView::default() }
|
|
||||||
}
|
|
||||||
Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => {
|
|
||||||
DetailsView {
|
|
||||||
matched_tasks: Some(matched_tasks),
|
|
||||||
canceled_tasks: Some(canceled_tasks),
|
|
||||||
original_filter: Some(Some(original_filter)),
|
|
||||||
..DetailsView::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => {
|
|
||||||
DetailsView {
|
|
||||||
matched_tasks: Some(matched_tasks),
|
|
||||||
deleted_tasks: Some(deleted_tasks),
|
|
||||||
original_filter: Some(Some(original_filter)),
|
|
||||||
..DetailsView::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Details::Dump { dump_uid } => {
|
|
||||||
DetailsView { dump_uid: Some(dump_uid), ..DetailsView::default() }
|
|
||||||
}
|
|
||||||
Details::IndexSwap { swaps } => {
|
|
||||||
DetailsView { swaps: Some(swaps), ..Default::default() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Deserr)]
|
#[derive(Debug, Deserr)]
|
||||||
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
#[deserr(error = DeserrQueryParamError, rename_all = camelCase, deny_unknown_fields)]
|
||||||
pub struct TasksFilterQuery {
|
pub struct TasksFilterQuery {
|
||||||
|
@ -1,4 +1,5 @@
|
|||||||
mod errors;
|
mod errors;
|
||||||
|
mod webhook;
|
||||||
|
|
||||||
use meili_snap::insta::assert_json_snapshot;
|
use meili_snap::insta::assert_json_snapshot;
|
||||||
use time::format_description::well_known::Rfc3339;
|
use time::format_description::well_known::Rfc3339;
|
||||||
|
123
meilisearch/tests/tasks/webhook.rs
Normal file
123
meilisearch/tests/tasks/webhook.rs
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
//! To test the webhook, we need to spawn a new server with a URL listening for
|
||||||
|
//! post requests. The webhook handle starts a server and forwards all the
|
||||||
|
//! received requests into a channel for you to handle.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use actix_http::body::MessageBody;
|
||||||
|
use actix_web::dev::{ServiceFactory, ServiceResponse};
|
||||||
|
use actix_web::web::{Bytes, Data};
|
||||||
|
use actix_web::{post, App, HttpResponse, HttpServer};
|
||||||
|
use meili_snap::{json_string, snapshot};
|
||||||
|
use meilisearch::Opt;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
use crate::common::{default_settings, Server};
|
||||||
|
use crate::json;
|
||||||
|
|
||||||
|
#[post("/")]
|
||||||
|
async fn forward_body(sender: Data<mpsc::UnboundedSender<Vec<u8>>>, body: Bytes) -> HttpResponse {
|
||||||
|
let body = body.to_vec();
|
||||||
|
sender.send(body).unwrap();
|
||||||
|
HttpResponse::Ok().into()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_app(
|
||||||
|
sender: Arc<mpsc::UnboundedSender<Vec<u8>>>,
|
||||||
|
) -> actix_web::App<
|
||||||
|
impl ServiceFactory<
|
||||||
|
actix_web::dev::ServiceRequest,
|
||||||
|
Config = (),
|
||||||
|
Response = ServiceResponse<impl MessageBody>,
|
||||||
|
Error = actix_web::Error,
|
||||||
|
InitError = (),
|
||||||
|
>,
|
||||||
|
> {
|
||||||
|
App::new().service(forward_body).app_data(Data::from(sender))
|
||||||
|
}
|
||||||
|
|
||||||
|
struct WebhookHandle {
|
||||||
|
pub server_handle: tokio::task::JoinHandle<Result<(), std::io::Error>>,
|
||||||
|
pub url: String,
|
||||||
|
pub receiver: mpsc::UnboundedReceiver<Vec<u8>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_webhook_server() -> WebhookHandle {
|
||||||
|
let mut log_builder = env_logger::Builder::new();
|
||||||
|
log_builder.parse_filters("info");
|
||||||
|
log_builder.init();
|
||||||
|
|
||||||
|
let (sender, receiver) = mpsc::unbounded_channel();
|
||||||
|
let sender = Arc::new(sender);
|
||||||
|
|
||||||
|
// By listening on the port 0, the system will give us any available port.
|
||||||
|
let server =
|
||||||
|
HttpServer::new(move || create_app(sender.clone())).bind(("127.0.0.1", 0)).unwrap();
|
||||||
|
let (ip, scheme) = server.addrs_with_scheme()[0];
|
||||||
|
let url = format!("{scheme}://{ip}/");
|
||||||
|
|
||||||
|
let server_handle = tokio::spawn(server.run());
|
||||||
|
WebhookHandle { server_handle, url, receiver }
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_web::test]
|
||||||
|
async fn test_basic_webhook() {
|
||||||
|
let WebhookHandle { server_handle, url, mut receiver } = create_webhook_server().await;
|
||||||
|
|
||||||
|
let db_path = tempfile::tempdir().unwrap();
|
||||||
|
let server = Server::new_with_options(Opt {
|
||||||
|
task_webhook_url: Some(Url::parse(&url).unwrap()),
|
||||||
|
..default_settings(db_path.path())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let index = server.index("tamo");
|
||||||
|
// May be flaky: we're relying on the fact that while the first document addition is processed, the other
|
||||||
|
// operations will be received and will be batched together. If it doesn't happen it's not a problem
|
||||||
|
// the rest of the test won't assume anything about the number of tasks per batch.
|
||||||
|
for i in 0..5 {
|
||||||
|
let (_, _status) = index.add_documents(json!({ "id": i, "doggo": "bone" }), None).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut nb_tasks = 0;
|
||||||
|
while let Some(payload) = receiver.recv().await {
|
||||||
|
let payload = String::from_utf8(payload).unwrap();
|
||||||
|
let jsonl = payload.split('\n');
|
||||||
|
for json in jsonl {
|
||||||
|
if json.is_empty() {
|
||||||
|
break; // we reached EOF
|
||||||
|
}
|
||||||
|
nb_tasks += 1;
|
||||||
|
let json: serde_json::Value = serde_json::from_str(json).unwrap();
|
||||||
|
snapshot!(
|
||||||
|
json_string!(json, { ".uid" => "[uid]", ".duration" => "[duration]", ".enqueuedAt" => "[date]", ".startedAt" => "[date]", ".finishedAt" => "[date]" }),
|
||||||
|
@r###"
|
||||||
|
{
|
||||||
|
"uid": "[uid]",
|
||||||
|
"indexUid": "tamo",
|
||||||
|
"status": "succeeded",
|
||||||
|
"type": "documentAdditionOrUpdate",
|
||||||
|
"canceledBy": null,
|
||||||
|
"details": {
|
||||||
|
"receivedDocuments": 1,
|
||||||
|
"indexedDocuments": 1
|
||||||
|
},
|
||||||
|
"error": null,
|
||||||
|
"duration": "[duration]",
|
||||||
|
"enqueuedAt": "[date]",
|
||||||
|
"startedAt": "[date]",
|
||||||
|
"finishedAt": "[date]"
|
||||||
|
}
|
||||||
|
"###);
|
||||||
|
}
|
||||||
|
if nb_tasks == 5 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(nb_tasks == 5, "We should have received the 5 tasks but only received {nb_tasks}");
|
||||||
|
|
||||||
|
server_handle.abort();
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user