diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index 3febe002f..f10d7e110 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -82,6 +82,8 @@ enum TaskDetails { }, #[serde(rename_all = "camelCase")] ClearAll { deleted_documents: Option }, + #[serde(rename_all = "camelCase")] + Dump { dump_uid: String }, } /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for @@ -218,7 +220,9 @@ impl From for TaskView { TaskType::IndexUpdate, Some(TaskDetails::IndexInfo { primary_key }), ), - TaskContent::Dump => (TaskType::Dump, None), + TaskContent::Dump { uid } => { + (TaskType::Dump, Some(TaskDetails::Dump { dump_uid: uid })) + } }; // An event always has at least one event: "Created" diff --git a/meilisearch-lib/src/dump/message.rs b/meilisearch-lib/src/dump/message.rs index 6c9dded9f..8ebeb3b57 100644 --- a/meilisearch-lib/src/dump/message.rs +++ b/meilisearch-lib/src/dump/message.rs @@ -1,7 +1,6 @@ use tokio::sync::oneshot; use super::error::Result; -use super::DumpInfo; pub enum DumpMsg { CreateDump { diff --git a/meilisearch-lib/src/dump/mod.rs b/meilisearch-lib/src/dump/mod.rs index 05deb8a40..b14b356fd 100644 --- a/meilisearch-lib/src/dump/mod.rs +++ b/meilisearch-lib/src/dump/mod.rs @@ -5,10 +5,12 @@ use std::sync::Arc; use anyhow::bail; use log::{info, trace}; use meilisearch_auth::AuthController; +use milli::heed::Env; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use tempfile::TempDir; +use time::macros::format_description; use tokio::fs::create_dir_all; use crate::analytics; @@ -18,6 +20,7 @@ use crate::index_resolver::index_store::IndexStore; use crate::index_resolver::meta_store::IndexMetaStore; use crate::index_resolver::IndexResolver; use crate::options::IndexerOpts; +use crate::tasks::TaskStore; use crate::update_file_store::UpdateFileStore; use error::Result; @@ -259,22 +262,31 @@ fn persist_dump(dst_path: impl AsRef, tmp_dst: TempDir) -> anyhow::Result< Ok(()) } -pub struct DumpJob { +/// Generate uid from creation date +pub fn generate_uid() -> String { + OffsetDateTime::now_utc() + .format(format_description!( + "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" + )) + .unwrap() +} + +pub struct DumpHandler { pub dump_path: PathBuf, pub db_path: PathBuf, pub update_file_store: UpdateFileStore, - pub uid: String, - pub update_db_size: usize, + pub task_store_size: usize, pub index_db_size: usize, + pub env: Arc, pub index_resolver: Arc>, } -impl DumpJob +impl DumpHandler where - U: IndexMetaStore, - I: IndexStore, + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, { - pub async fn run(self) -> Result<()> { + pub async fn run(&self, uid: String) -> Result<()> { trace!("Performing dump."); create_dir_all(&self.dump_path).await?; @@ -282,7 +294,7 @@ where let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; let temp_dump_path = temp_dump_dir.path().to_owned(); - let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); + let meta = MetadataVersion::new_v4(self.index_db_size, self.task_store_size); let meta_path = temp_dump_path.join(META_FILE_NAME); let mut meta_file = File::create(&meta_path)?; serde_json::to_writer(&mut meta_file, &meta)?; @@ -292,25 +304,25 @@ where // TODO: this is blocking!! AuthController::dump(&self.db_path, &temp_dump_path)?; + TaskStore::dump( + self.env.clone(), + &self.dump_path, + self.update_file_store.clone(), + ) + .await?; self.index_resolver.dump(&self.dump_path).await?; - //TODO(marin): this is not right, the scheduler should dump itself, not do it here... - // self.scheduler - // .read() - // .await - // .dump(&temp_dump_path, self.update_file_store.clone()) - // .await?; - + let dump_path = self.dump_path.clone(); let dump_path = tokio::task::spawn_blocking(move || -> Result { // for now we simply copy the updates/updates_files // FIXME: We may copy more files than necessary, if new files are added while we are // performing the dump. We need a way to filter them out. - let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; + let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?; to_tar_gz(temp_dump_path, temp_dump_file.path()) .map_err(|e| DumpError::Internal(e.into()))?; - let dump_path = self.dump_path.join(self.uid).with_extension("dump"); + let dump_path = dump_path.join(uid).with_extension("dump"); temp_dump_file.persist(&dump_path)?; Ok(dump_path) diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index f89ebec4e..1eb61d9f0 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -19,7 +19,7 @@ use tokio::time::sleep; use uuid::Uuid; use crate::document_formats::{read_csv, read_json, read_ndjson}; -use crate::dump::load_dump; +use crate::dump::{self, load_dump, DumpHandler}; use crate::index::{ Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, }; @@ -27,9 +27,7 @@ use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; use crate::tasks::error::TaskError; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{ - BatchHandler, DumpHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore, -}; +use crate::tasks::{BatchHandler, EmptyBatchHandler, Scheduler, TaskFilter, TaskStore}; use error::Result; use self::error::IndexControllerError; @@ -222,14 +220,15 @@ impl IndexControllerBuilder { .dump_dst .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; - let dump_handler = Arc::new(DumpHandler::new( - update_file_store.clone(), + let dump_handler = Arc::new(DumpHandler { dump_path, - db_path.as_ref().clone(), - index_size, + db_path: db_path.as_ref().into(), + update_file_store: update_file_store.clone(), task_store_size, - index_resolver.clone(), - )); + index_db_size: index_size, + env: meta_env.clone(), + index_resolver: index_resolver.clone(), + }); let task_store = TaskStore::new(meta_env)?; // register all the batch handlers for use with the scheduler. @@ -421,7 +420,8 @@ where } pub async fn register_dump_task(&self) -> Result { - let content = TaskContent::Dump; + let uid = dump::generate_uid(); + let content = TaskContent::Dump { uid }; let task = self.task_store.register(None, content).await?; self.scheduler.read().await.notify(); Ok(task) diff --git a/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs b/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs index 057cf274f..fc506522f 100644 --- a/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs +++ b/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs @@ -1,101 +1,34 @@ -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use log::{error, trace}; -use time::{macros::format_description, OffsetDateTime}; - -use crate::dump::DumpJob; +use crate::dump::DumpHandler; use crate::index_resolver::index_store::IndexStore; use crate::index_resolver::meta_store::IndexMetaStore; -use crate::index_resolver::IndexResolver; use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::task::{Task, TaskContent, TaskEvent, TaskResult}; use crate::tasks::BatchHandler; -use crate::update_file_store::UpdateFileStore; - -pub struct DumpHandler { - update_file_store: UpdateFileStore, - index_resolver: Arc>, - dump_path: PathBuf, - db_path: PathBuf, - update_db_size: usize, - index_db_size: usize, -} - -/// Generate uid from creation date -fn generate_uid() -> String { - OffsetDateTime::now_utc() - .format(format_description!( - "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" - )) - .unwrap() -} - -impl DumpHandler -where - U: IndexMetaStore + Send + Sync + 'static, - I: IndexStore + Send + Sync + 'static, -{ - pub fn new( - update_file_store: UpdateFileStore, - dump_path: impl AsRef, - db_path: impl AsRef, - index_db_size: usize, - update_db_size: usize, - index_resolver: Arc>, - ) -> Self { - Self { - update_file_store, - dump_path: dump_path.as_ref().into(), - db_path: db_path.as_ref().into(), - index_db_size, - update_db_size, - index_resolver, - } - } - - async fn create_dump(&self) { - let uid = generate_uid(); - - let task = DumpJob { - dump_path: self.dump_path.clone(), - db_path: self.db_path.clone(), - update_file_store: self.update_file_store.clone(), - uid: uid.clone(), - update_db_size: self.update_db_size, - index_db_size: self.index_db_size, - index_resolver: self.index_resolver.clone(), - }; - - let task_result = tokio::task::spawn_local(task.run()).await; - - match task_result { - Ok(Ok(())) => { - trace!("Dump succeed"); - } - Ok(Err(e)) => { - error!("Dump failed: {}", e); - } - Err(_) => { - error!("Dump panicked. Dump status set to failed"); - } - }; - } -} #[async_trait::async_trait] impl BatchHandler for DumpHandler where - U: IndexMetaStore + Send + Sync + 'static, - I: IndexStore + Send + Sync + 'static, + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, { fn accept(&self, batch: &Batch) -> bool { matches!(batch.content, BatchContent::Dump { .. }) } - async fn process_batch(&self, batch: Batch) -> Batch { - match batch.content { - BatchContent::Dump { .. } => { - self.create_dump().await; + async fn process_batch(&self, mut batch: Batch) -> Batch { + match &batch.content { + BatchContent::Dump(Task { + content: TaskContent::Dump { uid }, + .. + }) => { + match self.run(uid.clone()).await { + Ok(_) => { + batch + .content + .push_event(TaskEvent::succeeded(TaskResult::Other)); + } + Err(e) => batch.content.push_event(TaskEvent::failed(e.into())), + } batch } _ => unreachable!("invalid batch content for dump"), diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index faa35f2da..bc01c4901 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; -pub use batch_handlers::{dump_handler::DumpHandler, empty_handler::EmptyBatchHandler}; +pub use batch_handlers::empty_handler::EmptyBatchHandler; pub use scheduler::Scheduler; pub use task_store::TaskFilter; diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index f3018b782..1b3fd6daa 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -1,7 +1,6 @@ use std::cmp::Ordering; use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; use std::ops::{Deref, DerefMut}; -use std::path::Path; use std::slice; use std::sync::Arc; use std::time::Duration; @@ -13,7 +12,6 @@ use tokio::sync::{watch, RwLock}; use crate::options::SchedulerConfig; use crate::snapshot::SnapshotJob; -use crate::update_file_store::UpdateFileStore; use super::batch::{Batch, BatchContent}; use super::error::Result; @@ -276,10 +274,6 @@ impl Scheduler { Ok(this) } - pub async fn dump(&self, path: &Path, file_store: UpdateFileStore) -> Result<()> { - self.store.dump(path, file_store).await - } - fn register_task(&mut self, task: Task) { assert!(!task.is_finished()); self.tasks.insert(task); diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index 41a536a1e..0e0aa8af2 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -62,6 +62,22 @@ pub enum TaskEvent { }, } +impl TaskEvent { + pub fn succeeded(result: TaskResult) -> Self { + Self::Succeded { + result, + timestamp: OffsetDateTime::now_utc(), + } + } + + pub fn failed(error: ResponseError) -> Self { + Self::Failed { + error, + timestamp: OffsetDateTime::now_utc(), + } + } +} + /// A task represents an operation that Meilisearch must do. /// It's stored on disk and executed from the lowest to highest Task id. /// Everytime a new task is created it has a higher Task id than the previous one. @@ -140,7 +156,9 @@ pub enum TaskContent { IndexUpdate { primary_key: Option, }, - Dump, + Dump { + uid: String, + }, } #[cfg(test)] diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index f580c8e26..610a5bdeb 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -204,13 +204,14 @@ impl TaskStore { } pub async fn dump( - &self, + env: Arc, dir_path: impl AsRef, update_file_store: UpdateFileStore, ) -> Result<()> { + let store = Self::new(env)?; let update_dir = dir_path.as_ref().join("updates"); let updates_file = update_dir.join("data.jsonl"); - let tasks = self.list_tasks(None, None, None).await?; + let tasks = store.list_tasks(None, None, None).await?; let dir_path = dir_path.as_ref().to_path_buf(); tokio::task::spawn_blocking(move || -> Result<()> { @@ -287,6 +288,14 @@ pub mod test { Ok(Self::Real(TaskStore::new(env)?)) } + pub async fn dump( + env: Arc, + path: impl AsRef, + update_file_store: UpdateFileStore, + ) -> Result<()> { + TaskStore::dump(env, path, update_file_store).await + } + pub fn mock(mocker: Mocker) -> Self { Self::Mock(Arc::new(mocker)) } @@ -329,17 +338,6 @@ pub mod test { } } - pub async fn dump( - &self, - path: impl AsRef, - update_file_store: UpdateFileStore, - ) -> Result<()> { - match self { - Self::Real(s) => s.dump(path, update_file_store).await, - Self::Mock(m) => unsafe { m.get("dump").call((path, update_file_store)) }, - } - } - pub async fn register( &self, index_uid: Option,