From 7b47e4e87a4c4d80c718fb2b2c3175a598a5bb50 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 23 May 2022 16:30:06 +0200 Subject: [PATCH] snapshot batch handler --- meilisearch-lib/src/index_controller/mod.rs | 5 ++- meilisearch-lib/src/snapshot.rs | 2 +- .../src/tasks/batch_handlers/mod.rs | 1 + .../tasks/batch_handlers/snapshot_handler.rs | 31 +++++++++++++++++++ meilisearch-lib/src/tasks/mod.rs | 1 + meilisearch-lib/src/tasks/scheduler.rs | 6 +--- 6 files changed, 39 insertions(+), 7 deletions(-) create mode 100644 meilisearch-lib/src/tasks/batch_handlers/snapshot_handler.rs diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 1eb61d9f0..039bd8dfa 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -27,7 +27,9 @@ use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; use crate::tasks::error::TaskError; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore}; +use crate::tasks::{ + BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore, +}; use error::Result; use self::error::IndexControllerError; @@ -235,6 +237,7 @@ impl IndexControllerBuilder { let handlers: Vec> = vec![ index_resolver.clone(), dump_handler, + Arc::new(SnapshotHandler), // dummy handler to catch all empty batches Arc::new(EmptyBatchHandler), ]; diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs index 6dda0f3e8..527195729 100644 --- a/meilisearch-lib/src/snapshot.rs +++ b/meilisearch-lib/src/snapshot.rs @@ -38,7 +38,7 @@ impl SnapshotService { meta_env_size: self.meta_env_size, index_size: self.index_size, }; - self.scheduler.write().await.register_snapshot(snapshot_job); + self.scheduler.write().await.schedule_snapshot(snapshot_job); sleep(self.snapshot_period).await; } } diff --git a/meilisearch-lib/src/tasks/batch_handlers/mod.rs b/meilisearch-lib/src/tasks/batch_handlers/mod.rs index f72c1b760..9199e872d 100644 --- a/meilisearch-lib/src/tasks/batch_handlers/mod.rs +++ b/meilisearch-lib/src/tasks/batch_handlers/mod.rs @@ -1,3 +1,4 @@ pub mod dump_handler; pub mod empty_handler; mod index_resolver_handler; +pub mod snapshot_handler; diff --git a/meilisearch-lib/src/tasks/batch_handlers/snapshot_handler.rs b/meilisearch-lib/src/tasks/batch_handlers/snapshot_handler.rs new file mode 100644 index 000000000..2948fb4ff --- /dev/null +++ b/meilisearch-lib/src/tasks/batch_handlers/snapshot_handler.rs @@ -0,0 +1,31 @@ +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::BatchHandler; + +pub struct SnapshotHandler; + +#[async_trait::async_trait] +impl BatchHandler for SnapshotHandler { + fn accept(&self, batch: &Batch) -> bool { + match batch.content { + BatchContent::Snapshot(_) => true, + _ => false, + } + } + + async fn process_batch(&self, batch: Batch) -> Batch { + match batch.content { + BatchContent::Snapshot(job) => { + if let Err(e) = job.run().await { + log::error!("snapshot error: {e}"); + } + } + _ => unreachable!(), + } + + Batch::empty() + } + + async fn finish(&self, _: &Batch) { + () + } +} diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index bc01c4901..4c51ec207 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; pub use batch_handlers::empty_handler::EmptyBatchHandler; +pub use batch_handlers::snapshot_handler::SnapshotHandler; pub use scheduler::Scheduler; pub use task_store::TaskFilter; diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 1b3fd6daa..6089efd7f 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -279,10 +279,6 @@ impl Scheduler { self.tasks.insert(task); } - pub fn register_snapshot(&mut self, job: SnapshotJob) { - self.snapshots.push_back(job); - } - /// Clears the processing list, this method should be called when the processing of a batch is finished. pub fn finish(&mut self) { self.processing = Processing::Nothing; @@ -340,7 +336,7 @@ impl Scheduler { Ok(tasks) } - pub async fn schedule_snapshot(&mut self, job: SnapshotJob) { + pub fn schedule_snapshot(&mut self, job: SnapshotJob) { self.snapshots.push_back(job); self.notify(); }