From d78ad51082c4cfc3598a0105655f25154c9e9b60 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Mon, 27 Nov 2023 15:11:22 +0100
Subject: [PATCH] Implement the webhook

---
 Cargo.lock                            |  1 +
 index-scheduler/Cargo.toml            |  1 +
 index-scheduler/src/insta_snapshot.rs |  1 +
 index-scheduler/src/lib.rs            | 35 ++++++++++++++++++++++++---
 meilisearch/src/lib.rs                |  1 +
 5 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index d57d381ed..ca7eb715f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2792,6 +2792,7 @@ dependencies = [
  "tempfile",
  "thiserror",
  "time",
+ "ureq",
  "uuid 1.5.0",
 ]
 
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index c4a37b7d6..4d6e4ffd0 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -30,6 +30,7 @@ synchronoise = "1.0.1"
 tempfile = "3.5.0"
 thiserror = "1.0.40"
 time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
+ureq = "2.9.1"
 uuid = { version = "1.3.1", features = ["serde", "v4"] }
 
 [dev-dependencies]
diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs
index ddb9e934a..9261bf66d 100644
--- a/index-scheduler/src/insta_snapshot.rs
+++ b/index-scheduler/src/insta_snapshot.rs
@@ -37,6 +37,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
         snapshots_path: _,
         auth_path: _,
         version_file_path: _,
+        webhook_url: _,
         test_breakpoint_sdr: _,
         planned_failures: _,
         run_loop_iteration: _,
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index b9b360fa4..6756990af 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -170,8 +170,8 @@ impl ProcessingTasks {
     }
 
     /// Set the processing tasks to an empty list
-    fn stop_processing(&mut self) {
-        self.processing = RoaringBitmap::new();
+    fn stop_processing(&mut self) -> RoaringBitmap {
+        std::mem::take(&mut self.processing)
     }
 
     /// Returns `true` if there, at least, is one task that is currently processing that we must stop.
@@ -241,6 +241,7 @@ pub struct IndexSchedulerOptions {
     pub snapshots_path: PathBuf,
     /// The path to the folder containing the dumps.
     pub dumps_path: PathBuf,
+    pub webhook_url: Option<String>,
     /// The maximum size, in bytes, of the task index.
     pub task_db_size: usize,
     /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@@ -323,6 +324,9 @@ pub struct IndexScheduler {
     /// The maximum number of tasks that will be batched together.
     pub(crate) max_number_of_batched_tasks: usize,
 
+    /// The webhook url we should send tasks to after processing every batch.
+    pub(crate) webhook_url: Option<String>,
+
     /// A frame to output the indexation profiling files to disk.
     pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
 
@@ -388,6 +392,7 @@ impl IndexScheduler {
             dumps_path: self.dumps_path.clone(),
             auth_path: self.auth_path.clone(),
             version_file_path: self.version_file_path.clone(),
+            webhook_url: self.webhook_url.clone(),
             currently_updating_index: self.currently_updating_index.clone(),
             embedders: self.embedders.clone(),
             #[cfg(test)]
@@ -487,6 +492,7 @@ impl IndexScheduler {
             snapshots_path: options.snapshots_path,
             auth_path: options.auth_path,
             version_file_path: options.version_file_path,
+            webhook_url: options.webhook_url,
             currently_updating_index: Arc::new(RwLock::new(None)),
             embedders: Default::default(),
 
@@ -1251,19 +1257,41 @@ impl IndexScheduler {
             }
         }
 
-        self.processing_tasks.write().unwrap().stop_processing();
+        let processed = self.processing_tasks.write().unwrap().stop_processing();
 
         #[cfg(test)]
         self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
 
         wtxn.commit().map_err(Error::HeedTransaction)?;
 
+        // We shouldn't crash the tick function if we can't send data to the webhook.
+        let _ = self.notify_webhook(&processed);
+
         #[cfg(test)]
         self.breakpoint(Breakpoint::AfterProcessing);
 
         Ok(TickOutcome::TickAgain(processed_tasks))
     }
 
+    /// Once the task changes have been committed we must send all the tasks that were updated to our webhook, if there is one.
+    fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
+        if let Some(ref url) = self.webhook_url {
+            let rtxn = self.env.read_txn()?;
+
+            // on average a task takes ~50 bytes
+            let mut buffer = Vec::with_capacity(updated.len() as usize * 50);
+
+            for id in updated {
+                let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
+                let _ = serde_json::to_writer(&mut buffer, &task);
+            }
+
+            let _ = ureq::post(url).send_bytes(&buffer);
+        }
+
+        Ok(())
+    }
+
     /// Register a task to cleanup the task queue if needed
     fn cleanup_task_queue(&self) -> Result<()> {
         let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
@@ -1677,6 +1705,7 @@ mod tests {
             indexes_path: tempdir.path().join("indexes"),
             snapshots_path: tempdir.path().join("snapshots"),
             dumps_path: tempdir.path().join("dumps"),
+            webhook_url: None,
             task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
             index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
             enable_mdb_writemap: false,
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index e0f488eea..14a1c5b45 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -228,6 +228,7 @@ fn open_or_create_database_unchecked(
         indexes_path: opt.db_path.join("indexes"),
         snapshots_path: opt.snapshot_dir.clone(),
         dumps_path: opt.dump_dir.clone(),
+        webhook_url: opt.task_webhook_url.clone(),
         task_db_size: opt.max_task_db_size.get_bytes() as usize,
         index_base_map_size: opt.max_index_size.get_bytes() as usize,
         enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,
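Reviewer note (not part of the patch): `notify_webhook` serializes every updated task with `serde_json::to_writer` into one buffer, back-to-back with no separator, and POSTs that buffer as-is via `ureq`. The sketch below shows one way a receiving endpoint could split such a payload back into individual tasks once it has the request body in hand. The helper name, the fake task shape, and the reliance on `serde_json`'s stream deserializer for concatenated JSON values are assumptions for illustration, not something this diff prescribes.

```rust
// Hypothetical receiver-side sketch, assuming the payload is a stream of
// concatenated JSON task objects exactly as `notify_webhook` writes them.
use serde_json::{Deserializer, Value};

fn decode_webhook_payload(body: &[u8]) -> Result<Vec<Value>, serde_json::Error> {
    // The stream deserializer accepts concatenated JSON values, so it splits
    // the buffer back into one `Value` per task without needing delimiters.
    Deserializer::from_slice(body).into_iter::<Value>().collect()
}

fn main() -> Result<(), serde_json::Error> {
    // Two fake tasks glued together, mimicking what the scheduler would send.
    let body = br#"{"uid":0,"status":"succeeded"}{"uid":1,"status":"failed"}"#;
    for task in decode_webhook_payload(body)? {
        println!("task {} is {}", task["uid"], task["status"]);
    }
    Ok(())
}
```

On the sending side, the URL is simply threaded from `opt.task_webhook_url` through `IndexSchedulerOptions::webhook_url` into the scheduler; how that option is exposed on the command line is outside this diff.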