diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs index b58552f27..7d32fdda5 100644 --- a/meilisearch-http/src/routes/dump.rs +++ b/meilisearch-http/src/routes/dump.rs @@ -45,8 +45,8 @@ async fn get_dump_status( meilisearch: GuardedData, MeiliSearch>, path: web::Path, ) -> Result { - let res = meilisearch.dump_info(path.dump_uid.clone()).await?; + todo!(); - debug!("returns: {:?}", res); - Ok(HttpResponse::Ok().json(res)) + // debug!("returns: {:?}", res); + // Ok(HttpResponse::Ok().json(res)) } diff --git a/meilisearch-lib/src/dump/actor.rs b/meilisearch-lib/src/dump/actor.rs index 48fc077ca..b7f615e44 100644 --- a/meilisearch-lib/src/dump/actor.rs +++ b/meilisearch-lib/src/dump/actor.rs @@ -9,7 +9,7 @@ use time::macros::format_description; use time::OffsetDateTime; use tokio::sync::{mpsc, oneshot, RwLock}; -use super::error::{DumpActorError, Result}; +use super::error::{DumpError, Result}; use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus}; use crate::tasks::Scheduler; use crate::update_file_store::UpdateFileStore; @@ -106,7 +106,7 @@ impl DumpActor { let _lock = match self.lock.try_lock() { Some(lock) => lock, None => { - ret.send(Err(DumpActorError::DumpAlreadyRunning)) + ret.send(Err(DumpError::DumpAlreadyRunning)) .expect("Dump actor is dead"); return; } @@ -123,7 +123,6 @@ impl DumpActor { dump_path: self.dump_path.clone(), db_path: self.analytics_path.clone(), update_file_store: self.update_file_store.clone(), - scheduler: self.scheduler.clone(), uid: uid.clone(), update_db_size: self.update_db_size, index_db_size: self.index_db_size, @@ -155,7 +154,7 @@ impl DumpActor { async fn handle_dump_info(&self, uid: String) -> Result { match self.dump_infos.read().await.get(&uid) { Some(info) => Ok(info.clone()), - _ => Err(DumpActorError::DumpDoesNotExist(uid)), + _ => Err(DumpError::DumpDoesNotExist(uid)), } } } diff --git a/meilisearch-lib/src/dump/error.rs b/meilisearch-lib/src/dump/error.rs index f72b6d1dd..7931a8d75 100644 --- a/meilisearch-lib/src/dump/error.rs +++ b/meilisearch-lib/src/dump/error.rs @@ -3,10 +3,10 @@ use meilisearch_error::{internal_error, Code, ErrorCode}; use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError}; -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] -pub enum DumpActorError { +pub enum DumpError { #[error("A dump is already processing. You must wait until the current process is finished before requesting another dump.")] DumpAlreadyRunning, #[error("Dump `{0}` not found.")] @@ -18,7 +18,7 @@ pub enum DumpActorError { } internal_error!( - DumpActorError: milli::heed::Error, + DumpError: milli::heed::Error, std::io::Error, tokio::task::JoinError, tokio::sync::oneshot::error::RecvError, @@ -29,13 +29,13 @@ internal_error!( TaskError ); -impl ErrorCode for DumpActorError { +impl ErrorCode for DumpError { fn error_code(&self) -> Code { match self { - DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, - DumpActorError::DumpDoesNotExist(_) => Code::DumpNotFound, - DumpActorError::Internal(_) => Code::Internal, - DumpActorError::IndexResolver(e) => e.error_code(), + DumpError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, + DumpError::DumpDoesNotExist(_) => Code::DumpNotFound, + DumpError::Internal(_) => Code::Internal, + DumpError::IndexResolver(e) => e.error_code(), } } } diff --git a/meilisearch-lib/src/dump/handle_impl.rs b/meilisearch-lib/src/dump/handle_impl.rs index 16a312e70..9577b3663 100644 --- a/meilisearch-lib/src/dump/handle_impl.rs +++ b/meilisearch-lib/src/dump/handle_impl.rs @@ -1,7 +1,7 @@ use tokio::sync::{mpsc, oneshot}; use super::error::Result; -use super::{DumpActorHandle, DumpInfo, DumpMsg}; +use super::{DumpActorHandle, DumpMsg}; #[derive(Clone)] pub struct DumpActorHandleImpl { diff --git a/meilisearch-lib/src/dump/mod.rs b/meilisearch-lib/src/dump/mod.rs index bc717b35e..59b51a601 100644 --- a/meilisearch-lib/src/dump/mod.rs +++ b/meilisearch-lib/src/dump/mod.rs @@ -1,32 +1,30 @@ use std::fs::File; use std::path::{Path, PathBuf}; -use std::sync::Arc; use anyhow::bail; -use log::info; +use log::{info, trace}; +use meilisearch_auth::AuthController; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -pub use actor::DumpActor; -pub use handle_impl::*; -pub use message::DumpMsg; use tempfile::TempDir; -use tokio::sync::RwLock; +use tokio::fs::create_dir_all; -use crate::compression::from_tar_gz; +use crate::analytics; +use crate::compression::{from_tar_gz, to_tar_gz}; +use crate::dump::error::DumpError; use crate::options::IndexerOpts; -use crate::tasks::Scheduler; use crate::update_file_store::UpdateFileStore; use error::Result; use self::loaders::{v2, v3, v4}; -mod actor; +// mod actor; mod compat; pub mod error; -mod handle_impl; +// mod handle_impl; mod loaders; -mod message; +// mod message; const META_FILE_NAME: &str = "metadata.json"; @@ -51,18 +49,6 @@ impl Metadata { } } -#[async_trait::async_trait] -#[cfg_attr(test, mockall::automock)] -pub trait DumpActorHandle { - /// Start the creation of a dump - /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] - async fn create_dump(&self) -> Result; - - /// Return the status of an already created dump - /// Implementation: [handle_impl::DumpActorHandleImpl::dump_info] - async fn dump_info(&self, uid: String) -> Result; -} - #[derive(Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct MetadataV1 { @@ -159,49 +145,6 @@ pub enum DumpStatus { Failed, } -#[derive(Debug, Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct DumpInfo { - pub uid: String, - pub status: DumpStatus, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - #[serde(with = "time::serde::rfc3339")] - started_at: OffsetDateTime, - #[serde( - skip_serializing_if = "Option::is_none", - with = "time::serde::rfc3339::option" - )] - finished_at: Option, -} - -impl DumpInfo { - pub fn new(uid: String, status: DumpStatus) -> Self { - Self { - uid, - status, - error: None, - started_at: OffsetDateTime::now_utc(), - finished_at: None, - } - } - - pub fn with_error(&mut self, error: String) { - self.status = DumpStatus::Failed; - self.finished_at = Some(OffsetDateTime::now_utc()); - self.error = Some(error); - } - - pub fn done(&mut self) { - self.finished_at = Some(OffsetDateTime::now_utc()); - self.status = DumpStatus::Done; - } - - pub fn dump_already_in_progress(&self) -> bool { - self.status == DumpStatus::InProgress - } -} - pub fn load_dump( dst_path: impl AsRef, src_path: impl AsRef, @@ -313,76 +256,59 @@ fn persist_dump(dst_path: impl AsRef, tmp_dst: TempDir) -> anyhow::Result< } pub struct DumpJob { - dump_path: PathBuf, - db_path: PathBuf, - update_file_store: UpdateFileStore, - scheduler: Arc>, - uid: String, - update_db_size: usize, - index_db_size: usize, + pub dump_path: PathBuf, + pub db_path: PathBuf, + pub update_file_store: UpdateFileStore, + pub uid: String, + pub update_db_size: usize, + pub index_db_size: usize, } impl DumpJob { - async fn run(self) -> Result<()> { - // trace!("Performing dump."); - // - // create_dir_all(&self.dump_path).await?; - // - // let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; - // let temp_dump_path = temp_dump_dir.path().to_owned(); - // - // let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); - // let meta_path = temp_dump_path.join(META_FILE_NAME); - // let mut meta_file = File::create(&meta_path)?; - // serde_json::to_writer(&mut meta_file, &meta)?; - // analytics::copy_user_id(&self.db_path, &temp_dump_path); - // - // create_dir_all(&temp_dump_path.join("indexes")).await?; - // - // let (sender, receiver) = oneshot::channel(); - // - // self.scheduler - // .write() - // .await - // .schedule_job(Job::Dump { - // ret: sender, - // path: temp_dump_path.clone(), - // }) - // .await; - // - // // wait until the job has started performing before finishing the dump process - // let sender = receiver.await??; - // - // AuthController::dump(&self.db_path, &temp_dump_path)?; - // - // //TODO(marin): this is not right, the scheduler should dump itself, not do it here... + pub async fn run(self) -> Result<()> { + trace!("Performing dump."); + + create_dir_all(&self.dump_path).await?; + + let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + analytics::copy_user_id(&self.db_path, &temp_dump_path); + + create_dir_all(&temp_dump_path.join("indexes")).await?; + + AuthController::dump(&self.db_path, &temp_dump_path)?; + // TODO: Dump indexes and updates + + //TODO(marin): this is not right, the scheduler should dump itself, not do it here... // self.scheduler // .read() // .await // .dump(&temp_dump_path, self.update_file_store.clone()) // .await?; - // - // let dump_path = tokio::task::spawn_blocking(move || -> Result { - // // for now we simply copy the updates/updates_files - // // FIXME: We may copy more files than necessary, if new files are added while we are - // // performing the dump. We need a way to filter them out. - // - // let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; - // to_tar_gz(temp_dump_path, temp_dump_file.path()) - // .map_err(|e| DumpActorError::Internal(e.into()))?; - // - // let dump_path = self.dump_path.join(self.uid).with_extension("dump"); - // temp_dump_file.persist(&dump_path)?; - // - // Ok(dump_path) - // }) - // .await??; - // - // // notify the update loop that we are finished performing the dump. - // let _ = sender.send(()); - // - // info!("Created dump in {:?}.", dump_path); - // + + let dump_path = tokio::task::spawn_blocking(move || -> Result { + // for now we simply copy the updates/updates_files + // FIXME: We may copy more files than necessary, if new files are added while we are + // performing the dump. We need a way to filter them out. + + let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; + to_tar_gz(temp_dump_path, temp_dump_file.path()) + .map_err(|e| DumpError::Internal(e.into()))?; + + let dump_path = self.dump_path.join(self.uid).with_extension("dump"); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + Ok(()) } } diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index 11ef03d73..529887b6a 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -6,7 +6,7 @@ use tokio::task::JoinError; use super::DocumentAdditionFormat; use crate::document_formats::DocumentFormatError; -use crate::dump::error::DumpActorError; +use crate::dump::error::DumpError; use crate::index::error::IndexError; use crate::tasks::error::TaskError; use crate::update_file_store::UpdateFileStoreError; @@ -28,7 +28,7 @@ pub enum IndexControllerError { #[error("{0}")] TaskError(#[from] TaskError), #[error("{0}")] - DumpError(#[from] DumpActorError), + DumpError(#[from] DumpError), #[error("{0}")] DocumentFormatError(#[from] DocumentFormatError), #[error("A {0} payload is missing.")] diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index b73402d56..14f262a51 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -13,13 +13,13 @@ use futures::StreamExt; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::RwLock; use tokio::task::spawn_blocking; use tokio::time::sleep; use uuid::Uuid; use crate::document_formats::{read_csv, read_json, read_ndjson}; -use crate::dump::{self, load_dump, DumpActor, DumpActorHandle, DumpActorHandleImpl, DumpInfo}; +use crate::dump::load_dump; use crate::index::{ Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, }; @@ -75,7 +75,6 @@ pub struct IndexController { scheduler: Arc>, task_store: TaskStore, dump_path: PathBuf, - dump_handle: dump::DumpActorHandleImpl, pub update_file_store: UpdateFileStore, } @@ -85,7 +84,6 @@ impl Clone for IndexController { Self { index_resolver: self.index_resolver.clone(), scheduler: self.scheduler.clone(), - dump_handle: self.dump_handle.clone(), update_file_store: self.update_file_store.clone(), task_store: self.task_store.clone(), dump_path: self.dump_path.clone(), @@ -228,23 +226,6 @@ impl IndexControllerBuilder { let dump_path = self .dump_dst .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; - let dump_handle = { - let analytics_path = &db_path; - let (sender, receiver) = mpsc::channel(10); - let actor = DumpActor::new( - receiver, - update_file_store.clone(), - scheduler.clone(), - dump_path.clone(), - analytics_path, - index_size, - task_store_size, - ); - - tokio::task::spawn_local(actor.run()); - - DumpActorHandleImpl { sender } - }; if self.schedule_snapshot { let snapshot_period = self @@ -269,7 +250,6 @@ impl IndexControllerBuilder { Ok(IndexController { index_resolver, scheduler, - dump_handle, dump_path, update_file_store, task_store, @@ -633,14 +613,6 @@ where indexes, }) } - - pub async fn create_dump(&self) -> Result { - Ok(self.dump_handle.create_dump().await?) - } - - pub async fn dump_info(&self, uid: String) -> Result { - Ok(self.dump_handle.dump_info(uid).await?) - } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { diff --git a/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs b/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs new file mode 100644 index 000000000..c0ef70ba8 --- /dev/null +++ b/meilisearch-lib/src/tasks/batch_handlers/dump_handler.rs @@ -0,0 +1,92 @@ +use std::path::{Path, PathBuf}; + +use log::{error, trace}; +use time::{macros::format_description, OffsetDateTime}; + +use crate::dump::DumpJob; +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::BatchHandler; +use crate::update_file_store::UpdateFileStore; + +pub struct DumpHandler { + update_file_store: UpdateFileStore, + dump_path: PathBuf, + db_path: PathBuf, + update_db_size: usize, + index_db_size: usize, +} + +/// Generate uid from creation date +fn generate_uid() -> String { + OffsetDateTime::now_utc() + .format(format_description!( + "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" + )) + .unwrap() +} + +impl DumpHandler { + pub fn new( + update_file_store: UpdateFileStore, + dump_path: impl AsRef, + db_path: impl AsRef, + index_db_size: usize, + update_db_size: usize, + ) -> Self { + Self { + update_file_store, + dump_path: dump_path.as_ref().into(), + db_path: db_path.as_ref().into(), + index_db_size, + update_db_size, + } + } + + async fn create_dump(&self) { + let uid = generate_uid(); + + let task = DumpJob { + dump_path: self.dump_path.clone(), + db_path: self.db_path.clone(), + update_file_store: self.update_file_store.clone(), + uid: uid.clone(), + update_db_size: self.update_db_size, + index_db_size: self.index_db_size, + }; + + let task_result = tokio::task::spawn_local(task.run()).await; + + match task_result { + Ok(Ok(())) => { + trace!("Dump succeed"); + } + Ok(Err(e)) => { + error!("Dump failed: {}", e); + } + Err(_) => { + error!("Dump panicked. Dump status set to failed"); + } + }; + } +} + +#[async_trait::async_trait] +impl BatchHandler for DumpHandler { + fn accept(&self, batch: &Batch) -> bool { + matches!(batch.content, BatchContent::Dump { .. }) + } + + async fn process_batch(&self, batch: Batch) -> Batch { + match batch.content { + BatchContent::Dump { .. } => { + self.create_dump().await; + batch + } + _ => unreachable!("invalid batch content for dump"), + } + } + + async fn finish(&self, _: &Batch) { + () + } +} diff --git a/meilisearch-lib/src/tasks/batch_handlers/mod.rs b/meilisearch-lib/src/tasks/batch_handlers/mod.rs index 0e94c76f1..f72c1b760 100644 --- a/meilisearch-lib/src/tasks/batch_handlers/mod.rs +++ b/meilisearch-lib/src/tasks/batch_handlers/mod.rs @@ -1,2 +1,3 @@ +pub mod dump_handler; pub mod empty_handler; mod index_resolver_handler;