diff --git a/meilisearch-http/src/routes/dump.rs b/meilisearch-http/src/routes/dump.rs index 65cd7521f..55469b0b4 100644 --- a/meilisearch-http/src/routes/dump.rs +++ b/meilisearch-http/src/routes/dump.rs @@ -2,18 +2,15 @@ use actix_web::{web, HttpRequest, HttpResponse}; use log::debug; use meilisearch_error::ResponseError; use meilisearch_lib::MeiliSearch; -use serde::{Deserialize, Serialize}; use serde_json::json; use crate::analytics::Analytics; use crate::extractors::authentication::{policies::*, GuardedData}; use crate::extractors::sequential_extractor::SeqHandler; +use crate::task::SummarizedTaskView; pub fn configure(cfg: &mut web::ServiceConfig) { - cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))) - .service( - web::resource("/{dump_uid}/status").route(web::get().to(SeqHandler(get_dump_status))), - ); + cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump)))); } pub async fn create_dump( @@ -23,29 +20,8 @@ pub async fn create_dump( ) -> Result { analytics.publish("Dump Created".to_string(), json!({}), Some(&req)); - let res = meilisearch.create_dump().await?; + let res: SummarizedTaskView = meilisearch.register_dump_task().await?.into(); debug!("returns: {:?}", res); Ok(HttpResponse::Accepted().json(res)) } - -#[derive(Debug, Serialize)] -#[serde(rename_all = "camelCase")] -struct DumpStatusResponse { - status: String, -} - -#[derive(Deserialize)] -struct DumpParam { - dump_uid: String, -} - -async fn get_dump_status( - meilisearch: GuardedData, MeiliSearch>, - path: web::Path, -) -> Result { - let res = meilisearch.dump_info(path.dump_uid.clone()).await?; - - debug!("returns: {:?}", res); - Ok(HttpResponse::Ok().json(res)) -} diff --git a/meilisearch-http/src/task.rs b/meilisearch-http/src/task.rs index 7179b10db..397fed618 100644 --- a/meilisearch-http/src/task.rs +++ b/meilisearch-http/src/task.rs @@ -24,6 +24,7 @@ enum TaskType { DocumentDeletion, SettingsUpdate, ClearAll, + DumpCreation, } impl From for TaskType { @@ -43,6 +44,7 @@ impl From for TaskType { TaskContent::IndexDeletion => TaskType::IndexDeletion, TaskContent::IndexCreation { .. } => TaskType::IndexCreation, TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, + TaskContent::Dump { .. 
} => TaskType::DumpCreation, _ => unreachable!("unexpected task type"), } } @@ -80,6 +82,8 @@ enum TaskDetails { }, #[serde(rename_all = "camelCase")] ClearAll { deleted_documents: Option }, + #[serde(rename_all = "camelCase")] + Dump { dump_uid: String }, } /// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for @@ -137,7 +141,7 @@ fn serialize_duration( #[serde(rename_all = "camelCase")] pub struct TaskView { uid: TaskId, - index_uid: String, + index_uid: Option, status: TaskStatus, #[serde(rename = "type")] task_type: TaskType, @@ -216,6 +220,10 @@ impl From for TaskView { TaskType::IndexUpdate, Some(TaskDetails::IndexInfo { primary_key }), ), + TaskContent::Dump { uid } => ( + TaskType::DumpCreation, + Some(TaskDetails::Dump { dump_uid: uid }), + ), }; // An event always has at least one event: "Created" @@ -313,7 +321,7 @@ impl From for TaskView { Self { uid: id, - index_uid: index_uid.into_inner(), + index_uid: index_uid.map(|u| u.into_inner()), status, task_type, details, @@ -342,7 +350,7 @@ impl From> for TaskListView { #[serde(rename_all = "camelCase")] pub struct SummarizedTaskView { uid: TaskId, - index_uid: String, + index_uid: Option, status: TaskStatus, #[serde(rename = "type")] task_type: TaskType, @@ -365,7 +373,7 @@ impl From for SummarizedTaskView { Self { uid: other.id, - index_uid: other.index_uid.to_string(), + index_uid: other.index_uid.map(|u| u.into_inner()), status: TaskStatus::Enqueued, task_type: other.content.into(), enqueued_at, diff --git a/meilisearch-http/tests/auth/authorization.rs b/meilisearch-http/tests/auth/authorization.rs index 30df2dd2d..25f32eb12 100644 --- a/meilisearch-http/tests/auth/authorization.rs +++ b/meilisearch-http/tests/auth/authorization.rs @@ -45,7 +45,6 @@ pub static AUTHORIZATIONS: Lazy hashset!{"stats.get", "*"}, ("GET", "/stats") => hashset!{"stats.get", "*"}, ("POST", "/dumps") => hashset!{"dumps.create", "*"}, - ("GET", "/dumps/0/status") => hashset!{"dumps.get", "*"}, ("GET", "/version") => hashset!{"version", "*"}, } }); diff --git a/meilisearch-http/tests/dumps/mod.rs b/meilisearch-http/tests/dumps/mod.rs index 8395ec3aa..22625f17f 100644 --- a/meilisearch-http/tests/dumps/mod.rs +++ b/meilisearch-http/tests/dumps/mod.rs @@ -6,23 +6,6 @@ use serde_json::json; use self::data::GetDump; -#[actix_rt::test] -async fn get_unexisting_dump_status() { - let server = Server::new().await; - - let (response, code) = server.get_dump_status("foobar").await; - assert_eq!(code, 404); - - let expected_response = json!({ - "message": "Dump `foobar` not found.", - "code": "dump_not_found", - "type": "invalid_request", - "link": "https://docs.meilisearch.com/errors#dump_not_found" - }); - - assert_eq!(response, expected_response); -} - // all the following test are ignored on windows. 
See #2364 #[actix_rt::test] #[cfg_attr(target_os = "windows", ignore)] diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/mod.rs b/meilisearch-lib/src/dump/compat/mod.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/compat/mod.rs rename to meilisearch-lib/src/dump/compat/mod.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/v2.rs b/meilisearch-lib/src/dump/compat/v2.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/compat/v2.rs rename to meilisearch-lib/src/dump/compat/v2.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs b/meilisearch-lib/src/dump/compat/v3.rs similarity index 98% rename from meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs rename to meilisearch-lib/src/dump/compat/v3.rs index 7cd670bad..befd70963 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/compat/v3.rs +++ b/meilisearch-lib/src/dump/compat/v3.rs @@ -187,7 +187,7 @@ impl From<(UpdateStatus, String, TaskId)> for Task { // Dummy task let mut task = Task { id: task_id, - index_uid: IndexUid::new(uid).unwrap(), + index_uid: Some(IndexUid::new(uid).unwrap()), content: TaskContent::IndexDeletion, events: Vec::new(), }; diff --git a/meilisearch-lib/src/index_controller/dump_actor/error.rs b/meilisearch-lib/src/dump/error.rs similarity index 52% rename from meilisearch-lib/src/index_controller/dump_actor/error.rs rename to meilisearch-lib/src/dump/error.rs index f72b6d1dd..da9010347 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/error.rs +++ b/meilisearch-lib/src/dump/error.rs @@ -3,14 +3,10 @@ use meilisearch_error::{internal_error, Code, ErrorCode}; use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError}; -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(thiserror::Error, Debug)] -pub enum DumpActorError { - #[error("A dump is already processing. You must wait until the current process is finished before requesting another dump.")] - DumpAlreadyRunning, - #[error("Dump `{0}` not found.")] - DumpDoesNotExist(String), +pub enum DumpError { #[error("An internal error has occurred. 
`{0}`.")] Internal(Box), #[error("{0}")] @@ -18,7 +14,7 @@ pub enum DumpActorError { } internal_error!( - DumpActorError: milli::heed::Error, + DumpError: milli::heed::Error, std::io::Error, tokio::task::JoinError, tokio::sync::oneshot::error::RecvError, @@ -29,13 +25,11 @@ internal_error!( TaskError ); -impl ErrorCode for DumpActorError { +impl ErrorCode for DumpError { fn error_code(&self) -> Code { match self { - DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, - DumpActorError::DumpDoesNotExist(_) => Code::DumpNotFound, - DumpActorError::Internal(_) => Code::Internal, - DumpActorError::IndexResolver(e) => e.error_code(), + DumpError::Internal(_) => Code::Internal, + DumpError::IndexResolver(e) => e.error_code(), } } } diff --git a/meilisearch-lib/src/dump/handler.rs b/meilisearch-lib/src/dump/handler.rs new file mode 100644 index 000000000..4adb7011a --- /dev/null +++ b/meilisearch-lib/src/dump/handler.rs @@ -0,0 +1,180 @@ +#[cfg(not(test))] +pub use real::DumpHandler; + +#[cfg(test)] +pub use test::MockDumpHandler as DumpHandler; + +use time::{macros::format_description, OffsetDateTime}; + +/// Generate uid from creation date +pub fn generate_uid() -> String { + OffsetDateTime::now_utc() + .format(format_description!( + "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" + )) + .unwrap() +} + +mod real { + use std::{fs::File, path::PathBuf, sync::Arc}; + + use log::{info, trace}; + use meilisearch_auth::AuthController; + use milli::heed::Env; + use tokio::fs::create_dir_all; + + use crate::analytics; + use crate::compression::to_tar_gz; + use crate::dump::error::{DumpError, Result}; + use crate::dump::{MetadataVersion, META_FILE_NAME}; + use crate::index_resolver::{ + index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver, + }; + use crate::tasks::TaskStore; + use crate::update_file_store::UpdateFileStore; + + pub struct DumpHandler { + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + } + + impl DumpHandler + where + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { + pub fn new( + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + ) -> Self { + Self { + dump_path, + db_path, + update_file_store, + task_store_size, + index_db_size, + env, + index_resolver, + } + } + + pub async fn run(&self, uid: String) -> Result<()> { + trace!("Performing dump."); + + create_dir_all(&self.dump_path).await?; + + let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; + let temp_dump_path = temp_dump_dir.path().to_owned(); + + let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size); + let meta_path = temp_dump_path.join(META_FILE_NAME); + // TODO: blocking + let mut meta_file = File::create(&meta_path)?; + serde_json::to_writer(&mut meta_file, &meta)?; + analytics::copy_user_id(&self.db_path, &temp_dump_path); + + create_dir_all(&temp_dump_path.join("indexes")).await?; + + // TODO: this is blocking!! 
+ AuthController::dump(&self.db_path, &temp_dump_path)?; + TaskStore::dump( + self.env.clone(), + &temp_dump_path, + self.update_file_store.clone(), + ) + .await?; + self.index_resolver.dump(&temp_dump_path).await?; + + let dump_path = self.dump_path.clone(); + let dump_path = tokio::task::spawn_blocking(move || -> Result { + // for now we simply copy the updates/updates_files + // FIXME: We may copy more files than necessary, if new files are added while we are + // performing the dump. We need a way to filter them out. + + let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?; + to_tar_gz(temp_dump_path, temp_dump_file.path()) + .map_err(|e| DumpError::Internal(e.into()))?; + + let dump_path = dump_path.join(uid).with_extension("dump"); + temp_dump_file.persist(&dump_path)?; + + Ok(dump_path) + }) + .await??; + + info!("Created dump in {:?}.", dump_path); + + Ok(()) + } + } +} + +#[cfg(test)] +mod test { + use std::marker::PhantomData; + use std::path::PathBuf; + use std::sync::Arc; + + use milli::heed::Env; + use nelson::Mocker; + + use crate::dump::error::Result; + use crate::index_resolver::IndexResolver; + use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; + use crate::update_file_store::UpdateFileStore; + + use super::*; + + pub enum MockDumpHandler { + Real(super::real::DumpHandler), + Mock(Mocker, PhantomData<(U, I)>), + } + + impl MockDumpHandler { + pub fn mock(mocker: Mocker) -> Self { + Self::Mock(mocker, PhantomData) + } + } + + impl MockDumpHandler + where + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { + pub fn new( + dump_path: PathBuf, + db_path: PathBuf, + update_file_store: UpdateFileStore, + task_store_size: usize, + index_db_size: usize, + env: Arc, + index_resolver: Arc>, + ) -> Self { + Self::Real(super::real::DumpHandler::new( + dump_path, + db_path, + update_file_store, + task_store_size, + index_db_size, + env, + index_resolver, + )) + } + pub async fn run(&self, uid: String) -> Result<()> { + match self { + DumpHandler::Real(real) => real.run(uid).await, + DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) }, + } + } + } +} diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs b/meilisearch-lib/src/dump/loaders/mod.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/mod.rs rename to meilisearch-lib/src/dump/loaders/mod.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/dump/loaders/v1.rs similarity index 100% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs rename to meilisearch-lib/src/dump/loaders/v1.rs diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/dump/loaders/v2.rs similarity index 98% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs rename to meilisearch-lib/src/dump/loaders/v2.rs index e2445913e..5926de931 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/dump/loaders/v2.rs @@ -5,8 +5,8 @@ use std::path::{Path, PathBuf}; use serde_json::{Deserializer, Value}; use tempfile::NamedTempFile; -use crate::index_controller::dump_actor::compat::{self, v2, v3}; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::compat::{self, v2, v3}; +use crate::dump::Metadata; use crate::options::IndexerOpts; /// The dump v2 reads the dump folder and patches all the needed file to make it compatible 
with a diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs b/meilisearch-lib/src/dump/loaders/v3.rs similarity index 97% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs rename to meilisearch-lib/src/dump/loaders/v3.rs index 902691511..8e76b67e0 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v3.rs +++ b/meilisearch-lib/src/dump/loaders/v3.rs @@ -9,8 +9,8 @@ use log::info; use tempfile::tempdir; use uuid::Uuid; -use crate::index_controller::dump_actor::compat::v3; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::compat::v3; +use crate::dump::Metadata; use crate::index_resolver::meta_store::{DumpEntry, IndexMeta}; use crate::options::IndexerOpts; use crate::tasks::task::{Task, TaskId}; @@ -66,6 +66,7 @@ pub fn load_dump( index_db_size, meta_env_size, indexing_options, + "V5", ) } diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs b/meilisearch-lib/src/dump/loaders/v4.rs similarity index 91% rename from meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs rename to meilisearch-lib/src/dump/loaders/v4.rs index 38d61f146..7f0ade714 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v4.rs +++ b/meilisearch-lib/src/dump/loaders/v4.rs @@ -6,7 +6,7 @@ use meilisearch_auth::AuthController; use milli::heed::EnvOpenOptions; use crate::analytics; -use crate::index_controller::dump_actor::Metadata; +use crate::dump::Metadata; use crate::index_resolver::IndexResolver; use crate::options::IndexerOpts; use crate::tasks::TaskStore; @@ -19,10 +19,11 @@ pub fn load_dump( index_db_size: usize, meta_env_size: usize, indexing_options: &IndexerOpts, + version: &str, ) -> anyhow::Result<()> { info!( - "Loading dump from {}, dump database version: {}, dump version: V4", - meta.dump_date, meta.db_version + "Loading dump from {}, dump database version: {}, dump version: {}", + meta.dump_date, meta.db_version, version ); let mut options = EnvOpenOptions::new(); diff --git a/meilisearch-lib/src/dump/mod.rs b/meilisearch-lib/src/dump/mod.rs new file mode 100644 index 000000000..ab1c63d6d --- /dev/null +++ b/meilisearch-lib/src/dump/mod.rs @@ -0,0 +1,256 @@ +use std::fs::File; +use std::path::Path; + +use anyhow::bail; +use log::info; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; + +use tempfile::TempDir; + +use crate::compression::from_tar_gz; +use crate::options::IndexerOpts; + +use self::loaders::{v2, v3, v4}; + +pub use handler::{generate_uid, DumpHandler}; + +mod compat; +pub mod error; +mod handler; +mod loaders; + +const META_FILE_NAME: &str = "metadata.json"; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Metadata { + db_version: String, + index_db_size: usize, + update_db_size: usize, + #[serde(with = "time::serde::rfc3339")] + dump_date: OffsetDateTime, +} + +impl Metadata { + pub fn new(index_db_size: usize, update_db_size: usize) -> Self { + Self { + db_version: env!("CARGO_PKG_VERSION").to_string(), + index_db_size, + update_db_size, + dump_date: OffsetDateTime::now_utc(), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MetadataV1 { + pub db_version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "dumpVersion")] +pub enum MetadataVersion { + V1(MetadataV1), + V2(Metadata), + V3(Metadata), + V4(Metadata), + // V5 is forward compatible with V4 but not backward compatible. 
+ V5(Metadata), +} + +impl MetadataVersion { + pub fn load_dump( + self, + src: impl AsRef, + dst: impl AsRef, + index_db_size: usize, + meta_env_size: usize, + indexing_options: &IndexerOpts, + ) -> anyhow::Result<()> { + let version = self.version(); + match self { + MetadataVersion::V1(_meta) => { + anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.") + } + MetadataVersion::V2(meta) => v2::load_dump( + meta, + src, + dst, + index_db_size, + meta_env_size, + indexing_options, + )?, + MetadataVersion::V3(meta) => v3::load_dump( + meta, + src, + dst, + index_db_size, + meta_env_size, + indexing_options, + )?, + MetadataVersion::V4(meta) | MetadataVersion::V5(meta) => v4::load_dump( + meta, + src, + dst, + index_db_size, + meta_env_size, + indexing_options, + version, + )?, + } + + Ok(()) + } + + pub fn new_v5(index_db_size: usize, update_db_size: usize) -> Self { + let meta = Metadata::new(index_db_size, update_db_size); + Self::V5(meta) + } + + pub fn db_version(&self) -> &str { + match self { + Self::V1(meta) => &meta.db_version, + Self::V2(meta) | Self::V3(meta) | Self::V4(meta) | Self::V5(meta) => &meta.db_version, + } + } + + pub fn version(&self) -> &'static str { + match self { + MetadataVersion::V1(_) => "V1", + MetadataVersion::V2(_) => "V2", + MetadataVersion::V3(_) => "V3", + MetadataVersion::V4(_) => "V4", + MetadataVersion::V5(_) => "V5", + } + } + + pub fn dump_date(&self) -> Option<&OffsetDateTime> { + match self { + MetadataVersion::V1(_) => None, + MetadataVersion::V2(meta) + | MetadataVersion::V3(meta) + | MetadataVersion::V4(meta) + | MetadataVersion::V5(meta) => Some(&meta.dump_date), + } + } +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[serde(rename_all = "snake_case")] +pub enum DumpStatus { + Done, + InProgress, + Failed, +} + +pub fn load_dump( + dst_path: impl AsRef, + src_path: impl AsRef, + ignore_dump_if_db_exists: bool, + ignore_missing_dump: bool, + index_db_size: usize, + update_db_size: usize, + indexer_opts: &IndexerOpts, +) -> anyhow::Result<()> { + let empty_db = crate::is_empty_db(&dst_path); + let src_path_exists = src_path.as_ref().exists(); + + if empty_db && src_path_exists { + let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?; + meta.load_dump( + tmp_src.path(), + tmp_dst.path(), + index_db_size, + update_db_size, + indexer_opts, + )?; + persist_dump(&dst_path, tmp_dst)?; + Ok(()) + } else if !empty_db && !ignore_dump_if_db_exists { + bail!( + "database already exists at {:?}, try to delete it or rename it", + dst_path + .as_ref() + .canonicalize() + .unwrap_or_else(|_| dst_path.as_ref().to_owned()) + ) + } else if !src_path_exists && !ignore_missing_dump { + bail!("dump doesn't exist at {:?}", src_path.as_ref()) + } else { + // there is nothing to do + Ok(()) + } +} + +fn extract_dump( + dst_path: impl AsRef, + src_path: impl AsRef, +) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> { + // Setup a temp directory path in the same path as the database, to prevent cross devices + // references. 
+ let temp_path = dst_path + .as_ref() + .parent() + .map(ToOwned::to_owned) + .unwrap_or_else(|| ".".into()); + + let tmp_src = tempfile::tempdir_in(temp_path)?; + let tmp_src_path = tmp_src.path(); + + from_tar_gz(&src_path, tmp_src_path)?; + + let meta_path = tmp_src_path.join(META_FILE_NAME); + let mut meta_file = File::open(&meta_path)?; + let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?; + + if !dst_path.as_ref().exists() { + std::fs::create_dir_all(dst_path.as_ref())?; + } + + let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?; + + info!( + "Loading dump {}, dump database version: {}, dump version: {}", + meta.dump_date() + .map(|t| format!("from {}", t)) + .unwrap_or_else(String::new), + meta.db_version(), + meta.version() + ); + + Ok((tmp_src, tmp_dst, meta)) +} + +fn persist_dump(dst_path: impl AsRef, tmp_dst: TempDir) -> anyhow::Result<()> { + let persisted_dump = tmp_dst.into_path(); + + // Delete everything in the `data.ms` except the tempdir. + if dst_path.as_ref().exists() { + for file in dst_path.as_ref().read_dir().unwrap() { + let file = file.unwrap().path(); + if file.file_name() == persisted_dump.file_name() { + continue; + } + + if file.is_file() { + std::fs::remove_file(&file)?; + } else { + std::fs::remove_dir_all(&file)?; + } + } + } + + // Move the whole content of the tempdir into the `data.ms`. + for file in persisted_dump.read_dir().unwrap() { + let file = file.unwrap().path(); + + std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?; + } + + // Delete the empty tempdir. + std::fs::remove_dir_all(&persisted_dump)?; + + Ok(()) +} diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs b/meilisearch-lib/src/index_controller/dump_actor/actor.rs deleted file mode 100644 index 48fc077ca..000000000 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ /dev/null @@ -1,191 +0,0 @@ -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use async_stream::stream; -use futures::{lock::Mutex, stream::StreamExt}; -use log::{error, trace}; -use time::macros::format_description; -use time::OffsetDateTime; -use tokio::sync::{mpsc, oneshot, RwLock}; - -use super::error::{DumpActorError, Result}; -use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus}; -use crate::tasks::Scheduler; -use crate::update_file_store::UpdateFileStore; - -pub const CONCURRENT_DUMP_MSG: usize = 10; - -pub struct DumpActor { - inbox: Option>, - update_file_store: UpdateFileStore, - scheduler: Arc>, - dump_path: PathBuf, - analytics_path: PathBuf, - lock: Arc>, - dump_infos: Arc>>, - update_db_size: usize, - index_db_size: usize, -} - -/// Generate uid from creation date -fn generate_uid() -> String { - OffsetDateTime::now_utc() - .format(format_description!( - "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" - )) - .unwrap() -} - -impl DumpActor { - pub fn new( - inbox: mpsc::Receiver, - update_file_store: UpdateFileStore, - scheduler: Arc>, - dump_path: impl AsRef, - analytics_path: impl AsRef, - index_db_size: usize, - update_db_size: usize, - ) -> Self { - let dump_infos = Arc::new(RwLock::new(HashMap::new())); - let lock = Arc::new(Mutex::new(())); - Self { - inbox: Some(inbox), - scheduler, - update_file_store, - dump_path: dump_path.as_ref().into(), - analytics_path: analytics_path.as_ref().into(), - dump_infos, - lock, - index_db_size, - update_db_size, - } - } - - pub async fn run(mut self) { - 
trace!("Started dump actor."); - - let mut inbox = self - .inbox - .take() - .expect("Dump Actor must have a inbox at this point."); - - let stream = stream! { - loop { - match inbox.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - stream - .for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg)) - .await; - - error!("Dump actor stopped."); - } - - async fn handle_message(&self, msg: DumpMsg) { - use DumpMsg::*; - - match msg { - CreateDump { ret } => { - let _ = self.handle_create_dump(ret).await; - } - DumpInfo { ret, uid } => { - let _ = ret.send(self.handle_dump_info(uid).await); - } - } - } - - async fn handle_create_dump(&self, ret: oneshot::Sender>) { - let uid = generate_uid(); - let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress); - - let _lock = match self.lock.try_lock() { - Some(lock) => lock, - None => { - ret.send(Err(DumpActorError::DumpAlreadyRunning)) - .expect("Dump actor is dead"); - return; - } - }; - - self.dump_infos - .write() - .await - .insert(uid.clone(), info.clone()); - - ret.send(Ok(info)).expect("Dump actor is dead"); - - let task = DumpJob { - dump_path: self.dump_path.clone(), - db_path: self.analytics_path.clone(), - update_file_store: self.update_file_store.clone(), - scheduler: self.scheduler.clone(), - uid: uid.clone(), - update_db_size: self.update_db_size, - index_db_size: self.index_db_size, - }; - - let task_result = tokio::task::spawn_local(task.run()).await; - - let mut dump_infos = self.dump_infos.write().await; - let dump_infos = dump_infos - .get_mut(&uid) - .expect("dump entry deleted while lock was acquired"); - - match task_result { - Ok(Ok(())) => { - dump_infos.done(); - trace!("Dump succeed"); - } - Ok(Err(e)) => { - dump_infos.with_error(e.to_string()); - error!("Dump failed: {}", e); - } - Err(_) => { - dump_infos.with_error("Unexpected error while performing dump.".to_string()); - error!("Dump panicked. 
Dump status set to failed"); - } - }; - } - - async fn handle_dump_info(&self, uid: String) -> Result { - match self.dump_infos.read().await.get(&uid) { - Some(info) => Ok(info.clone()), - _ => Err(DumpActorError::DumpDoesNotExist(uid)), - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_generate_uid() { - let current = OffsetDateTime::now_utc(); - - let uid = generate_uid(); - let (date, time) = uid.split_once('-').unwrap(); - - let date = time::Date::parse( - date, - &format_description!("[year repr:full][month repr:numerical][day padding:zero]"), - ) - .unwrap(); - let time = time::Time::parse( - time, - &format_description!( - "[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" - ), - ) - .unwrap(); - let datetime = time::PrimitiveDateTime::new(date, time); - let datetime = datetime.assume_utc(); - - assert!(current - datetime < time::Duration::SECOND); - } -} diff --git a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs deleted file mode 100644 index 16a312e70..000000000 --- a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs +++ /dev/null @@ -1,26 +0,0 @@ -use tokio::sync::{mpsc, oneshot}; - -use super::error::Result; -use super::{DumpActorHandle, DumpInfo, DumpMsg}; - -#[derive(Clone)] -pub struct DumpActorHandleImpl { - pub sender: mpsc::Sender, -} - -#[async_trait::async_trait] -impl DumpActorHandle for DumpActorHandleImpl { - async fn create_dump(&self) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = DumpMsg::CreateDump { ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } - - async fn dump_info(&self, uid: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = DumpMsg::DumpInfo { ret, uid }; - let _ = self.sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } -} diff --git a/meilisearch-lib/src/index_controller/dump_actor/message.rs b/meilisearch-lib/src/index_controller/dump_actor/message.rs deleted file mode 100644 index 6c9dded9f..000000000 --- a/meilisearch-lib/src/index_controller/dump_actor/message.rs +++ /dev/null @@ -1,14 +0,0 @@ -use tokio::sync::oneshot; - -use super::error::Result; -use super::DumpInfo; - -pub enum DumpMsg { - CreateDump { - ret: oneshot::Sender>, - }, - DumpInfo { - uid: String, - ret: oneshot::Sender>, - }, -} diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs deleted file mode 100644 index 00be3a371..000000000 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ /dev/null @@ -1,510 +0,0 @@ -use std::fs::File; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use anyhow::bail; -use log::{info, trace}; -use serde::{Deserialize, Serialize}; -use time::OffsetDateTime; - -pub use actor::DumpActor; -pub use handle_impl::*; -use meilisearch_auth::AuthController; -pub use message::DumpMsg; -use tempfile::TempDir; -use tokio::fs::create_dir_all; -use tokio::sync::{oneshot, RwLock}; - -use crate::analytics; -use crate::compression::{from_tar_gz, to_tar_gz}; -use crate::index_controller::dump_actor::error::DumpActorError; -use crate::index_controller::dump_actor::loaders::{v2, v3, v4}; -use crate::options::IndexerOpts; -use crate::tasks::task::Job; -use crate::tasks::Scheduler; -use crate::update_file_store::UpdateFileStore; -use error::Result; - -mod actor; -mod compat; -pub mod error; -mod 
handle_impl; -mod loaders; -mod message; - -const META_FILE_NAME: &str = "metadata.json"; - -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct Metadata { - db_version: String, - index_db_size: usize, - update_db_size: usize, - #[serde(with = "time::serde::rfc3339")] - dump_date: OffsetDateTime, -} - -impl Metadata { - pub fn new(index_db_size: usize, update_db_size: usize) -> Self { - Self { - db_version: env!("CARGO_PKG_VERSION").to_string(), - index_db_size, - update_db_size, - dump_date: OffsetDateTime::now_utc(), - } - } -} - -#[async_trait::async_trait] -#[cfg_attr(test, mockall::automock)] -pub trait DumpActorHandle { - /// Start the creation of a dump - /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] - async fn create_dump(&self) -> Result; - - /// Return the status of an already created dump - /// Implementation: [handle_impl::DumpActorHandleImpl::dump_info] - async fn dump_info(&self, uid: String) -> Result; -} - -#[derive(Serialize, Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct MetadataV1 { - pub db_version: String, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "dumpVersion")] -pub enum MetadataVersion { - V1(MetadataV1), - V2(Metadata), - V3(Metadata), - V4(Metadata), -} - -impl MetadataVersion { - pub fn load_dump( - self, - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - meta_env_size: usize, - indexing_options: &IndexerOpts, - ) -> anyhow::Result<()> { - match self { - MetadataVersion::V1(_meta) => { - anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.") - } - MetadataVersion::V2(meta) => v2::load_dump( - meta, - src, - dst, - index_db_size, - meta_env_size, - indexing_options, - )?, - MetadataVersion::V3(meta) => v3::load_dump( - meta, - src, - dst, - index_db_size, - meta_env_size, - indexing_options, - )?, - MetadataVersion::V4(meta) => v4::load_dump( - meta, - src, - dst, - index_db_size, - meta_env_size, - indexing_options, - )?, - } - - Ok(()) - } - - pub fn new_v4(index_db_size: usize, update_db_size: usize) -> Self { - let meta = Metadata::new(index_db_size, update_db_size); - Self::V4(meta) - } - - pub fn db_version(&self) -> &str { - match self { - Self::V1(meta) => &meta.db_version, - Self::V2(meta) | Self::V3(meta) | Self::V4(meta) => &meta.db_version, - } - } - - pub fn version(&self) -> &str { - match self { - MetadataVersion::V1(_) => "V1", - MetadataVersion::V2(_) => "V2", - MetadataVersion::V3(_) => "V3", - MetadataVersion::V4(_) => "V4", - } - } - - pub fn dump_date(&self) -> Option<&OffsetDateTime> { - match self { - MetadataVersion::V1(_) => None, - MetadataVersion::V2(meta) | MetadataVersion::V3(meta) | MetadataVersion::V4(meta) => { - Some(&meta.dump_date) - } - } - } -} - -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] -#[serde(rename_all = "snake_case")] -pub enum DumpStatus { - Done, - InProgress, - Failed, -} - -#[derive(Debug, Serialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct DumpInfo { - pub uid: String, - pub status: DumpStatus, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - #[serde(with = "time::serde::rfc3339")] - started_at: OffsetDateTime, - #[serde( - skip_serializing_if = "Option::is_none", - with = "time::serde::rfc3339::option" - )] - finished_at: Option, -} - -impl DumpInfo { - pub fn new(uid: String, status: DumpStatus) -> Self { - Self { - uid, - 
status, - error: None, - started_at: OffsetDateTime::now_utc(), - finished_at: None, - } - } - - pub fn with_error(&mut self, error: String) { - self.status = DumpStatus::Failed; - self.finished_at = Some(OffsetDateTime::now_utc()); - self.error = Some(error); - } - - pub fn done(&mut self) { - self.finished_at = Some(OffsetDateTime::now_utc()); - self.status = DumpStatus::Done; - } - - pub fn dump_already_in_progress(&self) -> bool { - self.status == DumpStatus::InProgress - } -} - -pub fn load_dump( - dst_path: impl AsRef, - src_path: impl AsRef, - ignore_dump_if_db_exists: bool, - ignore_missing_dump: bool, - index_db_size: usize, - update_db_size: usize, - indexer_opts: &IndexerOpts, -) -> anyhow::Result<()> { - let empty_db = crate::is_empty_db(&dst_path); - let src_path_exists = src_path.as_ref().exists(); - - if empty_db && src_path_exists { - let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?; - meta.load_dump( - tmp_src.path(), - tmp_dst.path(), - index_db_size, - update_db_size, - indexer_opts, - )?; - persist_dump(&dst_path, tmp_dst)?; - Ok(()) - } else if !empty_db && !ignore_dump_if_db_exists { - bail!( - "database already exists at {:?}, try to delete it or rename it", - dst_path - .as_ref() - .canonicalize() - .unwrap_or_else(|_| dst_path.as_ref().to_owned()) - ) - } else if !src_path_exists && !ignore_missing_dump { - bail!("dump doesn't exist at {:?}", src_path.as_ref()) - } else { - // there is nothing to do - Ok(()) - } -} - -fn extract_dump( - dst_path: impl AsRef, - src_path: impl AsRef, -) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> { - // Setup a temp directory path in the same path as the database, to prevent cross devices - // references. - let temp_path = dst_path - .as_ref() - .parent() - .map(ToOwned::to_owned) - .unwrap_or_else(|| ".".into()); - - let tmp_src = tempfile::tempdir_in(temp_path)?; - let tmp_src_path = tmp_src.path(); - - from_tar_gz(&src_path, tmp_src_path)?; - - let meta_path = tmp_src_path.join(META_FILE_NAME); - let mut meta_file = File::open(&meta_path)?; - let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?; - - if !dst_path.as_ref().exists() { - std::fs::create_dir_all(dst_path.as_ref())?; - } - - let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?; - - info!( - "Loading dump {}, dump database version: {}, dump version: {}", - meta.dump_date() - .map(|t| format!("from {}", t)) - .unwrap_or_else(String::new), - meta.db_version(), - meta.version() - ); - - Ok((tmp_src, tmp_dst, meta)) -} - -fn persist_dump(dst_path: impl AsRef, tmp_dst: TempDir) -> anyhow::Result<()> { - let persisted_dump = tmp_dst.into_path(); - - // Delete everything in the `data.ms` except the tempdir. - if dst_path.as_ref().exists() { - for file in dst_path.as_ref().read_dir().unwrap() { - let file = file.unwrap().path(); - if file.file_name() == persisted_dump.file_name() { - continue; - } - - if file.is_file() { - std::fs::remove_file(&file)?; - } else { - std::fs::remove_dir_all(&file)?; - } - } - } - - // Move the whole content of the tempdir into the `data.ms`. - for file in persisted_dump.read_dir().unwrap() { - let file = file.unwrap().path(); - - std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?; - } - - // Delete the empty tempdir. 
- std::fs::remove_dir_all(&persisted_dump)?; - - Ok(()) -} - -struct DumpJob { - dump_path: PathBuf, - db_path: PathBuf, - update_file_store: UpdateFileStore, - scheduler: Arc>, - uid: String, - update_db_size: usize, - index_db_size: usize, -} - -impl DumpJob { - async fn run(self) -> Result<()> { - trace!("Performing dump."); - - create_dir_all(&self.dump_path).await?; - - let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??; - let temp_dump_path = temp_dump_dir.path().to_owned(); - - let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size); - let meta_path = temp_dump_path.join(META_FILE_NAME); - let mut meta_file = File::create(&meta_path)?; - serde_json::to_writer(&mut meta_file, &meta)?; - analytics::copy_user_id(&self.db_path, &temp_dump_path); - - create_dir_all(&temp_dump_path.join("indexes")).await?; - - let (sender, receiver) = oneshot::channel(); - - self.scheduler - .write() - .await - .schedule_job(Job::Dump { - ret: sender, - path: temp_dump_path.clone(), - }) - .await; - - // wait until the job has started performing before finishing the dump process - let sender = receiver.await??; - - AuthController::dump(&self.db_path, &temp_dump_path)?; - - //TODO(marin): this is not right, the scheduler should dump itself, not do it here... - self.scheduler - .read() - .await - .dump(&temp_dump_path, self.update_file_store.clone()) - .await?; - - let dump_path = tokio::task::spawn_blocking(move || -> Result { - // for now we simply copy the updates/updates_files - // FIXME: We may copy more files than necessary, if new files are added while we are - // performing the dump. We need a way to filter them out. - - let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?; - to_tar_gz(temp_dump_path, temp_dump_file.path()) - .map_err(|e| DumpActorError::Internal(e.into()))?; - - let dump_path = self.dump_path.join(self.uid).with_extension("dump"); - temp_dump_file.persist(&dump_path)?; - - Ok(dump_path) - }) - .await??; - - // notify the update loop that we are finished performing the dump. - let _ = sender.send(()); - - info!("Created dump in {:?}.", dump_path); - - Ok(()) - } -} - -#[cfg(test)] -mod test { - use nelson::Mocker; - use once_cell::sync::Lazy; - - use super::*; - use crate::index_resolver::error::IndexResolverError; - use crate::options::SchedulerConfig; - use crate::tasks::error::Result as TaskResult; - use crate::tasks::task::{Task, TaskId}; - use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore}; - use crate::update_file_store::UpdateFileStore; - - fn setup() { - static SETUP: Lazy<()> = Lazy::new(|| { - if cfg!(windows) { - std::env::set_var("TMP", "."); - } else { - std::env::set_var("TMPDIR", "."); - } - }); - - // just deref to make sure the env is setup - *SETUP - } - - #[actix_rt::test] - async fn test_dump_normal() { - setup(); - - let tmp = tempfile::tempdir().unwrap(); - - let mocker = Mocker::default(); - let update_file_store = UpdateFileStore::mock(mocker); - - let mut performer = MockTaskPerformer::new(); - performer - .expect_process_job() - .once() - .returning(|j| match j { - Job::Dump { ret, .. 
} => { - let (sender, _receiver) = oneshot::channel(); - ret.send(Ok(sender)).unwrap(); - } - _ => unreachable!(), - }); - let performer = Arc::new(performer); - let mocker = Mocker::default(); - mocker - .when::<(&Path, UpdateFileStore), TaskResult<()>>("dump") - .then(|_| Ok(())); - mocker - .when::<(Option, Option, Option), TaskResult>>( - "list_tasks", - ) - .then(|_| Ok(Vec::new())); - let store = TaskStore::mock(mocker); - let config = SchedulerConfig::default(); - - let scheduler = Scheduler::new(store, performer, config).unwrap(); - - let task = DumpJob { - dump_path: tmp.path().into(), - // this should do nothing - update_file_store, - db_path: tmp.path().into(), - uid: String::from("test"), - update_db_size: 4096 * 10, - index_db_size: 4096 * 10, - scheduler, - }; - - task.run().await.unwrap(); - } - - #[actix_rt::test] - async fn error_performing_dump() { - let tmp = tempfile::tempdir().unwrap(); - - let mocker = Mocker::default(); - let file_store = UpdateFileStore::mock(mocker); - - let mocker = Mocker::default(); - mocker - .when::<(Option, Option, Option), TaskResult>>( - "list_tasks", - ) - .then(|_| Ok(Vec::new())); - let task_store = TaskStore::mock(mocker); - let mut performer = MockTaskPerformer::new(); - performer - .expect_process_job() - .once() - .returning(|job| match job { - Job::Dump { ret, .. } => drop(ret.send(Err(IndexResolverError::BadlyFormatted( - "blabla".to_string(), - )))), - _ => unreachable!(), - }); - let performer = Arc::new(performer); - - let scheduler = Scheduler::new(task_store, performer, SchedulerConfig::default()).unwrap(); - - let task = DumpJob { - dump_path: tmp.path().into(), - // this should do nothing - db_path: tmp.path().into(), - update_file_store: file_store, - uid: String::from("test"), - update_db_size: 4096 * 10, - index_db_size: 4096 * 10, - scheduler, - }; - - assert!(task.run().await.is_err()); - } -} diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index 85af76623..529887b6a 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -6,11 +6,11 @@ use tokio::task::JoinError; use super::DocumentAdditionFormat; use crate::document_formats::DocumentFormatError; +use crate::dump::error::DumpError; use crate::index::error::IndexError; use crate::tasks::error::TaskError; use crate::update_file_store::UpdateFileStoreError; -use super::dump_actor::error::DumpActorError; use crate::index_resolver::error::IndexResolverError; pub type Result = std::result::Result; @@ -28,7 +28,7 @@ pub enum IndexControllerError { #[error("{0}")] TaskError(#[from] TaskError), #[error("{0}")] - DumpError(#[from] DumpActorError), + DumpError(#[from] DumpError), #[error("{0}")] DocumentFormatError(#[from] DocumentFormatError), #[error("A {0} payload is missing.")] diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index a302f12da..30a6b6dc8 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -13,31 +13,31 @@ use futures::StreamExt; use milli::update::IndexDocumentsMethod; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::RwLock; use tokio::task::spawn_blocking; use tokio::time::sleep; use uuid::Uuid; use crate::document_formats::{read_csv, read_json, read_ndjson}; +use crate::dump::{self, load_dump, DumpHandler}; use crate::index::{ Checked, Document, IndexMeta, IndexStats, 
SearchQuery, SearchResult, Settings, Unchecked, }; -use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl}; use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; use crate::tasks::error::TaskError; use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{Scheduler, TaskFilter, TaskStore}; +use crate::tasks::{ + BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore, +}; use error::Result; -use self::dump_actor::{DumpActorHandle, DumpInfo}; use self::error::IndexControllerError; use crate::index_resolver::index_store::{IndexStore, MapIndexStore}; use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore}; use crate::index_resolver::{create_index_resolver, IndexResolver, IndexUid}; use crate::update_file_store::UpdateFileStore; -mod dump_actor; pub mod error; pub mod versioning; @@ -73,11 +73,10 @@ pub struct IndexSettings { } pub struct IndexController { - index_resolver: Arc>, + pub index_resolver: Arc>, scheduler: Arc>, task_store: TaskStore, - dump_handle: dump_actor::DumpActorHandleImpl, - update_file_store: UpdateFileStore, + pub update_file_store: UpdateFileStore, } /// Need a custom implementation for clone because deriving require that U and I are clone. @@ -86,7 +85,6 @@ impl Clone for IndexController { Self { index_resolver: self.index_resolver.clone(), scheduler: self.scheduler.clone(), - dump_handle: self.dump_handle.clone(), update_file_store: self.update_file_store.clone(), task_store: self.task_store.clone(), } @@ -220,30 +218,30 @@ impl IndexControllerBuilder { update_file_store.clone(), )?); - let task_store = TaskStore::new(meta_env)?; - let scheduler = - Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?; - let dump_path = self .dump_dst .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; - let dump_handle = { - let analytics_path = &db_path; - let (sender, receiver) = mpsc::channel(10); - let actor = DumpActor::new( - receiver, - update_file_store.clone(), - scheduler.clone(), - dump_path, - analytics_path, - index_size, - task_store_size, - ); - tokio::task::spawn_local(actor.run()); + let dump_handler = Arc::new(DumpHandler::new( + dump_path, + db_path.as_ref().into(), + update_file_store.clone(), + task_store_size, + index_size, + meta_env.clone(), + index_resolver.clone(), + )); + let task_store = TaskStore::new(meta_env)?; - DumpActorHandleImpl { sender } - }; + // register all the batch handlers for use with the scheduler. 
+ let handlers: Vec> = vec![ + index_resolver.clone(), + dump_handler, + Arc::new(SnapshotHandler), + // dummy handler to catch all empty batches + Arc::new(EmptyBatchHandler), + ]; + let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?; if self.schedule_snapshot { let snapshot_period = self @@ -268,7 +266,6 @@ impl IndexControllerBuilder { Ok(IndexController { index_resolver, scheduler, - dump_handle, update_file_store, task_store, }) @@ -419,12 +416,20 @@ where Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key }, }; - let task = self.task_store.register(uid, content).await?; + let task = self.task_store.register(Some(uid), content).await?; self.scheduler.read().await.notify(); Ok(task) } + pub async fn register_dump_task(&self) -> Result { + let uid = dump::generate_uid(); + let content = TaskContent::Dump { uid }; + let task = self.task_store.register(None, content).await?; + self.scheduler.read().await.notify(); + Ok(task) + } + pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { let task = self.scheduler.read().await.get_task(id, filter).await?; Ok(task) @@ -569,7 +574,12 @@ where // Check if the currently indexing update is from our index. let is_indexing = processing_tasks .first() - .map(|task| task.index_uid.as_str() == uid) + .map(|task| { + task.index_uid + .as_ref() + .map(|u| u.as_str() == uid) + .unwrap_or(false) + }) .unwrap_or_default(); let index = self.index_resolver.get_index(uid).await?; @@ -605,7 +615,7 @@ where // Check if the currently indexing update is from our index. stats.is_indexing = processing_tasks .first() - .map(|p| p.index_uid.as_str() == index_uid) + .and_then(|p| p.index_uid.as_ref().map(|u| u.as_str() == index_uid)) .or(Some(false)); indexes.insert(index_uid, stats); @@ -617,14 +627,6 @@ where indexes, }) } - - pub async fn create_dump(&self) -> Result { - Ok(self.dump_handle.create_dump().await?) - } - - pub async fn dump_info(&self, uid: String) -> Result { - Ok(self.dump_handle.dump_info(uid).await?) 
- } } pub async fn get_arc_ownership_blocking(mut item: Arc) -> T { @@ -662,13 +664,11 @@ mod test { index_resolver: Arc>, task_store: TaskStore, update_file_store: UpdateFileStore, - dump_handle: DumpActorHandleImpl, scheduler: Arc>, ) -> Self { IndexController { index_resolver, task_store, - dump_handle, update_file_store, scheduler, } @@ -752,19 +752,12 @@ mod test { let task_store = TaskStore::mock(task_store_mocker); let scheduler = Scheduler::new( task_store.clone(), - index_resolver.clone(), + vec![index_resolver.clone()], SchedulerConfig::default(), ) .unwrap(); - let (sender, _) = mpsc::channel(1); - let dump_handle = DumpActorHandleImpl { sender }; - let index_controller = IndexController::mock( - index_resolver, - task_store, - update_file_store, - dump_handle, - scheduler, - ); + let index_controller = + IndexController::mock(index_resolver, task_store, update_file_store, scheduler); let r = index_controller .search(index_uid.to_owned(), query.clone()) diff --git a/meilisearch-lib/src/index_resolver/message.rs b/meilisearch-lib/src/index_resolver/message.rs deleted file mode 100644 index 25a0d64a9..000000000 --- a/meilisearch-lib/src/index_resolver/message.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::{collections::HashSet, path::PathBuf}; - -use tokio::sync::oneshot; -use uuid::Uuid; - -use crate::index::Index; -use super::error::Result; - -pub enum IndexResolverMsg { - Get { - uid: String, - ret: oneshot::Sender>, - }, - Delete { - uid: String, - ret: oneshot::Sender>, - }, - List { - ret: oneshot::Sender>>, - }, - Insert { - uuid: Uuid, - name: String, - ret: oneshot::Sender>, - }, - SnapshotRequest { - path: PathBuf, - ret: oneshot::Sender>>, - }, - GetSize { - ret: oneshot::Sender>, - }, - DumpRequest { - path: PathBuf, - ret: oneshot::Sender>>, - }, -} diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs index 8ca3efdc6..cc0308f9e 100644 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_resolver/mod.rs @@ -14,15 +14,12 @@ use milli::heed::Env; use milli::update::{DocumentDeletionResult, IndexerConfig}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::oneshot; use tokio::task::spawn_blocking; use uuid::Uuid; use crate::index::{error::Result as IndexResult, Index}; use crate::options::IndexerOpts; -use crate::tasks::batch::Batch; -use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult}; -use crate::tasks::TaskPerformer; +use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult}; use crate::update_file_store::UpdateFileStore; use self::meta_store::IndexMeta; @@ -91,69 +88,10 @@ impl TryInto for String { } } -#[async_trait::async_trait] -impl TaskPerformer for IndexResolver -where - U: IndexMetaStore + Send + Sync + 'static, - I: IndexStore + Send + Sync + 'static, -{ - async fn process_batch(&self, mut batch: Batch) -> Batch { - // If a batch contains multiple tasks, then it must be a document addition batch - if let Some(Task { - content: TaskContent::DocumentAddition { .. }, - .. - }) = batch.tasks.first() - { - debug_assert!(batch.tasks.iter().all(|t| matches!( - t, - Task { - content: TaskContent::DocumentAddition { .. }, - .. 
- } - ))); - - self.process_document_addition_batch(batch).await - } else { - if let Some(task) = batch.tasks.first_mut() { - task.events - .push(TaskEvent::Processing(OffsetDateTime::now_utc())); - - match self.process_task(task).await { - Ok(success) => { - task.events.push(TaskEvent::Succeded { - result: success, - timestamp: OffsetDateTime::now_utc(), - }); - } - Err(err) => task.events.push(TaskEvent::Failed { - error: err.into(), - timestamp: OffsetDateTime::now_utc(), - }), - } - } - batch - } - } - - async fn process_job(&self, job: Job) { - self.process_job(job).await; - } - - async fn finish(&self, batch: &Batch) { - for task in &batch.tasks { - if let Some(content_uuid) = task.get_content_uuid() { - if let Err(e) = self.file_store.delete(content_uuid).await { - log::error!("error deleting update file: {}", e); - } - } - } - } -} - pub struct IndexResolver { index_uuid_store: U, index_store: I, - file_store: UpdateFileStore, + pub file_store: UpdateFileStore, } impl IndexResolver { @@ -189,7 +127,7 @@ where } } - async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch { + pub async fn process_document_addition_batch(&self, mut tasks: Vec) -> Vec { fn get_content_uuid(task: &Task) -> Uuid { match task { Task { @@ -200,11 +138,11 @@ where } } - let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::>(); + let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - match batch.tasks.first() { + match tasks.first() { Some(Task { - index_uid, + index_uid: Some(ref index_uid), id, content: TaskContent::DocumentAddition { @@ -231,13 +169,13 @@ where Ok(index) => index, Err(e) => { let error = ResponseError::from(e); - for task in batch.tasks.iter_mut() { + for task in tasks.iter_mut() { task.events.push(TaskEvent::Failed { error: error.clone(), timestamp: now, }); } - return batch; + return tasks; } }; @@ -269,23 +207,23 @@ where }, }; - for task in batch.tasks.iter_mut() { + for task in tasks.iter_mut() { task.events.push(event.clone()); } - batch + tasks } _ => panic!("invalid batch!"), } } - async fn process_task(&self, task: &Task) -> Result { + pub async fn process_task(&self, task: &Task) -> Result { let index_uid = task.index_uid.clone(); match &task.content { TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"), TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => { let ids = ids.clone(); - let index = self.get_index(index_uid.into_inner()).await?; + let index = self.get_index(index_uid.unwrap().into_inner()).await?; let DocumentDeletionResult { deleted_documents, .. @@ -294,7 +232,7 @@ where Ok(TaskResult::DocumentDeletion { deleted_documents }) } TaskContent::DocumentDeletion(DocumentDeletion::Clear) => { - let index = self.get_index(index_uid.into_inner()).await?; + let index = self.get_index(index_uid.unwrap().into_inner()).await?; let deleted_documents = spawn_blocking(move || -> IndexResult { let number_documents = index.stats()?.number_of_documents; index.clear_documents()?; @@ -310,9 +248,10 @@ where allow_index_creation, } => { let index = if *is_deletion || !*allow_index_creation { - self.get_index(index_uid.into_inner()).await? + self.get_index(index_uid.unwrap().into_inner()).await? } else { - self.get_or_create_index(index_uid, task.id).await? + self.get_or_create_index(index_uid.unwrap(), task.id) + .await? 
}; let settings = settings.clone(); @@ -321,7 +260,7 @@ where Ok(TaskResult::Other) } TaskContent::IndexDeletion => { - let index = self.delete_index(index_uid.into_inner()).await?; + let index = self.delete_index(index_uid.unwrap().into_inner()).await?; let deleted_documents = spawn_blocking(move || -> IndexResult { Ok(index.stats()?.number_of_documents) @@ -331,7 +270,7 @@ where Ok(TaskResult::ClearAll { deleted_documents }) } TaskContent::IndexCreation { primary_key } => { - let index = self.create_index(index_uid, task.id).await?; + let index = self.create_index(index_uid.unwrap(), task.id).await?; if let Some(primary_key) = primary_key { let primary_key = primary_key.clone(); @@ -341,7 +280,7 @@ where Ok(TaskResult::Other) } TaskContent::IndexUpdate { primary_key } => { - let index = self.get_index(index_uid.into_inner()).await?; + let index = self.get_index(index_uid.unwrap().into_inner()).await?; if let Some(primary_key) = primary_key { let primary_key = primary_key.clone(); @@ -350,28 +289,7 @@ where Ok(TaskResult::Other) } - } - } - - async fn process_job(&self, job: Job) { - match job { - Job::Dump { ret, path } => { - log::trace!("The Dump task is getting executed"); - - let (sender, receiver) = oneshot::channel(); - if ret.send(self.dump(path).await.map(|_| sender)).is_err() { - log::error!("The dump actor died."); - } - - // wait until the dump has finished performing. - let _ = receiver.await; - } - Job::Empty => log::error!("Tried to process an empty task."), - Job::Snapshot(job) => { - if let Err(e) = job.run().await { - log::error!("Error performing snapshot: {}", e); - } - } + _ => unreachable!("Invalid task for index resolver"), } } @@ -493,17 +411,23 @@ mod test { use nelson::Mocker; use proptest::prelude::*; - use crate::index::{ - error::{IndexError, Result as IndexResult}, - Checked, IndexMeta, IndexStats, Settings, + use crate::{ + index::{ + error::{IndexError, Result as IndexResult}, + Checked, IndexMeta, IndexStats, Settings, + }, + tasks::{batch::Batch, BatchHandler}, }; use index_store::MockIndexStore; use meta_store::MockIndexMetaStore; + // TODO: ignoring this test, it has become too complex to maintain, and rather implement + // handler logic test. proptest! { #[test] + #[ignore] fn test_process_task( - task in any::(), + task in any::().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()), index_exists in any::(), index_op_fails in any::(), any_int in any::(), @@ -579,6 +503,7 @@ mod test { .then(move |_| result()); } } + TaskContent::Dump { .. } => { } } mocker.when::<(), IndexResult>("stats") @@ -607,6 +532,7 @@ mod test { } // if index already exists, create index will return an error TaskContent::IndexCreation { .. } if index_exists => (), + TaskContent::Dump { .. 
} => (), // The index exists and get should be called _ if index_exists => { index_store @@ -641,24 +567,26 @@ mod test { let update_file_store = UpdateFileStore::mock(mocker); let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store); - let batch = Batch { id: 1, created_at: OffsetDateTime::now_utc(), tasks: vec![task.clone()] }; - let result = index_resolver.process_batch(batch).await; + let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) }; + if index_resolver.accept(&batch) { + let result = index_resolver.process_batch(batch).await; - // Test for some expected output scenarios: - // Index creation and deletion cannot fail because of a failed index op, since they - // don't perform index ops. - if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None }) - || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. })) - || (!index_exists && matches!(task.content, TaskContent::IndexDeletion - | TaskContent::DocumentDeletion(_) - | TaskContent::SettingsUpdate { is_deletion: true, ..} - | TaskContent::SettingsUpdate { allow_index_creation: false, ..} - | TaskContent::DocumentAddition { allow_index_creation: false, ..} - | TaskContent::IndexUpdate { .. } )) - { - assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); - } else { - assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result); + // Test for some expected output scenarios: + // Index creation and deletion cannot fail because of a failed index op, since they + // don't perform index ops. + if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. }) + || (index_exists && matches!(task.content, TaskContent::IndexCreation { .. })) + || (!index_exists && matches!(task.content, TaskContent::IndexDeletion + | TaskContent::DocumentDeletion(_) + | TaskContent::SettingsUpdate { is_deletion: true, ..} + | TaskContent::SettingsUpdate { allow_index_creation: false, ..} + | TaskContent::DocumentAddition { allow_index_creation: false, ..} + | TaskContent::IndexUpdate { .. } )) + { + assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result); + } else { + assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeded { .. 
}), "{:?}", result); + } } }); } diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 1161340ba..3d3d5e860 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -3,6 +3,7 @@ pub mod error; pub mod options; mod analytics; +mod dump; pub mod index; pub mod index_controller; mod index_resolver; diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs index 6c27ad2f0..527195729 100644 --- a/meilisearch-lib/src/snapshot.rs +++ b/meilisearch-lib/src/snapshot.rs @@ -14,7 +14,6 @@ use walkdir::WalkDir; use crate::compression::from_tar_gz; use crate::index_controller::open_meta_env; use crate::index_controller::versioning::VERSION_FILE_NAME; -use crate::tasks::task::Job; use crate::tasks::Scheduler; pub struct SnapshotService { @@ -39,8 +38,7 @@ impl SnapshotService { meta_env_size: self.meta_env_size, index_size: self.index_size, }; - let job = Job::Snapshot(snapshot_job); - self.scheduler.write().await.schedule_job(job).await; + self.scheduler.write().await.schedule_snapshot(snapshot_job); sleep(self.snapshot_period).await; } } diff --git a/meilisearch-lib/src/tasks/batch.rs b/meilisearch-lib/src/tasks/batch.rs index 4a8cf7907..d5116f750 100644 --- a/meilisearch-lib/src/tasks/batch.rs +++ b/meilisearch-lib/src/tasks/batch.rs @@ -1,22 +1,75 @@ use time::OffsetDateTime; -use super::task::Task; +use crate::snapshot::SnapshotJob; + +use super::task::{Task, TaskEvent}; pub type BatchId = u64; +#[derive(Debug)] +pub enum BatchContent { + DocumentsAdditionBatch(Vec), + IndexUpdate(Task), + Dump(Task), + Snapshot(SnapshotJob), + // Symbolizes a empty batch. This can occur when we were woken, but there wasn't any work to do. + Empty, +} + +impl BatchContent { + pub fn first(&self) -> Option<&Task> { + match self { + BatchContent::DocumentsAdditionBatch(ts) => ts.first(), + BatchContent::Dump(t) | BatchContent::IndexUpdate(t) => Some(t), + BatchContent::Snapshot(_) | BatchContent::Empty => None, + } + } + + pub fn push_event(&mut self, event: TaskEvent) { + match self { + BatchContent::DocumentsAdditionBatch(ts) => { + ts.iter_mut().for_each(|t| t.events.push(event.clone())) + } + BatchContent::IndexUpdate(t) | BatchContent::Dump(t) => t.events.push(event), + BatchContent::Snapshot(_) | BatchContent::Empty => (), + } + } +} + #[derive(Debug)] pub struct Batch { - pub id: BatchId, + // Only batches that contains a persistant tasks are given an id. Snapshot batches don't have + // an id. 
+ pub id: Option, pub created_at: OffsetDateTime, - pub tasks: Vec, + pub content: BatchContent, } impl Batch { + pub fn new(id: Option, content: BatchContent) -> Self { + Self { + id, + created_at: OffsetDateTime::now_utc(), + content, + } + } pub fn len(&self) -> usize { - self.tasks.len() + match self.content { + BatchContent::DocumentsAdditionBatch(ref ts) => ts.len(), + BatchContent::IndexUpdate(_) | BatchContent::Dump(_) | BatchContent::Snapshot(_) => 1, + BatchContent::Empty => 0, + } } pub fn is_empty(&self) -> bool { - self.tasks.is_empty() + self.len() == 0 + } + + pub fn empty() -> Self { + Self { + id: None, + created_at: OffsetDateTime::now_utc(), + content: BatchContent::Empty, + } } } diff --git a/meilisearch-lib/src/tasks/handlers/dump_handler.rs b/meilisearch-lib/src/tasks/handlers/dump_handler.rs new file mode 100644 index 000000000..715beafee --- /dev/null +++ b/meilisearch-lib/src/tasks/handlers/dump_handler.rs @@ -0,0 +1,132 @@ +use crate::dump::DumpHandler; +use crate::index_resolver::index_store::IndexStore; +use crate::index_resolver::meta_store::IndexMetaStore; +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::task::{Task, TaskContent, TaskEvent, TaskResult}; +use crate::tasks::BatchHandler; + +#[async_trait::async_trait] +impl BatchHandler for DumpHandler +where + U: IndexMetaStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ + fn accept(&self, batch: &Batch) -> bool { + matches!(batch.content, BatchContent::Dump { .. }) + } + + async fn process_batch(&self, mut batch: Batch) -> Batch { + match &batch.content { + BatchContent::Dump(Task { + content: TaskContent::Dump { uid }, + .. + }) => { + match self.run(uid.clone()).await { + Ok(_) => { + batch + .content + .push_event(TaskEvent::succeeded(TaskResult::Other)); + } + Err(e) => batch.content.push_event(TaskEvent::failed(e.into())), + } + batch + } + _ => unreachable!("invalid batch content for dump"), + } + } + + async fn finish(&self, _: &Batch) {} +} + +#[cfg(test)] +mod test { + use crate::dump::error::{DumpError, Result as DumpResult}; + use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore}; + use crate::tasks::handlers::test::task_to_batch; + + use super::*; + + use nelson::Mocker; + use proptest::prelude::*; + + proptest! { + #[test] + fn finish_does_nothing( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + + let mocker = Mocker::default(); + let dump_handler = DumpHandler::::mock(mocker); + + dump_handler.finish(&batch).await; + }); + + rt.block_on(handle).unwrap(); + } + + #[test] + fn test_handle_dump_success( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + let should_accept = matches!(batch.content, BatchContent::Dump { .. }); + + let mocker = Mocker::default(); + if should_accept { + mocker.when::>("run") + .once() + .then(|_| Ok(())); + } + + let dump_handler = DumpHandler::::mock(mocker); + + let accept = dump_handler.accept(&batch); + assert_eq!(accept, should_accept); + + if accept { + let batch = dump_handler.process_batch(batch).await; + let last_event = batch.content.first().unwrap().events.last().unwrap(); + assert!(matches!(last_event, TaskEvent::Succeded { .. 
})); + } + }); + + rt.block_on(handle).unwrap(); + } + + #[test] + fn test_handle_dump_error( + task in any::(), + ) { + let rt = tokio::runtime::Runtime::new().unwrap(); + let handle = rt.spawn(async { + let batch = task_to_batch(task); + let should_accept = matches!(batch.content, BatchContent::Dump { .. }); + + let mocker = Mocker::default(); + if should_accept { + mocker.when::>("run") + .once() + .then(|_| Err(DumpError::Internal("error".into()))); + } + + let dump_handler = DumpHandler::::mock(mocker); + + let accept = dump_handler.accept(&batch); + assert_eq!(accept, should_accept); + + if accept { + let batch = dump_handler.process_batch(batch).await; + let last_event = batch.content.first().unwrap().events.last().unwrap(); + assert!(matches!(last_event, TaskEvent::Failed { .. })); + } + }); + + rt.block_on(handle).unwrap(); + } + } +} diff --git a/meilisearch-lib/src/tasks/handlers/empty_handler.rs b/meilisearch-lib/src/tasks/handlers/empty_handler.rs new file mode 100644 index 000000000..d800e1965 --- /dev/null +++ b/meilisearch-lib/src/tasks/handlers/empty_handler.rs @@ -0,0 +1,18 @@ +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::BatchHandler; + +/// A sink handler for empty tasks. +pub struct EmptyBatchHandler; + +#[async_trait::async_trait] +impl BatchHandler for EmptyBatchHandler { + fn accept(&self, batch: &Batch) -> bool { + matches!(batch.content, BatchContent::Empty) + } + + async fn process_batch(&self, batch: Batch) -> Batch { + batch + } + + async fn finish(&self, _: &Batch) {} +} diff --git a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs new file mode 100644 index 000000000..e0471567b --- /dev/null +++ b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs @@ -0,0 +1,146 @@ +use crate::index_resolver::IndexResolver; +use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::task::TaskEvent; +use crate::tasks::BatchHandler; + +#[async_trait::async_trait] +impl BatchHandler for IndexResolver +where + U: IndexMetaStore + Send + Sync + 'static, + I: IndexStore + Send + Sync + 'static, +{ + fn accept(&self, batch: &Batch) -> bool { + matches!( + batch.content, + BatchContent::DocumentsAdditionBatch(_) | BatchContent::IndexUpdate(_) + ) + } + + async fn process_batch(&self, mut batch: Batch) -> Batch { + match batch.content { + BatchContent::DocumentsAdditionBatch(ref mut tasks) => { + *tasks = self + .process_document_addition_batch(std::mem::take(tasks)) + .await; + } + BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await { + Ok(success) => task.events.push(TaskEvent::succeeded(success)), + Err(err) => task.events.push(TaskEvent::failed(err.into())), + }, + _ => unreachable!(), + } + + batch + } + + async fn finish(&self, batch: &Batch) { + if let BatchContent::DocumentsAdditionBatch(ref tasks) = batch.content { + for task in tasks { + if let Some(content_uuid) = task.get_content_uuid() { + if let Err(e) = self.file_store.delete(content_uuid).await { + log::error!("error deleting update file: {}", e); + } + } + } + } + } +} + +#[cfg(test)] +mod test { + use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore}; + use crate::tasks::{ + handlers::test::task_to_batch, + task::{Task, TaskContent}, + }; + use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore}; + + use super::*; + use 
milli::update::IndexDocumentsMethod; + use nelson::Mocker; + use proptest::prelude::*; + use uuid::Uuid; + + proptest! { + #[test] + fn test_accept_task( + task in any::(), + ) { + let batch = task_to_batch(task); + + let index_store = MockIndexStore::new(); + let meta_store = MockIndexMetaStore::new(); + let mocker = Mocker::default(); + let update_file_store = UpdateFileStore::mock(mocker); + let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); + + match batch.content { + BatchContent::DocumentsAdditionBatch(_) + | BatchContent::IndexUpdate(_) => assert!(index_resolver.accept(&batch)), + BatchContent::Dump(_) + | BatchContent::Snapshot(_) + | BatchContent::Empty => assert!(!index_resolver.accept(&batch)), + } + } + } + + #[actix_rt::test] + async fn finisher_called_on_document_update() { + let index_store = MockIndexStore::new(); + let meta_store = MockIndexMetaStore::new(); + let mocker = Mocker::default(); + let content_uuid = Uuid::new_v4(); + mocker + .when::>("delete") + .once() + .then(move |uuid| { + assert_eq!(uuid, content_uuid); + Ok(()) + }); + let update_file_store = UpdateFileStore::mock(mocker); + let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); + + let task = Task { + id: 1, + index_uid: None, + content: TaskContent::DocumentAddition { + content_uuid, + merge_strategy: IndexDocumentsMethod::ReplaceDocuments, + primary_key: None, + documents_count: 100, + allow_index_creation: true, + }, + events: Vec::new(), + }; + + let batch = task_to_batch(task); + + index_resolver.finish(&batch).await; + } + + #[actix_rt::test] + #[should_panic] + async fn panic_when_passed_unsupported_batch() { + let index_store = MockIndexStore::new(); + let meta_store = MockIndexMetaStore::new(); + let mocker = Mocker::default(); + let update_file_store = UpdateFileStore::mock(mocker); + let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); + + let task = Task { + id: 1, + index_uid: None, + content: TaskContent::Dump { + uid: String::from("hello"), + }, + events: Vec::new(), + }; + + let batch = task_to_batch(task); + + index_resolver.process_batch(batch).await; + } + + // TODO: test perform_batch. We need a Mocker for IndexResolver. +} diff --git a/meilisearch-lib/src/tasks/handlers/mod.rs b/meilisearch-lib/src/tasks/handlers/mod.rs new file mode 100644 index 000000000..6e28636ed --- /dev/null +++ b/meilisearch-lib/src/tasks/handlers/mod.rs @@ -0,0 +1,34 @@ +pub mod dump_handler; +pub mod empty_handler; +mod index_resolver_handler; +pub mod snapshot_handler; + +#[cfg(test)] +mod test { + use time::OffsetDateTime; + + use crate::tasks::{ + batch::{Batch, BatchContent}, + task::{Task, TaskContent}, + }; + + pub fn task_to_batch(task: Task) -> Batch { + let content = match task.content { + TaskContent::DocumentAddition { .. } => { + BatchContent::DocumentsAdditionBatch(vec![task]) + } + TaskContent::DocumentDeletion(_) + | TaskContent::SettingsUpdate { .. } + | TaskContent::IndexDeletion + | TaskContent::IndexCreation { .. } + | TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task), + TaskContent::Dump { .. 
} => BatchContent::Dump(task), + }; + + Batch { + id: Some(1), + created_at: OffsetDateTime::now_utc(), + content, + } + } +} diff --git a/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs b/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs new file mode 100644 index 000000000..32fe6d746 --- /dev/null +++ b/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs @@ -0,0 +1,26 @@ +use crate::tasks::batch::{Batch, BatchContent}; +use crate::tasks::BatchHandler; + +pub struct SnapshotHandler; + +#[async_trait::async_trait] +impl BatchHandler for SnapshotHandler { + fn accept(&self, batch: &Batch) -> bool { + matches!(batch.content, BatchContent::Snapshot(_)) + } + + async fn process_batch(&self, batch: Batch) -> Batch { + match batch.content { + BatchContent::Snapshot(job) => { + if let Err(e) = job.run().await { + log::error!("snapshot error: {e}"); + } + } + _ => unreachable!(), + } + + Batch::empty() + } + + async fn finish(&self, _: &Batch) {} +} diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs index b56dfaf9d..d8bc25bb7 100644 --- a/meilisearch-lib/src/tasks/mod.rs +++ b/meilisearch-lib/src/tasks/mod.rs @@ -1,5 +1,7 @@ use async_trait::async_trait; +pub use handlers::empty_handler::EmptyBatchHandler; +pub use handlers::snapshot_handler::SnapshotHandler; pub use scheduler::Scheduler; pub use task_store::TaskFilter; @@ -11,10 +13,9 @@ pub use task_store::TaskStore; use batch::Batch; use error::Result; -use self::task::Job; - pub mod batch; pub mod error; +mod handlers; mod scheduler; pub mod task; mod task_store; @@ -22,11 +23,15 @@ pub mod update_loop; #[cfg_attr(test, mockall::automock(type Error=test::DebugError;))] #[async_trait] -pub trait TaskPerformer: Sync + Send + 'static { - /// Processes the `Task` batch returning the batch with the `Task` updated. - async fn process_batch(&self, batch: Batch) -> Batch; +pub trait BatchHandler: Sync + Send + 'static { + /// return whether this handler can accept this batch + fn accept(&self, batch: &Batch) -> bool; - async fn process_job(&self, job: Job); + /// Processes the `Task` batch returning the batch with the `Task` updated. + /// + /// It is ok for this function to panic if a batch is handed that hasn't been verified by + /// `accept` beforehand. + async fn process_batch(&self, batch: Batch) -> Batch; /// `finish` is called when the result of `process` has been commited to the task store. This /// method can be used to perform cleanup after the update has been completed for example. 
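Note: to make the new `BatchHandler` contract above concrete, here is a minimal, synchronous sketch of the accept/dispatch pattern it introduces. This is an illustration only: the real trait is async (via `async_trait`), also exposes a `finish` hook, and the handler and type names below are simplified stand-ins rather than the crate's actual items.

```rust
// Minimal, synchronous sketch of the new accept/dispatch pattern.
// All names here are illustrative stand-ins, not the crate's real items.

#[derive(Debug)]
#[allow(dead_code)]
enum BatchContent {
    IndexUpdate(String), // just the index uid in this sketch
    Dump { uid: String },
    Empty,
}

#[derive(Debug)]
struct Batch {
    id: Option<u64>,
    content: BatchContent,
}

// Sketch of the trait: each handler declares which batches it accepts,
// and only accepted batches are handed to `process_batch`.
trait BatchHandler {
    fn accept(&self, batch: &Batch) -> bool;
    fn process_batch(&self, batch: Batch) -> Batch;
}

struct DumpHandlerSketch;

impl BatchHandler for DumpHandlerSketch {
    fn accept(&self, batch: &Batch) -> bool {
        matches!(batch.content, BatchContent::Dump { .. })
    }

    fn process_batch(&self, batch: Batch) -> Batch {
        if let BatchContent::Dump { ref uid } = batch.content {
            println!("running dump `{uid}` for batch {:?}", batch.id);
        }
        batch
    }
}

struct EmptyHandlerSketch;

impl BatchHandler for EmptyHandlerSketch {
    fn accept(&self, batch: &Batch) -> bool {
        matches!(batch.content, BatchContent::Empty)
    }

    fn process_batch(&self, batch: Batch) -> Batch {
        batch // nothing to do for an empty batch
    }
}

fn main() {
    // The update loop keeps a list of handlers and forwards each prepared
    // batch to the first handler that accepts it.
    let handlers: Vec<Box<dyn BatchHandler>> =
        vec![Box::new(DumpHandlerSketch), Box::new(EmptyHandlerSketch)];

    let batch = Batch {
        id: Some(1),
        content: BatchContent::Dump {
            uid: "dump-1".into(),
        },
    };

    let handler = handlers
        .iter()
        .find(|h| h.accept(&batch))
        .expect("no handler accepted the batch");

    let processed = handler.process_batch(batch);
    println!("processed batch: {processed:?}");
}
```

The point of the design change is that the update loop now holds a list of handlers and routes each prepared batch to the first one whose `accept` returns true, instead of funnelling everything through a single `TaskPerformer`.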
diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs index 0e540a646..19265a911 100644 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ b/meilisearch-lib/src/tasks/scheduler.rs @@ -1,7 +1,7 @@ use std::cmp::Ordering; use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; use std::ops::{Deref, DerefMut}; -use std::path::Path; +use std::slice; use std::sync::Arc; use std::time::Duration; @@ -11,19 +11,20 @@ use time::OffsetDateTime; use tokio::sync::{watch, RwLock}; use crate::options::SchedulerConfig; -use crate::update_file_store::UpdateFileStore; +use crate::snapshot::SnapshotJob; -use super::batch::Batch; +use super::batch::{Batch, BatchContent}; use super::error::Result; -use super::task::{Job, Task, TaskContent, TaskEvent, TaskId}; +use super::task::{Task, TaskContent, TaskEvent, TaskId}; use super::update_loop::UpdateLoop; -use super::{TaskFilter, TaskPerformer, TaskStore}; +use super::{BatchHandler, TaskFilter, TaskStore}; #[derive(Eq, Debug, Clone, Copy)] enum TaskType { DocumentAddition { number: usize }, DocumentUpdate { number: usize }, - Other, + IndexUpdate, + Dump, } /// Two tasks are equal if they have the same type. @@ -63,7 +64,7 @@ impl Ord for PendingTask { #[derive(Debug)] struct TaskList { - index: String, + id: TaskListIdentifier, tasks: BinaryHeap, } @@ -82,9 +83,9 @@ impl DerefMut for TaskList { } impl TaskList { - fn new(index: String) -> Self { + fn new(id: TaskListIdentifier) -> Self { Self { - index, + id, tasks: Default::default(), } } @@ -92,7 +93,7 @@ impl TaskList { impl PartialEq for TaskList { fn eq(&self, other: &Self) -> bool { - self.index == other.index + self.id == other.id } } @@ -100,11 +101,20 @@ impl Eq for TaskList {} impl Ord for TaskList { fn cmp(&self, other: &Self) -> Ordering { - match (self.peek(), other.peek()) { - (None, None) => Ordering::Equal, - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + match (&self.id, &other.id) { + (TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => { + match (self.peek(), other.peek()) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(rhs), + } + } + (TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Less, + (TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Greater, + (TaskListIdentifier::Dump, TaskListIdentifier::Dump) => { + unreachable!("There should be only one Dump task list") + } } } } @@ -115,18 +125,28 @@ impl PartialOrd for TaskList { } } +#[derive(PartialEq, Eq, Hash, Debug, Clone)] +enum TaskListIdentifier { + Index(String), + Dump, +} + #[derive(Default)] struct TaskQueue { /// Maps index uids to their TaskList, for quick access - index_tasks: HashMap>>, + index_tasks: HashMap>>, /// A queue that orders TaskList by the priority of their fist update queue: BinaryHeap>>, } impl TaskQueue { fn insert(&mut self, task: Task) { - let uid = task.index_uid.into_inner(); let id = task.id; + let uid = match task.index_uid { + Some(uid) => TaskListIdentifier::Index(uid.into_inner()), + None if matches!(task.content, TaskContent::Dump { .. 
}) => TaskListIdentifier::Dump, + None => unreachable!("invalid task state"), + }; let kind = match task.content { TaskContent::DocumentAddition { documents_count, @@ -142,7 +162,13 @@ impl TaskQueue { } => TaskType::DocumentUpdate { number: documents_count, }, - _ => TaskType::Other, + TaskContent::Dump { .. } => TaskType::Dump, + TaskContent::DocumentDeletion(_) + | TaskContent::SettingsUpdate { .. } + | TaskContent::IndexDeletion + | TaskContent::IndexCreation { .. } + | TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate, + _ => unreachable!("unhandled task type"), }; let task = PendingTask { kind, id }; @@ -160,7 +186,7 @@ impl TaskQueue { list.push(task); } Entry::Vacant(entry) => { - let mut task_list = TaskList::new(entry.key().to_owned()); + let mut task_list = TaskList::new(entry.key().clone()); task_list.push(task); let task_list = Arc::new(AtomicRefCell::new(task_list)); entry.insert(task_list.clone()); @@ -181,7 +207,7 @@ impl TaskQueue { // After being mutated, the head is reinserted to the correct position. self.queue.push(head); } else { - self.index_tasks.remove(&head.borrow().index); + self.index_tasks.remove(&head.borrow().id); } Some(result) @@ -193,11 +219,12 @@ impl TaskQueue { } pub struct Scheduler { - jobs: VecDeque, + // TODO: currently snapshots are non persistent tasks, and are treated differently. + snapshots: VecDeque, tasks: TaskQueue, store: TaskStore, - processing: Vec, + processing: Processing, next_fetched_task_id: TaskId, config: SchedulerConfig, /// Notifies the update loop that a new task was received @@ -205,14 +232,11 @@ pub struct Scheduler { } impl Scheduler { - pub fn new
<P>( + pub fn new( store: TaskStore, - performer: Arc<P>
, + performers: Vec>, mut config: SchedulerConfig, - ) -> Result>> - where - P: TaskPerformer, - { + ) -> Result>> { let (notifier, rcv) = watch::channel(()); let debounce_time = config.debounce_duration_sec; @@ -223,11 +247,11 @@ impl Scheduler { } let this = Self { - jobs: VecDeque::new(), + snapshots: VecDeque::new(), tasks: TaskQueue::default(), store, - processing: Vec::new(), + processing: Processing::Nothing, next_fetched_task_id: 0, config, notifier, @@ -240,7 +264,7 @@ impl Scheduler { let update_loop = UpdateLoop::new( this.clone(), - performer, + performers, debounce_time.filter(|&v| v > 0).map(Duration::from_secs), rcv, ); @@ -250,10 +274,6 @@ impl Scheduler { Ok(this) } - pub async fn dump(&self, path: &Path, file_store: UpdateFileStore) -> Result<()> { - self.store.dump(path, file_store).await - } - fn register_task(&mut self, task: Task) { assert!(!task.is_finished()); self.tasks.insert(task); @@ -261,7 +281,7 @@ impl Scheduler { /// Clears the processing list, this method should be called when the processing of a batch is finished. pub fn finish(&mut self) { - self.processing.clear(); + self.processing = Processing::Nothing; } pub fn notify(&self) { @@ -269,13 +289,27 @@ impl Scheduler { } fn notify_if_not_empty(&self) { - if !self.jobs.is_empty() || !self.tasks.is_empty() { + if !self.snapshots.is_empty() || !self.tasks.is_empty() { self.notify(); } } - pub async fn update_tasks(&self, tasks: Vec) -> Result> { - self.store.update_tasks(tasks).await + pub async fn update_tasks(&self, content: BatchContent) -> Result { + match content { + BatchContent::DocumentsAdditionBatch(tasks) => { + let tasks = self.store.update_tasks(tasks).await?; + Ok(BatchContent::DocumentsAdditionBatch(tasks)) + } + BatchContent::IndexUpdate(t) => { + let mut tasks = self.store.update_tasks(vec![t]).await?; + Ok(BatchContent::IndexUpdate(tasks.remove(0))) + } + BatchContent::Dump(t) => { + let mut tasks = self.store.update_tasks(vec![t]).await?; + Ok(BatchContent::Dump(tasks.remove(0))) + } + other => Ok(other), + } } pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { @@ -294,16 +328,16 @@ impl Scheduler { pub async fn get_processing_tasks(&self) -> Result> { let mut tasks = Vec::new(); - for id in self.processing.iter() { - let task = self.store.get_task(*id, None).await?; + for id in self.processing.ids() { + let task = self.store.get_task(id, None).await?; tasks.push(task); } Ok(tasks) } - pub async fn schedule_job(&mut self, job: Job) { - self.jobs.push_back(job); + pub fn schedule_snapshot(&mut self, job: SnapshotJob) { + self.snapshots.push_back(job); self.notify(); } @@ -329,106 +363,168 @@ impl Scheduler { } /// Prepare the next batch, and set `processing` to the ids in that batch. - pub async fn prepare(&mut self) -> Result { + pub async fn prepare(&mut self) -> Result { // If there is a job to process, do it first. - if let Some(job) = self.jobs.pop_front() { + if let Some(job) = self.snapshots.pop_front() { // There is more work to do, notify the update loop self.notify_if_not_empty(); - return Ok(Pending::Job(job)); + let batch = Batch::new(None, BatchContent::Snapshot(job)); + return Ok(batch); } + // Try to fill the queue with pending tasks. 
self.fetch_pending_tasks().await?; - make_batch(&mut self.tasks, &mut self.processing, &self.config); + self.processing = make_batch(&mut self.tasks, &self.config); log::debug!("prepared batch with {} tasks", self.processing.len()); - if !self.processing.is_empty() { - let ids = std::mem::take(&mut self.processing); + if !self.processing.is_nothing() { + let (processing, mut content) = self + .store + .get_processing_tasks(std::mem::take(&mut self.processing)) + .await?; - let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?; - - // The batch id is the id of the first update it contains - let id = match tasks.first() { + // The batch id is the id of the first update it contains. At this point we must have a + // valid batch that contains at least 1 task. + let id = match content.first() { Some(Task { id, .. }) => *id, _ => panic!("invalid batch"), }; - tasks.iter_mut().for_each(|t| { - t.events.push(TaskEvent::Batched { - batch_id: id, - timestamp: OffsetDateTime::now_utc(), - }) + content.push_event(TaskEvent::Batched { + batch_id: id, + timestamp: OffsetDateTime::now_utc(), }); - self.processing = ids; + self.processing = processing; - let batch = Batch { - id, - created_at: OffsetDateTime::now_utc(), - tasks, - }; + let batch = Batch::new(Some(id), content); // There is more work to do, notify the update loop self.notify_if_not_empty(); - Ok(Pending::Batch(batch)) + Ok(batch) } else { - Ok(Pending::Nothing) + Ok(Batch::empty()) } } } -#[derive(Debug)] -pub enum Pending { - Batch(Batch), - Job(Job), +#[derive(Debug, PartialEq)] +pub enum Processing { + DocumentAdditions(Vec), + IndexUpdate(TaskId), + Dump(TaskId), + /// Variant used when there is nothing to process. Nothing, } -fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec, config: &SchedulerConfig) { - processing.clear(); +impl Default for Processing { + fn default() -> Self { + Self::Nothing + } +} - let mut doc_count = 0; - tasks.head_mut(|list| match list.peek().copied() { - Some(PendingTask { - kind: TaskType::Other, - id, - }) => { - processing.push(id); - list.pop(); +enum ProcessingIter<'a> { + Many(slice::Iter<'a, TaskId>), + Single(Option), +} + +impl<'a> Iterator for ProcessingIter<'a> { + type Item = TaskId; + + fn next(&mut self) -> Option { + match self { + ProcessingIter::Many(iter) => iter.next().copied(), + ProcessingIter::Single(val) => val.take(), } - Some(PendingTask { kind, .. }) => loop { - match list.peek() { - Some(pending) if pending.kind == kind => { - // We always need to process at least one task for the scheduler to make progress. - if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) { - break; - } - let pending = list.pop().unwrap(); - processing.push(pending.id); + } +} - // We add the number of documents to the count if we are scheduling document additions and - // stop adding if we already have enough. - // - // We check that bound only after adding the current task to the batch, so that a batch contains at least one task. 
- match pending.kind { - TaskType::DocumentUpdate { number } - | TaskType::DocumentAddition { number } => { - doc_count += number; +impl Processing { + fn is_nothing(&self) -> bool { + matches!(self, Processing::Nothing) + } - if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) { + pub fn ids(&self) -> impl Iterator + '_ { + match self { + Processing::DocumentAdditions(v) => ProcessingIter::Many(v.iter()), + Processing::IndexUpdate(id) | Processing::Dump(id) => ProcessingIter::Single(Some(*id)), + Processing::Nothing => ProcessingIter::Single(None), + } + } + + pub fn len(&self) -> usize { + match self { + Processing::DocumentAdditions(v) => v.len(), + Processing::IndexUpdate(_) | Processing::Dump(_) => 1, + Processing::Nothing => 0, + } + } + + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing { + let mut doc_count = 0; + tasks + .head_mut(|list| match list.peek().copied() { + Some(PendingTask { + kind: TaskType::IndexUpdate, + id, + }) => { + list.pop(); + Processing::IndexUpdate(id) + } + Some(PendingTask { + kind: TaskType::Dump, + id, + }) => { + list.pop(); + Processing::Dump(id) + } + Some(PendingTask { kind, .. }) => { + let mut task_list = Vec::new(); + loop { + match list.peek() { + Some(pending) if pending.kind == kind => { + // We always need to process at least one task for the scheduler to make progress. + if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) + { break; } + let pending = list.pop().unwrap(); + task_list.push(pending.id); + + // We add the number of documents to the count if we are scheduling document additions and + // stop adding if we already have enough. + // + // We check that bound only after adding the current task to the batch, so that a batch contains at least one task. 
+ match pending.kind { + TaskType::DocumentUpdate { number } + | TaskType::DocumentAddition { number } => { + doc_count += number; + + if doc_count + >= config.max_documents_per_batch.unwrap_or(usize::MAX) + { + break; + } + } + _ => (), + } } - _ => (), + _ => break, } } - _ => break, + Processing::DocumentAdditions(task_list) } - }, - None => (), - }); + None => Processing::Nothing, + }) + .unwrap_or(Processing::Nothing) } #[cfg(test)] @@ -440,10 +536,10 @@ mod test { use super::*; - fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task { + fn gen_task(id: TaskId, index_uid: Option<&str>, content: TaskContent) -> Task { Task { id, - index_uid: IndexUid::new_unchecked(index_uid), + index_uid: index_uid.map(IndexUid::new_unchecked), content, events: vec![], } @@ -452,13 +548,13 @@ mod test { #[test] fn register_updates_multiples_indexes() { let mut queue = TaskQueue::default(); - queue.insert(gen_task(0, "test1", TaskContent::IndexDeletion)); - queue.insert(gen_task(1, "test2", TaskContent::IndexDeletion)); - queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion)); - queue.insert(gen_task(3, "test2", TaskContent::IndexDeletion)); - queue.insert(gen_task(4, "test1", TaskContent::IndexDeletion)); - queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion)); - queue.insert(gen_task(6, "test2", TaskContent::IndexDeletion)); + queue.insert(gen_task(0, Some("test1"), TaskContent::IndexDeletion)); + queue.insert(gen_task(1, Some("test2"), TaskContent::IndexDeletion)); + queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion)); + queue.insert(gen_task(3, Some("test2"), TaskContent::IndexDeletion)); + queue.insert(gen_task(4, Some("test1"), TaskContent::IndexDeletion)); + queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion)); + queue.insert(gen_task(6, Some("test2"), TaskContent::IndexDeletion)); let test1_tasks = queue .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) @@ -486,40 +582,45 @@ mod test { documents_count: 0, allow_index_creation: true, }; - queue.insert(gen_task(0, "test1", content.clone())); - queue.insert(gen_task(1, "test2", content.clone())); - queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion)); - queue.insert(gen_task(3, "test2", content.clone())); - queue.insert(gen_task(4, "test1", content.clone())); - queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion)); - queue.insert(gen_task(6, "test2", content.clone())); - queue.insert(gen_task(7, "test1", content)); - - let mut batch = Vec::new(); + queue.insert(gen_task(0, Some("test1"), content.clone())); + queue.insert(gen_task(1, Some("test2"), content.clone())); + queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion)); + queue.insert(gen_task(3, Some("test2"), content.clone())); + queue.insert(gen_task(4, Some("test1"), content.clone())); + queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion)); + queue.insert(gen_task(6, Some("test2"), content.clone())); + queue.insert(gen_task(7, Some("test1"), content)); + queue.insert(gen_task( + 8, + None, + TaskContent::Dump { + uid: "adump".to_owned(), + }, + )); let config = SchedulerConfig::default(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[0, 4]); - batch.clear(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[1]); + // Make sure that the dump is processed before everybody else. 
+ let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::Dump(8)); - batch.clear(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[2]); + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::DocumentAdditions(vec![0, 4])); - batch.clear(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[3, 6]); + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::DocumentAdditions(vec![1])); - batch.clear(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[5]); + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::IndexUpdate(2)); - batch.clear(); - make_batch(&mut queue, &mut batch, &config); - assert_eq!(batch, &[7]); + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::DocumentAdditions(vec![3, 6])); + + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::IndexUpdate(5)); + + let batch = make_batch(&mut queue, &config); + assert_eq!(batch, Processing::DocumentAdditions(vec![7])); assert!(queue.is_empty()); } diff --git a/meilisearch-lib/src/tasks/task.rs b/meilisearch-lib/src/tasks/task.rs index ecbd4ca62..0499d9702 100644 --- a/meilisearch-lib/src/tasks/task.rs +++ b/meilisearch-lib/src/tasks/task.rs @@ -1,17 +1,13 @@ -use std::path::PathBuf; - use meilisearch_error::ResponseError; use milli::update::{DocumentAdditionResult, IndexDocumentsMethod}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::oneshot; use uuid::Uuid; use super::batch::BatchId; use crate::{ index::{Settings, Unchecked}, - index_resolver::{error::IndexResolverError, IndexUid}, - snapshot::SnapshotJob, + index_resolver::IndexUid, }; pub type TaskId = u64; @@ -66,6 +62,22 @@ pub enum TaskEvent { }, } +impl TaskEvent { + pub fn succeeded(result: TaskResult) -> Self { + Self::Succeded { + result, + timestamp: OffsetDateTime::now_utc(), + } + } + + pub fn failed(error: ResponseError) -> Self { + Self::Failed { + error, + timestamp: OffsetDateTime::now_utc(), + } + } +} + /// A task represents an operation that Meilisearch must do. /// It's stored on disk and executed from the lowest to highest Task id. /// Everytime a new task is created it has a higher Task id than the previous one. @@ -74,7 +86,17 @@ pub enum TaskEvent { #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub struct Task { pub id: TaskId, - pub index_uid: IndexUid, + /// The name of the index the task is targeting. If it isn't targeting any index (i.e Dump task) + /// then this is None + // TODO: when next forward breaking dumps, it would be a good idea to move this field inside of + // the TaskContent. + #[cfg_attr( + test, + proptest( + strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())" + ) + )] + pub index_uid: Option, pub content: TaskContent, pub events: Vec, } @@ -100,33 +122,6 @@ impl Task { } } -/// A job is like a volatile priority `Task`. -/// It should be processed as fast as possible and is not stored on disk. -/// This means, when Meilisearch is closed all your unprocessed jobs will disappear. 
-#[derive(Debug, derivative::Derivative)] -#[derivative(PartialEq)] -pub enum Job { - Dump { - #[derivative(PartialEq = "ignore")] - ret: oneshot::Sender, IndexResolverError>>, - path: PathBuf, - }, - Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob), - Empty, -} - -impl Default for Job { - fn default() -> Self { - Self::Empty - } -} - -impl Job { - pub fn take(&mut self) -> Self { - std::mem::take(self) - } -} - #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[cfg_attr(test, derive(proptest_derive::Arbitrary))] pub enum DocumentDeletion { @@ -161,6 +156,9 @@ pub enum TaskContent { IndexUpdate { primary_key: Option, }, + Dump { + uid: String, + }, } #[cfg(test)] diff --git a/meilisearch-lib/src/tasks/task_store/mod.rs b/meilisearch-lib/src/tasks/task_store/mod.rs index bdcd13f37..3645717e6 100644 --- a/meilisearch-lib/src/tasks/task_store/mod.rs +++ b/meilisearch-lib/src/tasks/task_store/mod.rs @@ -9,7 +9,9 @@ use log::debug; use milli::heed::{Env, RwTxn}; use time::OffsetDateTime; +use super::batch::BatchContent; use super::error::TaskError; +use super::scheduler::Processing; use super::task::{Task, TaskContent, TaskId}; use super::Result; use crate::index_resolver::IndexUid; @@ -30,10 +32,13 @@ pub struct TaskFilter { impl TaskFilter { fn pass(&self, task: &Task) -> bool { - self.indexes - .as_ref() - .map(|indexes| indexes.contains(&*task.index_uid)) - .unwrap_or(true) + match task.index_uid { + Some(ref index_uid) => self + .indexes + .as_ref() + .map_or(true, |indexes| indexes.contains(index_uid.as_str())), + None => false, + } } /// Adds an index to the filter, so the filter must match this index. @@ -66,7 +71,11 @@ impl TaskStore { Ok(Self { store }) } - pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result { + pub async fn register( + &self, + index_uid: Option, + content: TaskContent, + ) -> Result { debug!("registering update: {:?}", content); let store = self.store.clone(); let task = tokio::task::spawn_blocking(move || -> Result { @@ -114,19 +123,44 @@ impl TaskStore { } } - pub async fn get_pending_tasks(&self, ids: Vec) -> Result<(Vec, Vec)> { + /// This methods takes a `Processing` which contains the next task ids to process, and returns + /// the coresponding tasks along with the ownership to the passed processing. + /// + /// We need get_processing_tasks to take ownership over `Processing` because we need it to be + /// valid for 'static. + pub async fn get_processing_tasks( + &self, + processing: Processing, + ) -> Result<(Processing, BatchContent)> { let store = self.store.clone(); let tasks = tokio::task::spawn_blocking(move || -> Result<_> { - let mut tasks = Vec::new(); let txn = store.rtxn()?; - for id in ids.iter() { - let task = store - .get(&txn, *id)? - .ok_or(TaskError::UnexistingTask(*id))?; - tasks.push(task); - } - Ok((ids, tasks)) + let content = match processing { + Processing::DocumentAdditions(ref ids) => { + let mut tasks = Vec::new(); + + for id in ids.iter() { + let task = store + .get(&txn, *id)? + .ok_or(TaskError::UnexistingTask(*id))?; + tasks.push(task); + } + BatchContent::DocumentsAdditionBatch(tasks) + } + Processing::IndexUpdate(id) => { + let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; + BatchContent::IndexUpdate(task) + } + Processing::Dump(id) => { + let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?; + debug_assert!(matches!(task.content, TaskContent::Dump { .. 
})); + BatchContent::Dump(task) + } + Processing::Nothing => BatchContent::Empty, + }; + + Ok((processing, content)) }) .await??; @@ -169,13 +203,14 @@ impl TaskStore { } pub async fn dump( - &self, + env: Arc, dir_path: impl AsRef, update_file_store: UpdateFileStore, ) -> Result<()> { + let store = Self::new(env)?; let update_dir = dir_path.as_ref().join("updates"); let updates_file = update_dir.join("data.jsonl"); - let tasks = self.list_tasks(None, None, None).await?; + let tasks = store.list_tasks(None, None, None).await?; let dir_path = dir_path.as_ref().to_path_buf(); tokio::task::spawn_blocking(move || -> Result<()> { @@ -223,7 +258,7 @@ impl TaskStore { #[cfg(test)] pub mod test { - use crate::tasks::task_store::store::test::tmp_env; + use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env}; use super::*; @@ -252,6 +287,14 @@ pub mod test { Ok(Self::Real(TaskStore::new(env)?)) } + pub async fn dump( + env: Arc, + path: impl AsRef, + update_file_store: UpdateFileStore, + ) -> Result<()> { + TaskStore::dump(env, path, update_file_store).await + } + pub fn mock(mocker: Mocker) -> Self { Self::Mock(Arc::new(mocker)) } @@ -272,12 +315,12 @@ pub mod test { } } - pub async fn get_pending_tasks( + pub async fn get_processing_tasks( &self, - tasks: Vec, - ) -> Result<(Vec, Vec)> { + tasks: Processing, + ) -> Result<(Processing, BatchContent)> { match self { - Self::Real(s) => s.get_pending_tasks(tasks).await, + Self::Real(s) => s.get_processing_tasks(tasks).await, Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) }, } } @@ -294,18 +337,11 @@ pub mod test { } } - pub async fn dump( + pub async fn register( &self, - path: impl AsRef, - update_file_store: UpdateFileStore, - ) -> Result<()> { - match self { - Self::Real(s) => s.dump(path, update_file_store).await, - Self::Mock(m) => unsafe { m.get("dump").call((path, update_file_store)) }, - } - } - - pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result { + index_uid: Option, + content: TaskContent, + ) -> Result { match self { Self::Real(s) => s.register(index_uid, content).await, Self::Mock(_m) => todo!(), @@ -335,7 +371,7 @@ pub mod test { let gen_task = |id: TaskId| Task { id, - index_uid: IndexUid::new_unchecked("test"), + index_uid: Some(IndexUid::new_unchecked("test")), content: TaskContent::IndexCreation { primary_key: None }, events: Vec::new(), }; diff --git a/meilisearch-lib/src/tasks/task_store/store.rs b/meilisearch-lib/src/tasks/task_store/store.rs index 4ff986d8b..75ece0ae8 100644 --- a/meilisearch-lib/src/tasks/task_store/store.rs +++ b/meilisearch-lib/src/tasks/task_store/store.rs @@ -108,8 +108,10 @@ impl Store { pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> { self.tasks.put(txn, &BEU64::new(task.id), task)?; - self.uids_task_ids - .put(txn, &(&task.index_uid, task.id), &())?; + // only add the task to the indexes index if it has an index_uid + if let Some(ref index_uid) = task.index_uid { + self.uids_task_ids.put(txn, &(index_uid, task.id), &())?; + } Ok(()) } @@ -325,7 +327,7 @@ pub mod test { let tasks = (0..100) .map(|_| Task { id: rand::random(), - index_uid: IndexUid::new_unchecked("test"), + index_uid: Some(IndexUid::new_unchecked("test")), content: TaskContent::IndexDeletion, events: vec![], }) @@ -356,14 +358,14 @@ pub mod test { let task_1 = Task { id: 1, - index_uid: IndexUid::new_unchecked("test"), + index_uid: Some(IndexUid::new_unchecked("test")), content: TaskContent::IndexDeletion, events: vec![], }; let task_2 = Task { id: 0, - 
index_uid: IndexUid::new_unchecked("test1"), + index_uid: Some(IndexUid::new_unchecked("test1")), content: TaskContent::IndexDeletion, events: vec![], }; @@ -379,18 +381,28 @@ pub mod test { txn.abort().unwrap(); assert_eq!(tasks.len(), 1); - assert_eq!(&*tasks.first().unwrap().index_uid, "test"); + assert_eq!( + tasks + .first() + .as_ref() + .unwrap() + .index_uid + .as_ref() + .unwrap() + .as_str(), + "test" + ); // same thing but invert the ids let task_1 = Task { id: 0, - index_uid: IndexUid::new_unchecked("test"), + index_uid: Some(IndexUid::new_unchecked("test")), content: TaskContent::IndexDeletion, events: vec![], }; let task_2 = Task { id: 1, - index_uid: IndexUid::new_unchecked("test1"), + index_uid: Some(IndexUid::new_unchecked("test1")), content: TaskContent::IndexDeletion, events: vec![], }; @@ -405,7 +417,17 @@ pub mod test { let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap(); assert_eq!(tasks.len(), 1); - assert_eq!(&*tasks.first().unwrap().index_uid, "test"); + assert_eq!( + &*tasks + .first() + .as_ref() + .unwrap() + .index_uid + .as_ref() + .unwrap() + .as_str(), + "test" + ); } proptest! { diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs index b09811721..01e88755a 100644 --- a/meilisearch-lib/src/tasks/update_loop.rs +++ b/meilisearch-lib/src/tasks/update_loop.rs @@ -7,33 +7,29 @@ use tokio::time::interval_at; use super::batch::Batch; use super::error::Result; -use super::scheduler::Pending; -use super::{Scheduler, TaskPerformer}; +use super::{BatchHandler, Scheduler}; use crate::tasks::task::TaskEvent; /// The update loop sequentially performs batches of updates by asking the scheduler for a batch, /// and handing it to the `TaskPerformer`. -pub struct UpdateLoop { +pub struct UpdateLoop { scheduler: Arc>, - performer: Arc
+pub struct UpdateLoop { scheduler: Arc<RwLock<Scheduler>>, - performer: Arc<P>, + performers: Vec>, notifier: Option>, debounce_duration: Option, } -impl<P> UpdateLoop<P> -where - P: TaskPerformer + Send + Sync + 'static, -{ +impl UpdateLoop { pub fn new( scheduler: Arc>, - performer: Arc<P>
, + performers: Vec>, debuf_duration: Option, notifier: watch::Receiver<()>, ) -> Self { Self { scheduler, - performer, + performers, debounce_duration: debuf_duration, notifier: Some(notifier), } @@ -59,34 +55,29 @@ where } async fn process_next_batch(&self) -> Result<()> { - let pending = { self.scheduler.write().await.prepare().await? }; - match pending { - Pending::Batch(mut batch) => { - for task in &mut batch.tasks { - task.events - .push(TaskEvent::Processing(OffsetDateTime::now_utc())); - } + let mut batch = { self.scheduler.write().await.prepare().await? }; + let performer = self + .performers + .iter() + .find(|p| p.accept(&batch)) + .expect("No performer found for batch") + .clone(); - batch.tasks = { - self.scheduler - .read() - .await - .update_tasks(batch.tasks) - .await? - }; + batch + .content + .push_event(TaskEvent::Processing(OffsetDateTime::now_utc())); - let performer = self.performer.clone(); + batch.content = { + self.scheduler + .read() + .await + .update_tasks(batch.content) + .await? + }; - let batch = performer.process_batch(batch).await; + let batch = performer.process_batch(batch).await; - self.handle_batch_result(batch).await?; - } - Pending::Job(job) => { - let performer = self.performer.clone(); - performer.process_job(job).await; - } - Pending::Nothing => (), - } + self.handle_batch_result(batch, performer).await?; Ok(()) } @@ -96,13 +87,17 @@ where /// When a task is processed, the result of the process is pushed to its event list. The /// `handle_batch_result` make sure that the new state is saved to the store. /// The tasks are then removed from the processing queue. - async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> { + async fn handle_batch_result( + &self, + mut batch: Batch, + performer: Arc, + ) -> Result<()> { let mut scheduler = self.scheduler.write().await; - let tasks = scheduler.update_tasks(batch.tasks).await?; + let content = scheduler.update_tasks(batch.content).await?; scheduler.finish(); drop(scheduler); - batch.tasks = tasks; - self.performer.finish(&batch).await; + batch.content = content; + performer.finish(&batch).await; Ok(()) } } diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs index ec355a56e..3a60dfe26 100644 --- a/meilisearch-lib/src/update_file_store.rs +++ b/meilisearch-lib/src/update_file_store.rs @@ -26,7 +26,7 @@ pub struct UpdateFile { #[error("Error while persisting update to disk: {0}")] pub struct UpdateFileStoreError(Box); -type Result = std::result::Result; +pub type Result = std::result::Result; macro_rules! into_update_store_error { ($($other:path),*) => { @@ -249,7 +249,7 @@ mod test { pub async fn delete(&self, uuid: Uuid) -> Result<()> { match self { MockUpdateFileStore::Real(s) => s.delete(uuid).await, - MockUpdateFileStore::Mock(_) => todo!(), + MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) }, } } }
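Note: the scheduling change earlier in this diff (the new `TaskListIdentifier` and the `TaskList` ordering in `scheduler.rs`) means the single dump task list always outranks per-index lists in the scheduler's `BinaryHeap`, so a pending dump is batched before any index work. The following self-contained sketch illustrates that priority rule; the types and the `oldest_task` field are simplified stand-ins (the real queue peeks at each list's oldest pending task instead of storing it as a field):

```rust
// Sketch of the dump-first priority rule introduced in scheduler.rs.
// Simplified stand-in types; not the crate's real TaskQueue.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

#[derive(Debug, PartialEq, Eq)]
enum TaskListIdentifier {
    Index(String),
    Dump,
}

#[derive(Debug, PartialEq, Eq)]
struct TaskList {
    id: TaskListIdentifier,
    // id of the oldest pending task in this list (lower = registered earlier)
    oldest_task: u64,
}

impl Ord for TaskList {
    fn cmp(&self, other: &Self) -> Ordering {
        match (&self.id, &other.id) {
            // Between two index lists, the one holding the older task wins.
            // BinaryHeap pops the greatest element, so "greater" means
            // "processed first": reverse the task-id comparison.
            (TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => {
                other.oldest_task.cmp(&self.oldest_task)
            }
            // A dump list always wins against an index list.
            (TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Greater,
            (TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Less,
            // The real scheduler keeps at most one dump list, so this arm is moot.
            (TaskListIdentifier::Dump, TaskListIdentifier::Dump) => Ordering::Equal,
        }
    }
}

impl PartialOrd for TaskList {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    let mut queue = BinaryHeap::new();
    queue.push(TaskList { id: TaskListIdentifier::Index("movies".into()), oldest_task: 2 });
    queue.push(TaskList { id: TaskListIdentifier::Dump, oldest_task: 8 });
    queue.push(TaskList { id: TaskListIdentifier::Index("products".into()), oldest_task: 5 });

    // The dump list comes out first even though its task was registered last.
    assert_eq!(queue.pop().unwrap().id, TaskListIdentifier::Dump);
    // Then index lists are drained oldest-task first.
    assert_eq!(
        queue.pop().unwrap().id,
        TaskListIdentifier::Index("movies".into())
    );
    println!("dump-first ordering verified");
}
```

This mirrors the updated `test_make_batch` above, which now expects `Processing::Dump(8)` to come out first even though the dump task was registered last.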