Implement the webhook

This commit is contained in:
Tamo 2023-11-27 15:11:22 +01:00 committed by Clément Renault
parent 1956045a06
commit d78ad51082
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
5 changed files with 36 additions and 3 deletions

1
Cargo.lock generated
View File

@ -2792,6 +2792,7 @@ dependencies = [
"tempfile",
"thiserror",
"time",
"ureq",
"uuid 1.5.0",
]

View File

@ -30,6 +30,7 @@ synchronoise = "1.0.1"
tempfile = "3.5.0"
thiserror = "1.0.40"
time = { version = "0.3.20", features = ["serde-well-known", "formatting", "parsing", "macros"] }
ureq = "2.9.1"
uuid = { version = "1.3.1", features = ["serde", "v4"] }
[dev-dependencies]

View File

@ -37,6 +37,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
snapshots_path: _,
auth_path: _,
version_file_path: _,
webhook_url: _,
test_breakpoint_sdr: _,
planned_failures: _,
run_loop_iteration: _,

View File

@ -170,8 +170,8 @@ impl ProcessingTasks {
}
/// Set the processing tasks to an empty list
fn stop_processing(&mut self) {
self.processing = RoaringBitmap::new();
fn stop_processing(&mut self) -> RoaringBitmap {
std::mem::take(&mut self.processing)
}
/// Returns `true` if there, at least, is one task that is currently processing that we must stop.
@ -241,6 +241,7 @@ pub struct IndexSchedulerOptions {
pub snapshots_path: PathBuf,
/// The path to the folder containing the dumps.
pub dumps_path: PathBuf,
pub webhook_url: Option<String>,
/// The maximum size, in bytes, of the task index.
pub task_db_size: usize,
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
@ -323,6 +324,9 @@ pub struct IndexScheduler {
/// The maximum number of tasks that will be batched together.
pub(crate) max_number_of_batched_tasks: usize,
/// The webhook url we should send tasks to after processing every batches.
pub(crate) webhook_url: Option<String>,
/// A frame to output the indexation profiling files to disk.
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@ -388,6 +392,7 @@ impl IndexScheduler {
dumps_path: self.dumps_path.clone(),
auth_path: self.auth_path.clone(),
version_file_path: self.version_file_path.clone(),
webhook_url: self.webhook_url.clone(),
currently_updating_index: self.currently_updating_index.clone(),
embedders: self.embedders.clone(),
#[cfg(test)]
@ -487,6 +492,7 @@ impl IndexScheduler {
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
version_file_path: options.version_file_path,
webhook_url: options.webhook_url,
currently_updating_index: Arc::new(RwLock::new(None)),
embedders: Default::default(),
@ -1251,19 +1257,41 @@ impl IndexScheduler {
}
}
self.processing_tasks.write().unwrap().stop_processing();
let processed = self.processing_tasks.write().unwrap().stop_processing();
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::CommittingWtxn)?;
wtxn.commit().map_err(Error::HeedTransaction)?;
// We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed);
#[cfg(test)]
self.breakpoint(Breakpoint::AfterProcessing);
Ok(TickOutcome::TickAgain(processed_tasks))
}
/// Once the tasks changes have been commited we must send all the tasks that were updated to our webhook if there is one.
fn notify_webhook(&self, updated: &RoaringBitmap) -> Result<()> {
if let Some(ref url) = self.webhook_url {
let rtxn = self.env.read_txn()?;
// on average a task takes ~50 bytes
let mut buffer = Vec::with_capacity(updated.len() as usize * 50);
for id in updated {
let task = self.get_task(&rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let _ = serde_json::to_writer(&mut buffer, &task);
}
let _ = ureq::post(url).send_bytes(&buffer);
}
Ok(())
}
/// Register a task to cleanup the task queue if needed
fn cleanup_task_queue(&self) -> Result<()> {
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
@ -1677,6 +1705,7 @@ mod tests {
indexes_path: tempdir.path().join("indexes"),
snapshots_path: tempdir.path().join("snapshots"),
dumps_path: tempdir.path().join("dumps"),
webhook_url: None,
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
enable_mdb_writemap: false,

View File

@ -228,6 +228,7 @@ fn open_or_create_database_unchecked(
indexes_path: opt.db_path.join("indexes"),
snapshots_path: opt.snapshot_dir.clone(),
dumps_path: opt.dump_dir.clone(),
webhook_url: opt.task_webhook_url.clone(),
task_db_size: opt.max_task_db_size.get_bytes() as usize,
index_base_map_size: opt.max_index_size.get_bytes() as usize,
enable_mdb_writemap: opt.experimental_reduce_indexing_memory_usage,