2410: Make dump a task r=Kerollmops a=MarinPostma

This PR transforms dump creation into a proper task.
The `GET /dumps/:dump_uid/status` route is removed.
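For reference, `POST /dumps` now replies with `202 Accepted` and a `SummarizedTaskView` body instead of the old `DumpInfo`, and the dump's progress is then followed through the tasks API. Here is a rough illustration of the new payload shape, built from the fields added in this PR; the concrete values and the exact `type`/`status` strings are assumptions, not taken from a real response:

```rust
// Hypothetical sketch of the body returned by `POST /dumps` after this change.
// Field names come from `SummarizedTaskView`; the values are illustrative only.
use serde_json::json;

fn example_dump_task_summary() -> serde_json::Value {
    json!({
        "uid": 0,
        "indexUid": null,          // dump tasks are not tied to any index
        "status": "enqueued",
        "type": "dumpCreation",
        "enqueuedAt": "2022-05-30T14:09:43Z"
    })
}
```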


Some changes were needed to make this work, and a bit of refactoring was necessary:
- The `dump_actor` module has been renamed to `dump` and moved to the crate root
- There isn't a `DumpActor` anymore, and the dump process is handled by the `DumpHandler`.
- The `TaskPerformer` is renamed to `BatchHandler`
- The `BatchHandler` trait no longer has a `perform_job` method; instead it has an `accept` method that returns whether the handler can process a given batch
- The scheduler now accepts a list of `BatchHandler`s and iterates through them until it finds one that accepts the current batch (see the sketch after this list)
- `Job` doesn't exist anymore; everything is now inside the `BatchContent` enum
- The `Vec<TaskId>` from `Batch` is replaced with a `BatchContent` enum which hints at the content.
- The `Scheduler` is slightly modified to accept these batches and to prioritize them over regular tasks
- Task lists are no longer identified by a `String` representing the index uid, but by a `TaskListIdentifier`, which also works for dumps, since those don't target any specific index
- The `GET /dumps/:dump_uid/status` route no longer exists
- `DumpActorError` is renamed to `DumpError`
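Below is a minimal, self-contained sketch of the new dispatch flow, with `Task` reduced to a placeholder and the `async-trait` crate assumed. The method signatures mirror the `BatchHandler` code in this diff, but this is not the crate's actual implementation.

```rust
use std::sync::Arc;

#[derive(Debug)]
pub struct Task; // placeholder for the real `Task` type

#[derive(Debug)]
pub enum BatchContent {
    DocumentsAdditionBatch(Vec<Task>),
    IndexUpdate(Task),
    Dump(Task),
    Empty,
}

#[derive(Debug)]
pub struct Batch {
    pub id: Option<u64>,
    pub content: BatchContent,
}

#[async_trait::async_trait]
pub trait BatchHandler: Send + Sync {
    /// Returns whether this handler can process the given batch.
    fn accept(&self, batch: &Batch) -> bool;
    /// Processes the batch and returns it with updated task events.
    async fn process_batch(&self, batch: Batch) -> Batch;
    /// Called once the batch has been processed, e.g. to clean up update files.
    async fn finish(&self, batch: &Batch);
}

/// The scheduler walks its handler list and hands the batch to the first
/// handler that accepts it; `EmptyBatchHandler` acts as a catch-all.
pub async fn dispatch(handlers: &[Arc<dyn BatchHandler>], batch: Batch) -> Batch {
    let handler = handlers
        .iter()
        .find(|h| h.accept(&batch))
        .expect("at least one handler accepts every batch");
    let batch = handler.process_batch(batch).await;
    handler.finish(&batch).await;
    batch
}
```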


close #2410 

Co-authored-by: ad hoc <postma.marin@protonmail.com>
bors[bot] 2022-05-30 14:09:43 +00:00 committed by GitHub
commit 3441cc6c36
38 changed files with 1398 additions and 1292 deletions

View File

@ -2,18 +2,15 @@ use actix_web::{web, HttpRequest, HttpResponse};
use log::debug;
use meilisearch_error::ResponseError;
use meilisearch_lib::MeiliSearch;
use serde::{Deserialize, Serialize};
use serde_json::json;
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::SummarizedTaskView;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))))
.service(
web::resource("/{dump_uid}/status").route(web::get().to(SeqHandler(get_dump_status))),
);
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
}
pub async fn create_dump(
@ -23,29 +20,8 @@ pub async fn create_dump(
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
let res = meilisearch.create_dump().await?;
let res: SummarizedTaskView = meilisearch.register_dump_task().await?.into();
debug!("returns: {:?}", res);
Ok(HttpResponse::Accepted().json(res))
}
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct DumpStatusResponse {
status: String,
}
#[derive(Deserialize)]
struct DumpParam {
dump_uid: String,
}
async fn get_dump_status(
meilisearch: GuardedData<ActionPolicy<{ actions::DUMPS_GET }>, MeiliSearch>,
path: web::Path<DumpParam>,
) -> Result<HttpResponse, ResponseError> {
let res = meilisearch.dump_info(path.dump_uid.clone()).await?;
debug!("returns: {:?}", res);
Ok(HttpResponse::Ok().json(res))
}

View File

@ -24,6 +24,7 @@ enum TaskType {
DocumentDeletion,
SettingsUpdate,
ClearAll,
DumpCreation,
}
impl From<TaskContent> for TaskType {
@ -43,6 +44,7 @@ impl From<TaskContent> for TaskType {
TaskContent::IndexDeletion => TaskType::IndexDeletion,
TaskContent::IndexCreation { .. } => TaskType::IndexCreation,
TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
TaskContent::Dump { .. } => TaskType::DumpCreation,
_ => unreachable!("unexpected task type"),
}
}
@ -80,6 +82,8 @@ enum TaskDetails {
},
#[serde(rename_all = "camelCase")]
ClearAll { deleted_documents: Option<u64> },
#[serde(rename_all = "camelCase")]
Dump { dump_uid: String },
}
/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for
@ -137,7 +141,7 @@ fn serialize_duration<S: Serializer>(
#[serde(rename_all = "camelCase")]
pub struct TaskView {
uid: TaskId,
index_uid: String,
index_uid: Option<String>,
status: TaskStatus,
#[serde(rename = "type")]
task_type: TaskType,
@ -216,6 +220,10 @@ impl From<Task> for TaskView {
TaskType::IndexUpdate,
Some(TaskDetails::IndexInfo { primary_key }),
),
TaskContent::Dump { uid } => (
TaskType::DumpCreation,
Some(TaskDetails::Dump { dump_uid: uid }),
),
};
// A task always has at least one event: "Created"
@ -313,7 +321,7 @@ impl From<Task> for TaskView {
Self {
uid: id,
index_uid: index_uid.into_inner(),
index_uid: index_uid.map(|u| u.into_inner()),
status,
task_type,
details,
@ -342,7 +350,7 @@ impl From<Vec<TaskView>> for TaskListView {
#[serde(rename_all = "camelCase")]
pub struct SummarizedTaskView {
uid: TaskId,
index_uid: String,
index_uid: Option<String>,
status: TaskStatus,
#[serde(rename = "type")]
task_type: TaskType,
@ -365,7 +373,7 @@ impl From<Task> for SummarizedTaskView {
Self {
uid: other.id,
index_uid: other.index_uid.to_string(),
index_uid: other.index_uid.map(|u| u.into_inner()),
status: TaskStatus::Enqueued,
task_type: other.content.into(),
enqueued_at,

View File

@ -45,7 +45,6 @@ pub static AUTHORIZATIONS: Lazy<HashMap<(&'static str, &'static str), HashSet<&'
("GET", "/indexes/products/stats") => hashset!{"stats.get", "*"},
("GET", "/stats") => hashset!{"stats.get", "*"},
("POST", "/dumps") => hashset!{"dumps.create", "*"},
("GET", "/dumps/0/status") => hashset!{"dumps.get", "*"},
("GET", "/version") => hashset!{"version", "*"},
}
});

View File

@ -6,23 +6,6 @@ use serde_json::json;
use self::data::GetDump;
#[actix_rt::test]
async fn get_unexisting_dump_status() {
let server = Server::new().await;
let (response, code) = server.get_dump_status("foobar").await;
assert_eq!(code, 404);
let expected_response = json!({
"message": "Dump `foobar` not found.",
"code": "dump_not_found",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#dump_not_found"
});
assert_eq!(response, expected_response);
}
// all the following tests are ignored on Windows. See #2364
#[actix_rt::test]
#[cfg_attr(target_os = "windows", ignore)]

View File

@ -187,7 +187,7 @@ impl From<(UpdateStatus, String, TaskId)> for Task {
// Dummy task
let mut task = Task {
id: task_id,
index_uid: IndexUid::new(uid).unwrap(),
index_uid: Some(IndexUid::new(uid).unwrap()),
content: TaskContent::IndexDeletion,
events: Vec::new(),
};

View File

@ -3,14 +3,10 @@ use meilisearch_error::{internal_error, Code, ErrorCode};
use crate::{index_resolver::error::IndexResolverError, tasks::error::TaskError};
pub type Result<T> = std::result::Result<T, DumpActorError>;
pub type Result<T> = std::result::Result<T, DumpError>;
#[derive(thiserror::Error, Debug)]
pub enum DumpActorError {
#[error("A dump is already processing. You must wait until the current process is finished before requesting another dump.")]
DumpAlreadyRunning,
#[error("Dump `{0}` not found.")]
DumpDoesNotExist(String),
pub enum DumpError {
#[error("An internal error has occurred. `{0}`.")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}")]
@ -18,7 +14,7 @@ pub enum DumpActorError {
}
internal_error!(
DumpActorError: milli::heed::Error,
DumpError: milli::heed::Error,
std::io::Error,
tokio::task::JoinError,
tokio::sync::oneshot::error::RecvError,
@ -29,13 +25,11 @@ internal_error!(
TaskError
);
impl ErrorCode for DumpActorError {
impl ErrorCode for DumpError {
fn error_code(&self) -> Code {
match self {
DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress,
DumpActorError::DumpDoesNotExist(_) => Code::DumpNotFound,
DumpActorError::Internal(_) => Code::Internal,
DumpActorError::IndexResolver(e) => e.error_code(),
DumpError::Internal(_) => Code::Internal,
DumpError::IndexResolver(e) => e.error_code(),
}
}
}

View File

@ -0,0 +1,180 @@
#[cfg(not(test))]
pub use real::DumpHandler;
#[cfg(test)]
pub use test::MockDumpHandler as DumpHandler;
use time::{macros::format_description, OffsetDateTime};
/// Generate uid from creation date
pub fn generate_uid() -> String {
OffsetDateTime::now_utc()
.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
))
.unwrap()
}
mod real {
use std::{fs::File, path::PathBuf, sync::Arc};
use log::{info, trace};
use meilisearch_auth::AuthController;
use milli::heed::Env;
use tokio::fs::create_dir_all;
use crate::analytics;
use crate::compression::to_tar_gz;
use crate::dump::error::{DumpError, Result};
use crate::dump::{MetadataVersion, META_FILE_NAME};
use crate::index_resolver::{
index_store::IndexStore, meta_store::IndexMetaStore, IndexResolver,
};
use crate::tasks::TaskStore;
use crate::update_file_store::UpdateFileStore;
pub struct DumpHandler<U, I> {
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
}
impl<U, I> DumpHandler<U, I>
where
U: IndexMetaStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new(
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
) -> Self {
Self {
dump_path,
db_path,
update_file_store,
task_store_size,
index_db_size,
env,
index_resolver,
}
}
pub async fn run(&self, uid: String) -> Result<()> {
trace!("Performing dump.");
create_dir_all(&self.dump_path).await?;
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let meta = MetadataVersion::new_v5(self.index_db_size, self.task_store_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
// TODO: blocking
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
analytics::copy_user_id(&self.db_path, &temp_dump_path);
create_dir_all(&temp_dump_path.join("indexes")).await?;
// TODO: this is blocking!!
AuthController::dump(&self.db_path, &temp_dump_path)?;
TaskStore::dump(
self.env.clone(),
&temp_dump_path,
self.update_file_store.clone(),
)
.await?;
self.index_resolver.dump(&temp_dump_path).await?;
let dump_path = self.dump_path.clone();
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.
let temp_dump_file = tempfile::NamedTempFile::new_in(&dump_path)?;
to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpError::Internal(e.into()))?;
let dump_path = dump_path.join(uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}
}
#[cfg(test)]
mod test {
use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use milli::heed::Env;
use nelson::Mocker;
use crate::dump::error::Result;
use crate::index_resolver::IndexResolver;
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
use crate::update_file_store::UpdateFileStore;
use super::*;
pub enum MockDumpHandler<U, I> {
Real(super::real::DumpHandler<U, I>),
Mock(Mocker, PhantomData<(U, I)>),
}
impl<U, I> MockDumpHandler<U, I> {
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(mocker, PhantomData)
}
}
impl<U, I> MockDumpHandler<U, I>
where
U: IndexMetaStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
pub fn new(
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store_size: usize,
index_db_size: usize,
env: Arc<Env>,
index_resolver: Arc<IndexResolver<U, I>>,
) -> Self {
Self::Real(super::real::DumpHandler::new(
dump_path,
db_path,
update_file_store,
task_store_size,
index_db_size,
env,
index_resolver,
))
}
pub async fn run(&self, uid: String) -> Result<()> {
match self {
DumpHandler::Real(real) => real.run(uid).await,
DumpHandler::Mock(mocker, _) => unsafe { mocker.get("run").call(uid) },
}
}
}
}

View File

@ -5,8 +5,8 @@ use std::path::{Path, PathBuf};
use serde_json::{Deserializer, Value};
use tempfile::NamedTempFile;
use crate::index_controller::dump_actor::compat::{self, v2, v3};
use crate::index_controller::dump_actor::Metadata;
use crate::dump::compat::{self, v2, v3};
use crate::dump::Metadata;
use crate::options::IndexerOpts;
/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a

View File

@ -9,8 +9,8 @@ use log::info;
use tempfile::tempdir;
use uuid::Uuid;
use crate::index_controller::dump_actor::compat::v3;
use crate::index_controller::dump_actor::Metadata;
use crate::dump::compat::v3;
use crate::dump::Metadata;
use crate::index_resolver::meta_store::{DumpEntry, IndexMeta};
use crate::options::IndexerOpts;
use crate::tasks::task::{Task, TaskId};
@ -66,6 +66,7 @@ pub fn load_dump(
index_db_size,
meta_env_size,
indexing_options,
"V5",
)
}

View File

@ -6,7 +6,7 @@ use meilisearch_auth::AuthController;
use milli::heed::EnvOpenOptions;
use crate::analytics;
use crate::index_controller::dump_actor::Metadata;
use crate::dump::Metadata;
use crate::index_resolver::IndexResolver;
use crate::options::IndexerOpts;
use crate::tasks::TaskStore;
@ -19,10 +19,11 @@ pub fn load_dump(
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
version: &str,
) -> anyhow::Result<()> {
info!(
"Loading dump from {}, dump database version: {}, dump version: V4",
meta.dump_date, meta.db_version
"Loading dump from {}, dump database version: {}, dump version: {}",
meta.dump_date, meta.db_version, version
);
let mut options = EnvOpenOptions::new();

View File

@ -0,0 +1,256 @@
use std::fs::File;
use std::path::Path;
use anyhow::bail;
use log::info;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tempfile::TempDir;
use crate::compression::from_tar_gz;
use crate::options::IndexerOpts;
use self::loaders::{v2, v3, v4};
pub use handler::{generate_uid, DumpHandler};
mod compat;
pub mod error;
mod handler;
mod loaders;
const META_FILE_NAME: &str = "metadata.json";
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
db_version: String,
index_db_size: usize,
update_db_size: usize,
#[serde(with = "time::serde::rfc3339")]
dump_date: OffsetDateTime,
}
impl Metadata {
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: OffsetDateTime::now_utc(),
}
}
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
pub db_version: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "dumpVersion")]
pub enum MetadataVersion {
V1(MetadataV1),
V2(Metadata),
V3(Metadata),
V4(Metadata),
// V5 is forward compatible with V4 but not backward compatible.
V5(Metadata),
}
impl MetadataVersion {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
let version = self.version();
match self {
MetadataVersion::V1(_meta) => {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}
MetadataVersion::V2(meta) => v2::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V3(meta) => v3::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V4(meta) | MetadataVersion::V5(meta) => v4::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
version,
)?,
}
Ok(())
}
pub fn new_v5(index_db_size: usize, update_db_size: usize) -> Self {
let meta = Metadata::new(index_db_size, update_db_size);
Self::V5(meta)
}
pub fn db_version(&self) -> &str {
match self {
Self::V1(meta) => &meta.db_version,
Self::V2(meta) | Self::V3(meta) | Self::V4(meta) | Self::V5(meta) => &meta.db_version,
}
}
pub fn version(&self) -> &'static str {
match self {
MetadataVersion::V1(_) => "V1",
MetadataVersion::V2(_) => "V2",
MetadataVersion::V3(_) => "V3",
MetadataVersion::V4(_) => "V4",
MetadataVersion::V5(_) => "V5",
}
}
pub fn dump_date(&self) -> Option<&OffsetDateTime> {
match self {
MetadataVersion::V1(_) => None,
MetadataVersion::V2(meta)
| MetadataVersion::V3(meta)
| MetadataVersion::V4(meta)
| MetadataVersion::V5(meta) => Some(&meta.dump_date),
}
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
}
pub fn load_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let empty_db = crate::is_empty_db(&dst_path);
let src_path_exists = src_path.as_ref().exists();
if empty_db && src_path_exists {
let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?;
meta.load_dump(
tmp_src.path(),
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?;
persist_dump(&dst_path, tmp_dst)?;
Ok(())
} else if !empty_db && !ignore_dump_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
dst_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| dst_path.as_ref().to_owned())
)
} else if !src_path_exists && !ignore_missing_dump {
bail!("dump doesn't exist at {:?}", src_path.as_ref())
} else {
// there is nothing to do
Ok(())
}
}
fn extract_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> {
// Setup a temp directory path in the same path as the database, to prevent cross devices
// references.
let temp_path = dst_path
.as_ref()
.parent()
.map(ToOwned::to_owned)
.unwrap_or_else(|| ".".into());
let tmp_src = tempfile::tempdir_in(temp_path)?;
let tmp_src_path = tmp_src.path();
from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?;
let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?;
if !dst_path.as_ref().exists() {
std::fs::create_dir_all(dst_path.as_ref())?;
}
let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?;
info!(
"Loading dump {}, dump database version: {}, dump version: {}",
meta.dump_date()
.map(|t| format!("from {}", t))
.unwrap_or_else(String::new),
meta.db_version(),
meta.version()
);
Ok((tmp_src, tmp_dst, meta))
}
fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<()> {
let persisted_dump = tmp_dst.into_path();
// Delete everything in the `data.ms` except the tempdir.
if dst_path.as_ref().exists() {
for file in dst_path.as_ref().read_dir().unwrap() {
let file = file.unwrap().path();
if file.file_name() == persisted_dump.file_name() {
continue;
}
if file.is_file() {
std::fs::remove_file(&file)?;
} else {
std::fs::remove_dir_all(&file)?;
}
}
}
// Move the whole content of the tempdir into the `data.ms`.
for file in persisted_dump.read_dir().unwrap() {
let file = file.unwrap().path();
std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?;
}
// Delete the empty tempdir.
std::fs::remove_dir_all(&persisted_dump)?;
Ok(())
}

View File

@ -1,191 +0,0 @@
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_stream::stream;
use futures::{lock::Mutex, stream::StreamExt};
use log::{error, trace};
use time::macros::format_description;
use time::OffsetDateTime;
use tokio::sync::{mpsc, oneshot, RwLock};
use super::error::{DumpActorError, Result};
use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus};
use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore;
pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor {
inbox: Option<mpsc::Receiver<DumpMsg>>,
update_file_store: UpdateFileStore,
scheduler: Arc<RwLock<Scheduler>>,
dump_path: PathBuf,
analytics_path: PathBuf,
lock: Arc<Mutex<()>>,
dump_infos: Arc<RwLock<HashMap<String, DumpInfo>>>,
update_db_size: usize,
index_db_size: usize,
}
/// Generate uid from creation date
fn generate_uid() -> String {
OffsetDateTime::now_utc()
.format(format_description!(
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
))
.unwrap()
}
impl DumpActor {
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
update_file_store: UpdateFileStore,
scheduler: Arc<RwLock<Scheduler>>,
dump_path: impl AsRef<Path>,
analytics_path: impl AsRef<Path>,
index_db_size: usize,
update_db_size: usize,
) -> Self {
let dump_infos = Arc::new(RwLock::new(HashMap::new()));
let lock = Arc::new(Mutex::new(()));
Self {
inbox: Some(inbox),
scheduler,
update_file_store,
dump_path: dump_path.as_ref().into(),
analytics_path: analytics_path.as_ref().into(),
dump_infos,
lock,
index_db_size,
update_db_size,
}
}
pub async fn run(mut self) {
trace!("Started dump actor.");
let mut inbox = self
.inbox
.take()
.expect("Dump Actor must have a inbox at this point.");
let stream = stream! {
loop {
match inbox.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream
.for_each_concurrent(Some(CONCURRENT_DUMP_MSG), |msg| self.handle_message(msg))
.await;
error!("Dump actor stopped.");
}
async fn handle_message(&self, msg: DumpMsg) {
use DumpMsg::*;
match msg {
CreateDump { ret } => {
let _ = self.handle_create_dump(ret).await;
}
DumpInfo { ret, uid } => {
let _ = ret.send(self.handle_dump_info(uid).await);
}
}
}
async fn handle_create_dump(&self, ret: oneshot::Sender<Result<DumpInfo>>) {
let uid = generate_uid();
let info = DumpInfo::new(uid.clone(), DumpStatus::InProgress);
let _lock = match self.lock.try_lock() {
Some(lock) => lock,
None => {
ret.send(Err(DumpActorError::DumpAlreadyRunning))
.expect("Dump actor is dead");
return;
}
};
self.dump_infos
.write()
.await
.insert(uid.clone(), info.clone());
ret.send(Ok(info)).expect("Dump actor is dead");
let task = DumpJob {
dump_path: self.dump_path.clone(),
db_path: self.analytics_path.clone(),
update_file_store: self.update_file_store.clone(),
scheduler: self.scheduler.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn_local(task.run()).await;
let mut dump_infos = self.dump_infos.write().await;
let dump_infos = dump_infos
.get_mut(&uid)
.expect("dump entry deleted while lock was acquired");
match task_result {
Ok(Ok(())) => {
dump_infos.done();
trace!("Dump succeed");
}
Ok(Err(e)) => {
dump_infos.with_error(e.to_string());
error!("Dump failed: {}", e);
}
Err(_) => {
dump_infos.with_error("Unexpected error while performing dump.".to_string());
error!("Dump panicked. Dump status set to failed");
}
};
}
async fn handle_dump_info(&self, uid: String) -> Result<DumpInfo> {
match self.dump_infos.read().await.get(&uid) {
Some(info) => Ok(info.clone()),
_ => Err(DumpActorError::DumpDoesNotExist(uid)),
}
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_generate_uid() {
let current = OffsetDateTime::now_utc();
let uid = generate_uid();
let (date, time) = uid.split_once('-').unwrap();
let date = time::Date::parse(
date,
&format_description!("[year repr:full][month repr:numerical][day padding:zero]"),
)
.unwrap();
let time = time::Time::parse(
time,
&format_description!(
"[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
),
)
.unwrap();
let datetime = time::PrimitiveDateTime::new(date, time);
let datetime = datetime.assume_utc();
assert!(current - datetime < time::Duration::SECOND);
}
}

View File

@ -1,26 +0,0 @@
use tokio::sync::{mpsc, oneshot};
use super::error::Result;
use super::{DumpActorHandle, DumpInfo, DumpMsg};
#[derive(Clone)]
pub struct DumpActorHandleImpl {
pub sender: mpsc::Sender<DumpMsg>,
}
#[async_trait::async_trait]
impl DumpActorHandle for DumpActorHandleImpl {
async fn create_dump(&self) -> Result<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::CreateDump { ret };
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
let (ret, receiver) = oneshot::channel();
let msg = DumpMsg::DumpInfo { ret, uid };
let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed")
}
}

View File

@ -1,14 +0,0 @@
use tokio::sync::oneshot;
use super::error::Result;
use super::DumpInfo;
pub enum DumpMsg {
CreateDump {
ret: oneshot::Sender<Result<DumpInfo>>,
},
DumpInfo {
uid: String,
ret: oneshot::Sender<Result<DumpInfo>>,
},
}

View File

@ -1,510 +0,0 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::bail;
use log::{info, trace};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
pub use actor::DumpActor;
pub use handle_impl::*;
use meilisearch_auth::AuthController;
pub use message::DumpMsg;
use tempfile::TempDir;
use tokio::fs::create_dir_all;
use tokio::sync::{oneshot, RwLock};
use crate::analytics;
use crate::compression::{from_tar_gz, to_tar_gz};
use crate::index_controller::dump_actor::error::DumpActorError;
use crate::index_controller::dump_actor::loaders::{v2, v3, v4};
use crate::options::IndexerOpts;
use crate::tasks::task::Job;
use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore;
use error::Result;
mod actor;
mod compat;
pub mod error;
mod handle_impl;
mod loaders;
mod message;
const META_FILE_NAME: &str = "metadata.json";
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Metadata {
db_version: String,
index_db_size: usize,
update_db_size: usize,
#[serde(with = "time::serde::rfc3339")]
dump_date: OffsetDateTime,
}
impl Metadata {
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
Self {
db_version: env!("CARGO_PKG_VERSION").to_string(),
index_db_size,
update_db_size,
dump_date: OffsetDateTime::now_utc(),
}
}
}
#[async_trait::async_trait]
#[cfg_attr(test, mockall::automock)]
pub trait DumpActorHandle {
/// Start the creation of a dump
/// Implementation: [handle_impl::DumpActorHandleImpl::create_dump]
async fn create_dump(&self) -> Result<DumpInfo>;
/// Return the status of an already created dump
/// Implementation: [handle_impl::DumpActorHandleImpl::dump_info]
async fn dump_info(&self, uid: String) -> Result<DumpInfo>;
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataV1 {
pub db_version: String,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "dumpVersion")]
pub enum MetadataVersion {
V1(MetadataV1),
V2(Metadata),
V3(Metadata),
V4(Metadata),
}
impl MetadataVersion {
pub fn load_dump(
self,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
meta_env_size: usize,
indexing_options: &IndexerOpts,
) -> anyhow::Result<()> {
match self {
MetadataVersion::V1(_meta) => {
anyhow::bail!("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")
}
MetadataVersion::V2(meta) => v2::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V3(meta) => v3::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
MetadataVersion::V4(meta) => v4::load_dump(
meta,
src,
dst,
index_db_size,
meta_env_size,
indexing_options,
)?,
}
Ok(())
}
pub fn new_v4(index_db_size: usize, update_db_size: usize) -> Self {
let meta = Metadata::new(index_db_size, update_db_size);
Self::V4(meta)
}
pub fn db_version(&self) -> &str {
match self {
Self::V1(meta) => &meta.db_version,
Self::V2(meta) | Self::V3(meta) | Self::V4(meta) => &meta.db_version,
}
}
pub fn version(&self) -> &str {
match self {
MetadataVersion::V1(_) => "V1",
MetadataVersion::V2(_) => "V2",
MetadataVersion::V3(_) => "V3",
MetadataVersion::V4(_) => "V4",
}
}
pub fn dump_date(&self) -> Option<&OffsetDateTime> {
match self {
MetadataVersion::V1(_) => None,
MetadataVersion::V2(meta) | MetadataVersion::V3(meta) | MetadataVersion::V4(meta) => {
Some(&meta.dump_date)
}
}
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)]
#[serde(rename_all = "snake_case")]
pub enum DumpStatus {
Done,
InProgress,
Failed,
}
#[derive(Debug, Serialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct DumpInfo {
pub uid: String,
pub status: DumpStatus,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(with = "time::serde::rfc3339")]
started_at: OffsetDateTime,
#[serde(
skip_serializing_if = "Option::is_none",
with = "time::serde::rfc3339::option"
)]
finished_at: Option<OffsetDateTime>,
}
impl DumpInfo {
pub fn new(uid: String, status: DumpStatus) -> Self {
Self {
uid,
status,
error: None,
started_at: OffsetDateTime::now_utc(),
finished_at: None,
}
}
pub fn with_error(&mut self, error: String) {
self.status = DumpStatus::Failed;
self.finished_at = Some(OffsetDateTime::now_utc());
self.error = Some(error);
}
pub fn done(&mut self) {
self.finished_at = Some(OffsetDateTime::now_utc());
self.status = DumpStatus::Done;
}
pub fn dump_already_in_progress(&self) -> bool {
self.status == DumpStatus::InProgress
}
}
pub fn load_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
ignore_dump_if_db_exists: bool,
ignore_missing_dump: bool,
index_db_size: usize,
update_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
let empty_db = crate::is_empty_db(&dst_path);
let src_path_exists = src_path.as_ref().exists();
if empty_db && src_path_exists {
let (tmp_src, tmp_dst, meta) = extract_dump(&dst_path, &src_path)?;
meta.load_dump(
tmp_src.path(),
tmp_dst.path(),
index_db_size,
update_db_size,
indexer_opts,
)?;
persist_dump(&dst_path, tmp_dst)?;
Ok(())
} else if !empty_db && !ignore_dump_if_db_exists {
bail!(
"database already exists at {:?}, try to delete it or rename it",
dst_path
.as_ref()
.canonicalize()
.unwrap_or_else(|_| dst_path.as_ref().to_owned())
)
} else if !src_path_exists && !ignore_missing_dump {
bail!("dump doesn't exist at {:?}", src_path.as_ref())
} else {
// there is nothing to do
Ok(())
}
}
fn extract_dump(
dst_path: impl AsRef<Path>,
src_path: impl AsRef<Path>,
) -> anyhow::Result<(TempDir, TempDir, MetadataVersion)> {
// Setup a temp directory path in the same path as the database, to prevent cross devices
// references.
let temp_path = dst_path
.as_ref()
.parent()
.map(ToOwned::to_owned)
.unwrap_or_else(|| ".".into());
let tmp_src = tempfile::tempdir_in(temp_path)?;
let tmp_src_path = tmp_src.path();
from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
let mut meta_file = File::open(&meta_path)?;
let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?;
if !dst_path.as_ref().exists() {
std::fs::create_dir_all(dst_path.as_ref())?;
}
let tmp_dst = tempfile::tempdir_in(dst_path.as_ref())?;
info!(
"Loading dump {}, dump database version: {}, dump version: {}",
meta.dump_date()
.map(|t| format!("from {}", t))
.unwrap_or_else(String::new),
meta.db_version(),
meta.version()
);
Ok((tmp_src, tmp_dst, meta))
}
fn persist_dump(dst_path: impl AsRef<Path>, tmp_dst: TempDir) -> anyhow::Result<()> {
let persisted_dump = tmp_dst.into_path();
// Delete everything in the `data.ms` except the tempdir.
if dst_path.as_ref().exists() {
for file in dst_path.as_ref().read_dir().unwrap() {
let file = file.unwrap().path();
if file.file_name() == persisted_dump.file_name() {
continue;
}
if file.is_file() {
std::fs::remove_file(&file)?;
} else {
std::fs::remove_dir_all(&file)?;
}
}
}
// Move the whole content of the tempdir into the `data.ms`.
for file in persisted_dump.read_dir().unwrap() {
let file = file.unwrap().path();
std::fs::rename(&file, &dst_path.as_ref().join(file.file_name().unwrap()))?;
}
// Delete the empty tempdir.
std::fs::remove_dir_all(&persisted_dump)?;
Ok(())
}
struct DumpJob {
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
scheduler: Arc<RwLock<Scheduler>>,
uid: String,
update_db_size: usize,
index_db_size: usize,
}
impl DumpJob {
async fn run(self) -> Result<()> {
trace!("Performing dump.");
create_dir_all(&self.dump_path).await?;
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
let temp_dump_path = temp_dump_dir.path().to_owned();
let meta = MetadataVersion::new_v4(self.index_db_size, self.update_db_size);
let meta_path = temp_dump_path.join(META_FILE_NAME);
let mut meta_file = File::create(&meta_path)?;
serde_json::to_writer(&mut meta_file, &meta)?;
analytics::copy_user_id(&self.db_path, &temp_dump_path);
create_dir_all(&temp_dump_path.join("indexes")).await?;
let (sender, receiver) = oneshot::channel();
self.scheduler
.write()
.await
.schedule_job(Job::Dump {
ret: sender,
path: temp_dump_path.clone(),
})
.await;
// wait until the job has started performing before finishing the dump process
let sender = receiver.await??;
AuthController::dump(&self.db_path, &temp_dump_path)?;
//TODO(marin): this is not right, the scheduler should dump itself, not do it here...
self.scheduler
.read()
.await
.dump(&temp_dump_path, self.update_file_store.clone())
.await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.dump_path)?;
to_tar_gz(temp_dump_path, temp_dump_file.path())
.map_err(|e| DumpActorError::Internal(e.into()))?;
let dump_path = self.dump_path.join(self.uid).with_extension("dump");
temp_dump_file.persist(&dump_path)?;
Ok(dump_path)
})
.await??;
// notify the update loop that we are finished performing the dump.
let _ = sender.send(());
info!("Created dump in {:?}.", dump_path);
Ok(())
}
}
#[cfg(test)]
mod test {
use nelson::Mocker;
use once_cell::sync::Lazy;
use super::*;
use crate::index_resolver::error::IndexResolverError;
use crate::options::SchedulerConfig;
use crate::tasks::error::Result as TaskResult;
use crate::tasks::task::{Task, TaskId};
use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore};
use crate::update_file_store::UpdateFileStore;
fn setup() {
static SETUP: Lazy<()> = Lazy::new(|| {
if cfg!(windows) {
std::env::set_var("TMP", ".");
} else {
std::env::set_var("TMPDIR", ".");
}
});
// just deref to make sure the env is setup
*SETUP
}
#[actix_rt::test]
async fn test_dump_normal() {
setup();
let tmp = tempfile::tempdir().unwrap();
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let mut performer = MockTaskPerformer::new();
performer
.expect_process_job()
.once()
.returning(|j| match j {
Job::Dump { ret, .. } => {
let (sender, _receiver) = oneshot::channel();
ret.send(Ok(sender)).unwrap();
}
_ => unreachable!(),
});
let performer = Arc::new(performer);
let mocker = Mocker::default();
mocker
.when::<(&Path, UpdateFileStore), TaskResult<()>>("dump")
.then(|_| Ok(()));
mocker
.when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
"list_tasks",
)
.then(|_| Ok(Vec::new()));
let store = TaskStore::mock(mocker);
let config = SchedulerConfig::default();
let scheduler = Scheduler::new(store, performer, config).unwrap();
let task = DumpJob {
dump_path: tmp.path().into(),
// this should do nothing
update_file_store,
db_path: tmp.path().into(),
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
scheduler,
};
task.run().await.unwrap();
}
#[actix_rt::test]
async fn error_performing_dump() {
let tmp = tempfile::tempdir().unwrap();
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let mocker = Mocker::default();
mocker
.when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
"list_tasks",
)
.then(|_| Ok(Vec::new()));
let task_store = TaskStore::mock(mocker);
let mut performer = MockTaskPerformer::new();
performer
.expect_process_job()
.once()
.returning(|job| match job {
Job::Dump { ret, .. } => drop(ret.send(Err(IndexResolverError::BadlyFormatted(
"blabla".to_string(),
)))),
_ => unreachable!(),
});
let performer = Arc::new(performer);
let scheduler = Scheduler::new(task_store, performer, SchedulerConfig::default()).unwrap();
let task = DumpJob {
dump_path: tmp.path().into(),
// this should do nothing
db_path: tmp.path().into(),
update_file_store: file_store,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
scheduler,
};
assert!(task.run().await.is_err());
}
}

View File

@ -6,11 +6,11 @@ use tokio::task::JoinError;
use super::DocumentAdditionFormat;
use crate::document_formats::DocumentFormatError;
use crate::dump::error::DumpError;
use crate::index::error::IndexError;
use crate::tasks::error::TaskError;
use crate::update_file_store::UpdateFileStoreError;
use super::dump_actor::error::DumpActorError;
use crate::index_resolver::error::IndexResolverError;
pub type Result<T> = std::result::Result<T, IndexControllerError>;
@ -28,7 +28,7 @@ pub enum IndexControllerError {
#[error("{0}")]
TaskError(#[from] TaskError),
#[error("{0}")]
DumpError(#[from] DumpActorError),
DumpError(#[from] DumpError),
#[error("{0}")]
DocumentFormatError(#[from] DocumentFormatError),
#[error("A {0} payload is missing.")]

View File

@ -13,31 +13,31 @@ use futures::StreamExt;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::{mpsc, RwLock};
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use uuid::Uuid;
use crate::document_formats::{read_csv, read_json, read_ndjson};
use crate::dump::{self, load_dump, DumpHandler};
use crate::index::{
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
};
use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl};
use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService};
use crate::tasks::error::TaskError;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId};
use crate::tasks::{Scheduler, TaskFilter, TaskStore};
use crate::tasks::{
BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore,
};
use error::Result;
use self::dump_actor::{DumpActorHandle, DumpInfo};
use self::error::IndexControllerError;
use crate::index_resolver::index_store::{IndexStore, MapIndexStore};
use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore};
use crate::index_resolver::{create_index_resolver, IndexResolver, IndexUid};
use crate::update_file_store::UpdateFileStore;
mod dump_actor;
pub mod error;
pub mod versioning;
@ -73,11 +73,10 @@ pub struct IndexSettings {
}
pub struct IndexController<U, I> {
index_resolver: Arc<IndexResolver<U, I>>,
pub index_resolver: Arc<IndexResolver<U, I>>,
scheduler: Arc<RwLock<Scheduler>>,
task_store: TaskStore,
dump_handle: dump_actor::DumpActorHandleImpl,
update_file_store: UpdateFileStore,
pub update_file_store: UpdateFileStore,
}
/// Need a custom implementation for clone because deriving require that U and I are clone.
@ -86,7 +85,6 @@ impl<U, I> Clone for IndexController<U, I> {
Self {
index_resolver: self.index_resolver.clone(),
scheduler: self.scheduler.clone(),
dump_handle: self.dump_handle.clone(),
update_file_store: self.update_file_store.clone(),
task_store: self.task_store.clone(),
}
@ -220,30 +218,30 @@ impl IndexControllerBuilder {
update_file_store.clone(),
)?);
let task_store = TaskStore::new(meta_env)?;
let scheduler =
Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?;
let dump_path = self
.dump_dst
.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
let dump_handle = {
let analytics_path = &db_path;
let (sender, receiver) = mpsc::channel(10);
let actor = DumpActor::new(
receiver,
update_file_store.clone(),
scheduler.clone(),
dump_path,
analytics_path,
index_size,
task_store_size,
);
tokio::task::spawn_local(actor.run());
let dump_handler = Arc::new(DumpHandler::new(
dump_path,
db_path.as_ref().into(),
update_file_store.clone(),
task_store_size,
index_size,
meta_env.clone(),
index_resolver.clone(),
));
let task_store = TaskStore::new(meta_env)?;
DumpActorHandleImpl { sender }
};
// register all the batch handlers for use with the scheduler.
let handlers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>> = vec![
index_resolver.clone(),
dump_handler,
Arc::new(SnapshotHandler),
// dummy handler to catch all empty batches
Arc::new(EmptyBatchHandler),
];
let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?;
if self.schedule_snapshot {
let snapshot_period = self
@ -268,7 +266,6 @@ impl IndexControllerBuilder {
Ok(IndexController {
index_resolver,
scheduler,
dump_handle,
update_file_store,
task_store,
})
@ -419,12 +416,20 @@ where
Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { primary_key },
};
let task = self.task_store.register(uid, content).await?;
let task = self.task_store.register(Some(uid), content).await?;
self.scheduler.read().await.notify();
Ok(task)
}
pub async fn register_dump_task(&self) -> Result<Task> {
let uid = dump::generate_uid();
let content = TaskContent::Dump { uid };
let task = self.task_store.register(None, content).await?;
self.scheduler.read().await.notify();
Ok(task)
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
let task = self.scheduler.read().await.get_task(id, filter).await?;
Ok(task)
@ -569,7 +574,12 @@ where
// Check if the currently indexing update is from our index.
let is_indexing = processing_tasks
.first()
.map(|task| task.index_uid.as_str() == uid)
.map(|task| {
task.index_uid
.as_ref()
.map(|u| u.as_str() == uid)
.unwrap_or(false)
})
.unwrap_or_default();
let index = self.index_resolver.get_index(uid).await?;
@ -605,7 +615,7 @@ where
// Check if the currently indexing update is from our index.
stats.is_indexing = processing_tasks
.first()
.map(|p| p.index_uid.as_str() == index_uid)
.and_then(|p| p.index_uid.as_ref().map(|u| u.as_str() == index_uid))
.or(Some(false));
indexes.insert(index_uid, stats);
@ -617,14 +627,6 @@ where
indexes,
})
}
pub async fn create_dump(&self) -> Result<DumpInfo> {
Ok(self.dump_handle.create_dump().await?)
}
pub async fn dump_info(&self, uid: String) -> Result<DumpInfo> {
Ok(self.dump_handle.dump_info(uid).await?)
}
}
pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
@ -662,13 +664,11 @@ mod test {
index_resolver: Arc<IndexResolver<MockIndexMetaStore, MockIndexStore>>,
task_store: TaskStore,
update_file_store: UpdateFileStore,
dump_handle: DumpActorHandleImpl,
scheduler: Arc<RwLock<Scheduler>>,
) -> Self {
IndexController {
index_resolver,
task_store,
dump_handle,
update_file_store,
scheduler,
}
@ -752,19 +752,12 @@ mod test {
let task_store = TaskStore::mock(task_store_mocker);
let scheduler = Scheduler::new(
task_store.clone(),
index_resolver.clone(),
vec![index_resolver.clone()],
SchedulerConfig::default(),
)
.unwrap();
let (sender, _) = mpsc::channel(1);
let dump_handle = DumpActorHandleImpl { sender };
let index_controller = IndexController::mock(
index_resolver,
task_store,
update_file_store,
dump_handle,
scheduler,
);
let index_controller =
IndexController::mock(index_resolver, task_store, update_file_store, scheduler);
let r = index_controller
.search(index_uid.to_owned(), query.clone())

View File

@ -1,37 +0,0 @@
use std::{collections::HashSet, path::PathBuf};
use tokio::sync::oneshot;
use uuid::Uuid;
use crate::index::Index;
use super::error::Result;
pub enum IndexResolverMsg {
Get {
uid: String,
ret: oneshot::Sender<Result<Index>>,
},
Delete {
uid: String,
ret: oneshot::Sender<Result<Index>>,
},
List {
ret: oneshot::Sender<Result<Vec<(String, Index)>>>,
},
Insert {
uuid: Uuid,
name: String,
ret: oneshot::Sender<Result<()>>,
},
SnapshotRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Index>>>,
},
GetSize {
ret: oneshot::Sender<Result<u64>>,
},
DumpRequest {
path: PathBuf,
ret: oneshot::Sender<Result<HashSet<Index>>>,
},
}

View File

@ -14,15 +14,12 @@ use milli::heed::Env;
use milli::update::{DocumentDeletionResult, IndexerConfig};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::oneshot;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::{error::Result as IndexResult, Index};
use crate::options::IndexerOpts;
use crate::tasks::batch::Batch;
use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::tasks::TaskPerformer;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::update_file_store::UpdateFileStore;
use self::meta_store::IndexMeta;
@ -91,69 +88,10 @@ impl TryInto<IndexUid> for String {
}
}
#[async_trait::async_trait]
impl<U, I> TaskPerformer for IndexResolver<U, I>
where
U: IndexMetaStore + Send + Sync + 'static,
I: IndexStore + Send + Sync + 'static,
{
async fn process_batch(&self, mut batch: Batch) -> Batch {
// If a batch contains multiple tasks, then it must be a document addition batch
if let Some(Task {
content: TaskContent::DocumentAddition { .. },
..
}) = batch.tasks.first()
{
debug_assert!(batch.tasks.iter().all(|t| matches!(
t,
Task {
content: TaskContent::DocumentAddition { .. },
..
}
)));
self.process_document_addition_batch(batch).await
} else {
if let Some(task) = batch.tasks.first_mut() {
task.events
.push(TaskEvent::Processing(OffsetDateTime::now_utc()));
match self.process_task(task).await {
Ok(success) => {
task.events.push(TaskEvent::Succeded {
result: success,
timestamp: OffsetDateTime::now_utc(),
});
}
Err(err) => task.events.push(TaskEvent::Failed {
error: err.into(),
timestamp: OffsetDateTime::now_utc(),
}),
}
}
batch
}
}
async fn process_job(&self, job: Job) {
self.process_job(job).await;
}
async fn finish(&self, batch: &Batch) {
for task in &batch.tasks {
if let Some(content_uuid) = task.get_content_uuid() {
if let Err(e) = self.file_store.delete(content_uuid).await {
log::error!("error deleting update file: {}", e);
}
}
}
}
}
pub struct IndexResolver<U, I> {
index_uuid_store: U,
index_store: I,
file_store: UpdateFileStore,
pub file_store: UpdateFileStore,
}
impl IndexResolver<HeedMetaStore, MapIndexStore> {
@ -189,7 +127,7 @@ where
}
}
async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch {
pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
fn get_content_uuid(task: &Task) -> Uuid {
match task {
Task {
@ -200,11 +138,11 @@ where
}
}
let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
let content_uuids = tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
match batch.tasks.first() {
match tasks.first() {
Some(Task {
index_uid,
index_uid: Some(ref index_uid),
id,
content:
TaskContent::DocumentAddition {
@ -231,13 +169,13 @@ where
Ok(index) => index,
Err(e) => {
let error = ResponseError::from(e);
for task in batch.tasks.iter_mut() {
for task in tasks.iter_mut() {
task.events.push(TaskEvent::Failed {
error: error.clone(),
timestamp: now,
});
}
return batch;
return tasks;
}
};
@ -269,23 +207,23 @@ where
},
};
for task in batch.tasks.iter_mut() {
for task in tasks.iter_mut() {
task.events.push(event.clone());
}
batch
tasks
}
_ => panic!("invalid batch!"),
}
}
async fn process_task(&self, task: &Task) -> Result<TaskResult> {
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
let index_uid = task.index_uid.clone();
match &task.content {
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => {
let ids = ids.clone();
let index = self.get_index(index_uid.into_inner()).await?;
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
let DocumentDeletionResult {
deleted_documents, ..
@ -294,7 +232,7 @@ where
Ok(TaskResult::DocumentDeletion { deleted_documents })
}
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
let index = self.get_index(index_uid.into_inner()).await?;
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
let number_documents = index.stats()?.number_of_documents;
index.clear_documents()?;
@ -310,9 +248,10 @@ where
allow_index_creation,
} => {
let index = if *is_deletion || !*allow_index_creation {
self.get_index(index_uid.into_inner()).await?
self.get_index(index_uid.unwrap().into_inner()).await?
} else {
self.get_or_create_index(index_uid, task.id).await?
self.get_or_create_index(index_uid.unwrap(), task.id)
.await?
};
let settings = settings.clone();
@ -321,7 +260,7 @@ where
Ok(TaskResult::Other)
}
TaskContent::IndexDeletion => {
let index = self.delete_index(index_uid.into_inner()).await?;
let index = self.delete_index(index_uid.unwrap().into_inner()).await?;
let deleted_documents = spawn_blocking(move || -> IndexResult<u64> {
Ok(index.stats()?.number_of_documents)
@ -331,7 +270,7 @@ where
Ok(TaskResult::ClearAll { deleted_documents })
}
TaskContent::IndexCreation { primary_key } => {
let index = self.create_index(index_uid, task.id).await?;
let index = self.create_index(index_uid.unwrap(), task.id).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
@ -341,7 +280,7 @@ where
Ok(TaskResult::Other)
}
TaskContent::IndexUpdate { primary_key } => {
let index = self.get_index(index_uid.into_inner()).await?;
let index = self.get_index(index_uid.unwrap().into_inner()).await?;
if let Some(primary_key) = primary_key {
let primary_key = primary_key.clone();
@ -350,28 +289,7 @@ where
Ok(TaskResult::Other)
}
}
}
async fn process_job(&self, job: Job) {
match job {
Job::Dump { ret, path } => {
log::trace!("The Dump task is getting executed");
let (sender, receiver) = oneshot::channel();
if ret.send(self.dump(path).await.map(|_| sender)).is_err() {
log::error!("The dump actor died.");
}
// wait until the dump has finished performing.
let _ = receiver.await;
}
Job::Empty => log::error!("Tried to process an empty task."),
Job::Snapshot(job) => {
if let Err(e) = job.run().await {
log::error!("Error performing snapshot: {}", e);
}
}
_ => unreachable!("Invalid task for index resolver"),
}
}
@ -493,17 +411,23 @@ mod test {
use nelson::Mocker;
use proptest::prelude::*;
use crate::index::{
error::{IndexError, Result as IndexResult},
Checked, IndexMeta, IndexStats, Settings,
use crate::{
index::{
error::{IndexError, Result as IndexResult},
Checked, IndexMeta, IndexStats, Settings,
},
tasks::{batch::Batch, BatchHandler},
};
use index_store::MockIndexStore;
use meta_store::MockIndexMetaStore;
// TODO: this test is ignored because it has become too complex to maintain;
// implement handler logic tests instead.
proptest! {
#[test]
#[ignore]
fn test_process_task(
task in any::<Task>(),
task in any::<Task>().prop_filter("IndexUid should be Some", |s| s.index_uid.is_some()),
index_exists in any::<bool>(),
index_op_fails in any::<bool>(),
any_int in any::<u64>(),
@ -579,6 +503,7 @@ mod test {
.then(move |_| result());
}
}
TaskContent::Dump { .. } => { }
}
mocker.when::<(), IndexResult<IndexStats>>("stats")
@ -607,6 +532,7 @@ mod test {
}
// if index already exists, create index will return an error
TaskContent::IndexCreation { .. } if index_exists => (),
TaskContent::Dump { .. } => (),
// The index exists and get should be called
_ if index_exists => {
index_store
@ -641,24 +567,26 @@ mod test {
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store);
let batch = Batch { id: 1, created_at: OffsetDateTime::now_utc(), tasks: vec![task.clone()] };
let result = index_resolver.process_batch(batch).await;
let batch = Batch { id: Some(1), created_at: OffsetDateTime::now_utc(), content: crate::tasks::batch::BatchContent::IndexUpdate(task.clone()) };
if index_resolver.accept(&batch) {
let result = index_resolver.process_batch(batch).await;
// Test for some expected output scenarios:
// Index creation and deletion cannot fail because of a failed index op, since they
// don't perform index ops.
if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None })
|| (index_exists && matches!(task.content, TaskContent::IndexCreation { .. }))
|| (!index_exists && matches!(task.content, TaskContent::IndexDeletion
| TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { is_deletion: true, ..}
| TaskContent::SettingsUpdate { allow_index_creation: false, ..}
| TaskContent::DocumentAddition { allow_index_creation: false, ..}
| TaskContent::IndexUpdate { .. } ))
{
assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
} else {
assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result);
// Test for some expected output scenarios:
// Index creation and deletion cannot fail because of a failed index op, since they
// don't perform index ops.
if index_op_fails && !matches!(task.content, TaskContent::IndexDeletion | TaskContent::IndexCreation { primary_key: None } | TaskContent::IndexUpdate { primary_key: None } | TaskContent::Dump { .. })
|| (index_exists && matches!(task.content, TaskContent::IndexCreation { .. }))
|| (!index_exists && matches!(task.content, TaskContent::IndexDeletion
| TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { is_deletion: true, ..}
| TaskContent::SettingsUpdate { allow_index_creation: false, ..}
| TaskContent::DocumentAddition { allow_index_creation: false, ..}
| TaskContent::IndexUpdate { .. } ))
{
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
} else {
assert!(matches!(result.content.first().unwrap().events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result);
}
}
});
}

View File

@ -3,6 +3,7 @@ pub mod error;
pub mod options;
mod analytics;
mod dump;
pub mod index;
pub mod index_controller;
mod index_resolver;

View File

@ -14,7 +14,6 @@ use walkdir::WalkDir;
use crate::compression::from_tar_gz;
use crate::index_controller::open_meta_env;
use crate::index_controller::versioning::VERSION_FILE_NAME;
use crate::tasks::task::Job;
use crate::tasks::Scheduler;
pub struct SnapshotService {
@ -39,8 +38,7 @@ impl SnapshotService {
meta_env_size: self.meta_env_size,
index_size: self.index_size,
};
let job = Job::Snapshot(snapshot_job);
self.scheduler.write().await.schedule_job(job).await;
self.scheduler.write().await.schedule_snapshot(snapshot_job);
sleep(self.snapshot_period).await;
}
}

View File

@ -1,22 +1,75 @@
use time::OffsetDateTime;
use super::task::Task;
use crate::snapshot::SnapshotJob;
use super::task::{Task, TaskEvent};
pub type BatchId = u64;
#[derive(Debug)]
pub enum BatchContent {
DocumentsAdditionBatch(Vec<Task>),
IndexUpdate(Task),
Dump(Task),
Snapshot(SnapshotJob),
// Symbolizes an empty batch. This can occur when we were woken up, but there wasn't any work to do.
Empty,
}
impl BatchContent {
pub fn first(&self) -> Option<&Task> {
match self {
BatchContent::DocumentsAdditionBatch(ts) => ts.first(),
BatchContent::Dump(t) | BatchContent::IndexUpdate(t) => Some(t),
BatchContent::Snapshot(_) | BatchContent::Empty => None,
}
}
pub fn push_event(&mut self, event: TaskEvent) {
match self {
BatchContent::DocumentsAdditionBatch(ts) => {
ts.iter_mut().for_each(|t| t.events.push(event.clone()))
}
BatchContent::IndexUpdate(t) | BatchContent::Dump(t) => t.events.push(event),
BatchContent::Snapshot(_) | BatchContent::Empty => (),
}
}
}
#[derive(Debug)]
pub struct Batch {
pub id: BatchId,
// Only batches that contain persistent tasks are given an id. Snapshot batches don't
// have an id.
pub id: Option<BatchId>,
pub created_at: OffsetDateTime,
pub tasks: Vec<Task>,
pub content: BatchContent,
}
impl Batch {
pub fn new(id: Option<BatchId>, content: BatchContent) -> Self {
Self {
id,
created_at: OffsetDateTime::now_utc(),
content,
}
}
pub fn len(&self) -> usize {
self.tasks.len()
match self.content {
BatchContent::DocumentsAdditionBatch(ref ts) => ts.len(),
BatchContent::IndexUpdate(_) | BatchContent::Dump(_) | BatchContent::Snapshot(_) => 1,
BatchContent::Empty => 0,
}
}
pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
self.len() == 0
}
pub fn empty() -> Self {
Self {
id: None,
created_at: OffsetDateTime::now_utc(),
content: BatchContent::Empty,
}
}
}

View File

@ -0,0 +1,132 @@
use crate::dump::DumpHandler;
use crate::index_resolver::index_store::IndexStore;
use crate::index_resolver::meta_store::IndexMetaStore;
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::task::{Task, TaskContent, TaskEvent, TaskResult};
use crate::tasks::BatchHandler;
#[async_trait::async_trait]
impl<U, I> BatchHandler for DumpHandler<U, I>
where
U: IndexMetaStore + Sync + Send + 'static,
I: IndexStore + Sync + Send + 'static,
{
fn accept(&self, batch: &Batch) -> bool {
matches!(batch.content, BatchContent::Dump { .. })
}
async fn process_batch(&self, mut batch: Batch) -> Batch {
match &batch.content {
BatchContent::Dump(Task {
content: TaskContent::Dump { uid },
..
}) => {
match self.run(uid.clone()).await {
Ok(_) => {
batch
.content
.push_event(TaskEvent::succeeded(TaskResult::Other));
}
Err(e) => batch.content.push_event(TaskEvent::failed(e.into())),
}
batch
}
_ => unreachable!("invalid batch content for dump"),
}
}
async fn finish(&self, _: &Batch) {}
}
#[cfg(test)]
mod test {
use crate::dump::error::{DumpError, Result as DumpResult};
use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore};
use crate::tasks::handlers::test::task_to_batch;
use super::*;
use nelson::Mocker;
use proptest::prelude::*;
proptest! {
#[test]
fn finish_does_nothing(
task in any::<Task>(),
) {
let rt = tokio::runtime::Runtime::new().unwrap();
let handle = rt.spawn(async {
let batch = task_to_batch(task);
let mocker = Mocker::default();
let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);
dump_handler.finish(&batch).await;
});
rt.block_on(handle).unwrap();
}
#[test]
fn test_handle_dump_success(
task in any::<Task>(),
) {
let rt = tokio::runtime::Runtime::new().unwrap();
let handle = rt.spawn(async {
let batch = task_to_batch(task);
let should_accept = matches!(batch.content, BatchContent::Dump { .. });
let mocker = Mocker::default();
if should_accept {
mocker.when::<String, DumpResult<()>>("run")
.once()
.then(|_| Ok(()));
}
let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);
let accept = dump_handler.accept(&batch);
assert_eq!(accept, should_accept);
if accept {
let batch = dump_handler.process_batch(batch).await;
let last_event = batch.content.first().unwrap().events.last().unwrap();
assert!(matches!(last_event, TaskEvent::Succeded { .. }));
}
});
rt.block_on(handle).unwrap();
}
#[test]
fn test_handle_dump_error(
task in any::<Task>(),
) {
let rt = tokio::runtime::Runtime::new().unwrap();
let handle = rt.spawn(async {
let batch = task_to_batch(task);
let should_accept = matches!(batch.content, BatchContent::Dump { .. });
let mocker = Mocker::default();
if should_accept {
mocker.when::<String, DumpResult<()>>("run")
.once()
.then(|_| Err(DumpError::Internal("error".into())));
}
let dump_handler = DumpHandler::<MockIndexMetaStore, MockIndexStore>::mock(mocker);
let accept = dump_handler.accept(&batch);
assert_eq!(accept, should_accept);
if accept {
let batch = dump_handler.process_batch(batch).await;
let last_event = batch.content.first().unwrap().events.last().unwrap();
assert!(matches!(last_event, TaskEvent::Failed { .. }));
}
});
rt.block_on(handle).unwrap();
}
}
}

View File

@ -0,0 +1,18 @@
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::BatchHandler;
/// A sink handler for empty tasks.
pub struct EmptyBatchHandler;
#[async_trait::async_trait]
impl BatchHandler for EmptyBatchHandler {
fn accept(&self, batch: &Batch) -> bool {
matches!(batch.content, BatchContent::Empty)
}
async fn process_batch(&self, batch: Batch) -> Batch {
batch
}
async fn finish(&self, _: &Batch) {}
}

View File

@ -0,0 +1,146 @@
use crate::index_resolver::IndexResolver;
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::task::TaskEvent;
use crate::tasks::BatchHandler;
#[async_trait::async_trait]
impl<U, I> BatchHandler for IndexResolver<U, I>
where
U: IndexMetaStore + Send + Sync + 'static,
I: IndexStore + Send + Sync + 'static,
{
fn accept(&self, batch: &Batch) -> bool {
matches!(
batch.content,
BatchContent::DocumentsAdditionBatch(_) | BatchContent::IndexUpdate(_)
)
}
async fn process_batch(&self, mut batch: Batch) -> Batch {
match batch.content {
BatchContent::DocumentsAdditionBatch(ref mut tasks) => {
*tasks = self
.process_document_addition_batch(std::mem::take(tasks))
.await;
}
BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await {
Ok(success) => task.events.push(TaskEvent::succeeded(success)),
Err(err) => task.events.push(TaskEvent::failed(err.into())),
},
_ => unreachable!(),
}
batch
}
async fn finish(&self, batch: &Batch) {
if let BatchContent::DocumentsAdditionBatch(ref tasks) = batch.content {
for task in tasks {
if let Some(content_uuid) = task.get_content_uuid() {
if let Err(e) = self.file_store.delete(content_uuid).await {
log::error!("error deleting update file: {}", e);
}
}
}
}
}
}
#[cfg(test)]
mod test {
use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore};
use crate::tasks::{
handlers::test::task_to_batch,
task::{Task, TaskContent},
};
use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore};
use super::*;
use milli::update::IndexDocumentsMethod;
use nelson::Mocker;
use proptest::prelude::*;
use uuid::Uuid;
proptest! {
#[test]
fn test_accept_task(
task in any::<Task>(),
) {
let batch = task_to_batch(task);
let index_store = MockIndexStore::new();
let meta_store = MockIndexMetaStore::new();
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store);
match batch.content {
BatchContent::DocumentsAdditionBatch(_)
| BatchContent::IndexUpdate(_) => assert!(index_resolver.accept(&batch)),
BatchContent::Dump(_)
| BatchContent::Snapshot(_)
| BatchContent::Empty => assert!(!index_resolver.accept(&batch)),
}
}
}
#[actix_rt::test]
async fn finisher_called_on_document_update() {
let index_store = MockIndexStore::new();
let meta_store = MockIndexMetaStore::new();
let mocker = Mocker::default();
let content_uuid = Uuid::new_v4();
mocker
.when::<Uuid, FileStoreResult<()>>("delete")
.once()
.then(move |uuid| {
assert_eq!(uuid, content_uuid);
Ok(())
});
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store);
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::DocumentAddition {
content_uuid,
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: None,
documents_count: 100,
allow_index_creation: true,
},
events: Vec::new(),
};
let batch = task_to_batch(task);
index_resolver.finish(&batch).await;
}
#[actix_rt::test]
#[should_panic]
async fn panic_when_passed_unsupported_batch() {
let index_store = MockIndexStore::new();
let meta_store = MockIndexMetaStore::new();
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store);
let task = Task {
id: 1,
index_uid: None,
content: TaskContent::Dump {
uid: String::from("hello"),
},
events: Vec::new(),
};
let batch = task_to_batch(task);
index_resolver.process_batch(batch).await;
}
// TODO: test perform_batch. We need a Mocker for IndexResolver.
}

View File

@ -0,0 +1,34 @@
pub mod dump_handler;
pub mod empty_handler;
mod index_resolver_handler;
pub mod snapshot_handler;
#[cfg(test)]
mod test {
use time::OffsetDateTime;
use crate::tasks::{
batch::{Batch, BatchContent},
task::{Task, TaskContent},
};
pub fn task_to_batch(task: Task) -> Batch {
let content = match task.content {
TaskContent::DocumentAddition { .. } => {
BatchContent::DocumentsAdditionBatch(vec![task])
}
TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task),
TaskContent::Dump { .. } => BatchContent::Dump(task),
};
Batch {
id: Some(1),
created_at: OffsetDateTime::now_utc(),
content,
}
}
}

View File

@ -0,0 +1,26 @@
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::BatchHandler;
pub struct SnapshotHandler;
#[async_trait::async_trait]
impl BatchHandler for SnapshotHandler {
fn accept(&self, batch: &Batch) -> bool {
matches!(batch.content, BatchContent::Snapshot(_))
}
async fn process_batch(&self, batch: Batch) -> Batch {
match batch.content {
BatchContent::Snapshot(job) => {
if let Err(e) = job.run().await {
log::error!("snapshot error: {e}");
}
}
_ => unreachable!(),
}
Batch::empty()
}
async fn finish(&self, _: &Batch) {}
}

View File

@ -1,5 +1,7 @@
use async_trait::async_trait;
pub use handlers::empty_handler::EmptyBatchHandler;
pub use handlers::snapshot_handler::SnapshotHandler;
pub use scheduler::Scheduler;
pub use task_store::TaskFilter;
@ -11,10 +13,9 @@ pub use task_store::TaskStore;
use batch::Batch;
use error::Result;
use self::task::Job;
pub mod batch;
pub mod error;
mod handlers;
mod scheduler;
pub mod task;
mod task_store;
@ -22,11 +23,15 @@ pub mod update_loop;
#[cfg_attr(test, mockall::automock(type Error=test::DebugError;))]
#[async_trait]
pub trait TaskPerformer: Sync + Send + 'static {
/// Processes the `Task` batch returning the batch with the `Task` updated.
async fn process_batch(&self, batch: Batch) -> Batch;
pub trait BatchHandler: Sync + Send + 'static {
/// Returns whether this handler can accept this batch.
fn accept(&self, batch: &Batch) -> bool;
async fn process_job(&self, job: Job);
/// Processes the `Task` batch returning the batch with the `Task` updated.
///
/// It is ok for this function to panic if it is handed a batch that hasn't been verified by
/// `accept` beforehand.
async fn process_batch(&self, batch: Batch) -> Batch;
/// `finish` is called when the result of `process` has been committed to the task store. This
/// method can be used to perform cleanup after the update has been completed, for example.

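A minimal sketch of the dispatch pattern this trait enables (stand-in `Batch`/`BatchContent` types, assuming the `async-trait` and `tokio` crates are available): the update loop keeps the registered handlers in order and hands each prepared batch to the first one whose `accept` returns true.

// Illustrative stand-ins only; the real Batch/BatchContent live in tasks::batch.
use async_trait::async_trait;

#[derive(Debug)]
enum BatchContent {
    Dump,
    Empty,
}

#[derive(Debug)]
struct Batch {
    content: BatchContent,
}

#[async_trait]
trait BatchHandler: Sync + Send + 'static {
    fn accept(&self, batch: &Batch) -> bool;
    async fn process_batch(&self, batch: Batch) -> Batch;
}

struct DumpHandler;

#[async_trait]
impl BatchHandler for DumpHandler {
    fn accept(&self, batch: &Batch) -> bool {
        matches!(batch.content, BatchContent::Dump)
    }
    async fn process_batch(&self, batch: Batch) -> Batch {
        // A real handler would run the dump here and push a success/failure event.
        batch
    }
}

struct EmptyBatchHandler;

#[async_trait]
impl BatchHandler for EmptyBatchHandler {
    fn accept(&self, batch: &Batch) -> bool {
        matches!(batch.content, BatchContent::Empty)
    }
    async fn process_batch(&self, batch: Batch) -> Batch {
        batch
    }
}

#[tokio::main]
async fn main() {
    let handlers: Vec<Box<dyn BatchHandler>> = vec![Box::new(DumpHandler), Box::new(EmptyBatchHandler)];
    let batch = Batch { content: BatchContent::Dump };
    // The first handler whose `accept` returns true gets the batch.
    let handler = handlers.iter().find(|h| h.accept(&batch)).expect("no handler accepted the batch");
    let _processed = handler.process_batch(batch).await;
}

The `expect` mirrors the real update loop, which also panics ("No performer found for batch") if no registered handler accepts the prepared batch.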
View File

@ -1,7 +1,7 @@
use std::cmp::Ordering;
use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::slice;
use std::sync::Arc;
use std::time::Duration;
@ -11,19 +11,20 @@ use time::OffsetDateTime;
use tokio::sync::{watch, RwLock};
use crate::options::SchedulerConfig;
use crate::update_file_store::UpdateFileStore;
use crate::snapshot::SnapshotJob;
use super::batch::Batch;
use super::batch::{Batch, BatchContent};
use super::error::Result;
use super::task::{Job, Task, TaskContent, TaskEvent, TaskId};
use super::task::{Task, TaskContent, TaskEvent, TaskId};
use super::update_loop::UpdateLoop;
use super::{TaskFilter, TaskPerformer, TaskStore};
use super::{BatchHandler, TaskFilter, TaskStore};
#[derive(Eq, Debug, Clone, Copy)]
enum TaskType {
DocumentAddition { number: usize },
DocumentUpdate { number: usize },
Other,
IndexUpdate,
Dump,
}
/// Two tasks are equal if they have the same type.
@ -63,7 +64,7 @@ impl Ord for PendingTask {
#[derive(Debug)]
struct TaskList {
index: String,
id: TaskListIdentifier,
tasks: BinaryHeap<PendingTask>,
}
@ -82,9 +83,9 @@ impl DerefMut for TaskList {
}
impl TaskList {
fn new(index: String) -> Self {
fn new(id: TaskListIdentifier) -> Self {
Self {
index,
id,
tasks: Default::default(),
}
}
@ -92,7 +93,7 @@ impl TaskList {
impl PartialEq for TaskList {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
self.id == other.id
}
}
@ -100,11 +101,20 @@ impl Eq for TaskList {}
impl Ord for TaskList {
fn cmp(&self, other: &Self) -> Ordering {
match (self.peek(), other.peek()) {
(None, None) => Ordering::Equal,
(None, Some(_)) => Ordering::Less,
(Some(_), None) => Ordering::Greater,
(Some(lhs), Some(rhs)) => lhs.cmp(rhs),
match (&self.id, &other.id) {
(TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => {
match (self.peek(), other.peek()) {
(None, None) => Ordering::Equal,
(None, Some(_)) => Ordering::Less,
(Some(_), None) => Ordering::Greater,
(Some(lhs), Some(rhs)) => lhs.cmp(rhs),
}
}
(TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Less,
(TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Greater,
(TaskListIdentifier::Dump, TaskListIdentifier::Dump) => {
unreachable!("There should be only one Dump task list")
}
}
}
}
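
To illustrate the effect of this ordering once task lists sit in the scheduler's max-heap, here is a small self-contained sketch with a stand-in identifier type (the real `TaskList` additionally compares the priority of its pending tasks when two index lists meet, and panics on the dump/dump case): the dump list always outranks index lists, so a pending dump is scheduled before any per-index work.

use std::cmp::Ordering;
use std::collections::BinaryHeap;

// Stand-in for TaskListIdentifier; per-index tie-breaking is omitted.
#[derive(Debug)]
enum ListId {
    Index(&'static str),
    Dump,
}

impl Ord for ListId {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self, other) {
            (ListId::Index(_), ListId::Index(_)) => Ordering::Equal,
            (ListId::Index(_), ListId::Dump) => Ordering::Less,
            (ListId::Dump, ListId::Index(_)) => Ordering::Greater,
            (ListId::Dump, ListId::Dump) => Ordering::Equal,
        }
    }
}

impl PartialOrd for ListId {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

// Keep Eq consistent with Ord for this sketch.
impl PartialEq for ListId {
    fn eq(&self, other: &Self) -> bool {
        self.cmp(other) == Ordering::Equal
    }
}
impl Eq for ListId {}

fn main() {
    let mut queue = BinaryHeap::new();
    queue.push(ListId::Index("movies"));
    queue.push(ListId::Dump);
    queue.push(ListId::Index("books"));
    // The max-heap pops the dump list first, before any index list.
    assert!(matches!(queue.pop(), Some(ListId::Dump)));
}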
@ -115,18 +125,28 @@ impl PartialOrd for TaskList {
}
}
#[derive(PartialEq, Eq, Hash, Debug, Clone)]
enum TaskListIdentifier {
Index(String),
Dump,
}
#[derive(Default)]
struct TaskQueue {
/// Maps index uids to their TaskList, for quick access
index_tasks: HashMap<String, Arc<AtomicRefCell<TaskList>>>,
index_tasks: HashMap<TaskListIdentifier, Arc<AtomicRefCell<TaskList>>>,
/// A queue that orders TaskList by the priority of their first update
queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
}
impl TaskQueue {
fn insert(&mut self, task: Task) {
let uid = task.index_uid.into_inner();
let id = task.id;
let uid = match task.index_uid {
Some(uid) => TaskListIdentifier::Index(uid.into_inner()),
None if matches!(task.content, TaskContent::Dump { .. }) => TaskListIdentifier::Dump,
None => unreachable!("invalid task state"),
};
let kind = match task.content {
TaskContent::DocumentAddition {
documents_count,
@ -142,7 +162,13 @@ impl TaskQueue {
} => TaskType::DocumentUpdate {
number: documents_count,
},
_ => TaskType::Other,
TaskContent::Dump { .. } => TaskType::Dump,
TaskContent::DocumentDeletion(_)
| TaskContent::SettingsUpdate { .. }
| TaskContent::IndexDeletion
| TaskContent::IndexCreation { .. }
| TaskContent::IndexUpdate { .. } => TaskType::IndexUpdate,
_ => unreachable!("unhandled task type"),
};
let task = PendingTask { kind, id };
@ -160,7 +186,7 @@ impl TaskQueue {
list.push(task);
}
Entry::Vacant(entry) => {
let mut task_list = TaskList::new(entry.key().to_owned());
let mut task_list = TaskList::new(entry.key().clone());
task_list.push(task);
let task_list = Arc::new(AtomicRefCell::new(task_list));
entry.insert(task_list.clone());
@ -181,7 +207,7 @@ impl TaskQueue {
// After being mutated, the head is reinserted to the correct position.
self.queue.push(head);
} else {
self.index_tasks.remove(&head.borrow().index);
self.index_tasks.remove(&head.borrow().id);
}
Some(result)
@ -193,11 +219,12 @@ impl TaskQueue {
}
pub struct Scheduler {
jobs: VecDeque<Job>,
// TODO: currently snapshots are non-persistent tasks and are treated differently.
snapshots: VecDeque<SnapshotJob>,
tasks: TaskQueue,
store: TaskStore,
processing: Vec<TaskId>,
processing: Processing,
next_fetched_task_id: TaskId,
config: SchedulerConfig,
/// Notifies the update loop that a new task was received
@ -205,14 +232,11 @@ pub struct Scheduler {
}
impl Scheduler {
pub fn new<P>(
pub fn new(
store: TaskStore,
performer: Arc<P>,
performers: Vec<Arc<dyn BatchHandler + Sync + Send + 'static>>,
mut config: SchedulerConfig,
) -> Result<Arc<RwLock<Self>>>
where
P: TaskPerformer,
{
) -> Result<Arc<RwLock<Self>>> {
let (notifier, rcv) = watch::channel(());
let debounce_time = config.debounce_duration_sec;
@ -223,11 +247,11 @@ impl Scheduler {
}
let this = Self {
jobs: VecDeque::new(),
snapshots: VecDeque::new(),
tasks: TaskQueue::default(),
store,
processing: Vec::new(),
processing: Processing::Nothing,
next_fetched_task_id: 0,
config,
notifier,
@ -240,7 +264,7 @@ impl Scheduler {
let update_loop = UpdateLoop::new(
this.clone(),
performer,
performers,
debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
rcv,
);
@ -250,10 +274,6 @@ impl Scheduler {
Ok(this)
}
pub async fn dump(&self, path: &Path, file_store: UpdateFileStore) -> Result<()> {
self.store.dump(path, file_store).await
}
fn register_task(&mut self, task: Task) {
assert!(!task.is_finished());
self.tasks.insert(task);
@ -261,7 +281,7 @@ impl Scheduler {
/// Clears the processing list; this method should be called when the processing of a batch is finished.
pub fn finish(&mut self) {
self.processing.clear();
self.processing = Processing::Nothing;
}
pub fn notify(&self) {
@ -269,13 +289,27 @@ impl Scheduler {
}
fn notify_if_not_empty(&self) {
if !self.jobs.is_empty() || !self.tasks.is_empty() {
if !self.snapshots.is_empty() || !self.tasks.is_empty() {
self.notify();
}
}
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
self.store.update_tasks(tasks).await
pub async fn update_tasks(&self, content: BatchContent) -> Result<BatchContent> {
match content {
BatchContent::DocumentsAdditionBatch(tasks) => {
let tasks = self.store.update_tasks(tasks).await?;
Ok(BatchContent::DocumentsAdditionBatch(tasks))
}
BatchContent::IndexUpdate(t) => {
let mut tasks = self.store.update_tasks(vec![t]).await?;
Ok(BatchContent::IndexUpdate(tasks.remove(0)))
}
BatchContent::Dump(t) => {
let mut tasks = self.store.update_tasks(vec![t]).await?;
Ok(BatchContent::Dump(tasks.remove(0)))
}
other => Ok(other),
}
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
@ -294,16 +328,16 @@ impl Scheduler {
pub async fn get_processing_tasks(&self) -> Result<Vec<Task>> {
let mut tasks = Vec::new();
for id in self.processing.iter() {
let task = self.store.get_task(*id, None).await?;
for id in self.processing.ids() {
let task = self.store.get_task(id, None).await?;
tasks.push(task);
}
Ok(tasks)
}
pub async fn schedule_job(&mut self, job: Job) {
self.jobs.push_back(job);
pub fn schedule_snapshot(&mut self, job: SnapshotJob) {
self.snapshots.push_back(job);
self.notify();
}
@ -329,106 +363,168 @@ impl Scheduler {
}
/// Prepare the next batch, and set `processing` to the ids in that batch.
pub async fn prepare(&mut self) -> Result<Pending> {
pub async fn prepare(&mut self) -> Result<Batch> {
// If there is a job to process, do it first.
if let Some(job) = self.jobs.pop_front() {
if let Some(job) = self.snapshots.pop_front() {
// There is more work to do, notify the update loop
self.notify_if_not_empty();
return Ok(Pending::Job(job));
let batch = Batch::new(None, BatchContent::Snapshot(job));
return Ok(batch);
}
// Try to fill the queue with pending tasks.
self.fetch_pending_tasks().await?;
make_batch(&mut self.tasks, &mut self.processing, &self.config);
self.processing = make_batch(&mut self.tasks, &self.config);
log::debug!("prepared batch with {} tasks", self.processing.len());
if !self.processing.is_empty() {
let ids = std::mem::take(&mut self.processing);
if !self.processing.is_nothing() {
let (processing, mut content) = self
.store
.get_processing_tasks(std::mem::take(&mut self.processing))
.await?;
let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?;
// The batch id is the id of the first update it contains
let id = match tasks.first() {
// The batch id is the id of the first update it contains. At this point we must have a
// valid batch that contains at least 1 task.
let id = match content.first() {
Some(Task { id, .. }) => *id,
_ => panic!("invalid batch"),
};
tasks.iter_mut().for_each(|t| {
t.events.push(TaskEvent::Batched {
batch_id: id,
timestamp: OffsetDateTime::now_utc(),
})
content.push_event(TaskEvent::Batched {
batch_id: id,
timestamp: OffsetDateTime::now_utc(),
});
self.processing = ids;
self.processing = processing;
let batch = Batch {
id,
created_at: OffsetDateTime::now_utc(),
tasks,
};
let batch = Batch::new(Some(id), content);
// There is more work to do, notify the update loop
self.notify_if_not_empty();
Ok(Pending::Batch(batch))
Ok(batch)
} else {
Ok(Pending::Nothing)
Ok(Batch::empty())
}
}
}
#[derive(Debug)]
pub enum Pending {
Batch(Batch),
Job(Job),
#[derive(Debug, PartialEq)]
pub enum Processing {
DocumentAdditions(Vec<TaskId>),
IndexUpdate(TaskId),
Dump(TaskId),
/// Variant used when there is nothing to process.
Nothing,
}
fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec<TaskId>, config: &SchedulerConfig) {
processing.clear();
impl Default for Processing {
fn default() -> Self {
Self::Nothing
}
}
let mut doc_count = 0;
tasks.head_mut(|list| match list.peek().copied() {
Some(PendingTask {
kind: TaskType::Other,
id,
}) => {
processing.push(id);
list.pop();
enum ProcessingIter<'a> {
Many(slice::Iter<'a, TaskId>),
Single(Option<TaskId>),
}
impl<'a> Iterator for ProcessingIter<'a> {
type Item = TaskId;
fn next(&mut self) -> Option<Self::Item> {
match self {
ProcessingIter::Many(iter) => iter.next().copied(),
ProcessingIter::Single(val) => val.take(),
}
Some(PendingTask { kind, .. }) => loop {
match list.peek() {
Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress.
if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) {
break;
}
let pending = list.pop().unwrap();
processing.push(pending.id);
}
}
// We add the number of documents to the count if we are scheduling document additions and
// stop adding if we already have enough.
//
// We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
match pending.kind {
TaskType::DocumentUpdate { number }
| TaskType::DocumentAddition { number } => {
doc_count += number;
impl Processing {
fn is_nothing(&self) -> bool {
matches!(self, Processing::Nothing)
}
if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) {
pub fn ids(&self) -> impl Iterator<Item = TaskId> + '_ {
match self {
Processing::DocumentAdditions(v) => ProcessingIter::Many(v.iter()),
Processing::IndexUpdate(id) | Processing::Dump(id) => ProcessingIter::Single(Some(*id)),
Processing::Nothing => ProcessingIter::Single(None),
}
}
pub fn len(&self) -> usize {
match self {
Processing::DocumentAdditions(v) => v.len(),
Processing::IndexUpdate(_) | Processing::Dump(_) => 1,
Processing::Nothing => 0,
}
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
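
For context, a minimal sketch of the same idea with stand-in types: whichever variant is in flight, callers such as `get_processing_tasks` only need one uniform iterator over task ids. The boxed iterator below is an assumption chosen for brevity; the hand-rolled `ProcessingIter` above achieves the same without the allocation.

type TaskId = u64;

// Stand-in mirroring the shape of the scheduler's Processing enum.
#[derive(Debug)]
enum Processing {
    DocumentAdditions(Vec<TaskId>),
    IndexUpdate(TaskId),
    Dump(TaskId),
    Nothing,
}

impl Processing {
    // One iterator over task ids, whether the batch holds many tasks or a single one.
    fn ids(&self) -> Box<dyn Iterator<Item = TaskId> + '_> {
        match self {
            Processing::DocumentAdditions(v) => Box::new(v.iter().copied()),
            Processing::IndexUpdate(id) | Processing::Dump(id) => Box::new(std::iter::once(*id)),
            Processing::Nothing => Box::new(std::iter::empty()),
        }
    }
}

fn main() {
    assert_eq!(Processing::DocumentAdditions(vec![3, 4, 5]).ids().collect::<Vec<_>>(), vec![3, 4, 5]);
    assert_eq!(Processing::IndexUpdate(7).ids().collect::<Vec<_>>(), vec![7]);
    assert_eq!(Processing::Dump(9).ids().collect::<Vec<_>>(), vec![9]);
    assert!(Processing::Nothing.ids().next().is_none());
}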
fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing {
let mut doc_count = 0;
tasks
.head_mut(|list| match list.peek().copied() {
Some(PendingTask {
kind: TaskType::IndexUpdate,
id,
}) => {
list.pop();
Processing::IndexUpdate(id)
}
Some(PendingTask {
kind: TaskType::Dump,
id,
}) => {
list.pop();
Processing::Dump(id)
}
Some(PendingTask { kind, .. }) => {
let mut task_list = Vec::new();
loop {
match list.peek() {
Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress.
if task_list.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1)
{
break;
}
let pending = list.pop().unwrap();
task_list.push(pending.id);
// We add the number of documents to the count if we are scheduling document additions and
// stop adding if we already have enough.
//
// We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
match pending.kind {
TaskType::DocumentUpdate { number }
| TaskType::DocumentAddition { number } => {
doc_count += number;
if doc_count
>= config.max_documents_per_batch.unwrap_or(usize::MAX)
{
break;
}
}
_ => (),
}
}
_ => (),
_ => break,
}
}
_ => break,
Processing::DocumentAdditions(task_list)
}
},
None => (),
});
None => Processing::Nothing,
})
.unwrap_or(Processing::Nothing)
}
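
As a worked illustration of the two stop conditions in the document-addition arm above (the limit values here are hypothetical): a batch closes once it holds `max_batch_size` tasks or once the accumulated document count reaches `max_documents_per_batch`, but it always takes at least one task so the scheduler keeps making progress. A minimal sketch of just that accounting:

// Illustrative sketch of the batching limits; not part of the patch.
fn close_batch(pending_docs: &[usize], max_batch_size: usize, max_documents_per_batch: usize) -> usize {
    let mut doc_count = 0;
    let mut taken = 0;
    for &n in pending_docs {
        // Never start a new task once the size limit is reached (but always take at least one).
        if taken >= max_batch_size.max(1) {
            break;
        }
        taken += 1;
        doc_count += n;
        // The document budget is checked only after the task was added, so a
        // single oversized addition still gets scheduled on its own.
        if doc_count >= max_documents_per_batch {
            break;
        }
    }
    taken
}

fn main() {
    // 40 + 70 documents cross a 100-document budget: the third task waits for the next batch.
    assert_eq!(close_batch(&[40, 70, 10], 10, 100), 2);
    // A single 500-document addition is still scheduled alone.
    assert_eq!(close_batch(&[500], 10, 100), 1);
    // The size limit alone also closes the batch.
    assert_eq!(close_batch(&[1, 1, 1, 1], 2, 100), 2);
}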
#[cfg(test)]
@ -440,10 +536,10 @@ mod test {
use super::*;
fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task {
fn gen_task(id: TaskId, index_uid: Option<&str>, content: TaskContent) -> Task {
Task {
id,
index_uid: IndexUid::new_unchecked(index_uid),
index_uid: index_uid.map(IndexUid::new_unchecked),
content,
events: vec![],
}
@ -452,13 +548,13 @@ mod test {
#[test]
fn register_updates_multiples_indexes() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(1, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(3, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(4, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(6, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(0, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(1, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(4, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), TaskContent::IndexDeletion));
let test1_tasks = queue
.head_mut(|tasks| tasks.drain().map(|t| t.id).collect::<Vec<_>>())
@ -486,40 +582,45 @@ mod test {
documents_count: 0,
allow_index_creation: true,
};
queue.insert(gen_task(0, "test1", content.clone()));
queue.insert(gen_task(1, "test2", content.clone()));
queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(3, "test2", content.clone()));
queue.insert(gen_task(4, "test1", content.clone()));
queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(6, "test2", content.clone()));
queue.insert(gen_task(7, "test1", content));
let mut batch = Vec::new();
queue.insert(gen_task(0, Some("test1"), content.clone()));
queue.insert(gen_task(1, Some("test2"), content.clone()));
queue.insert(gen_task(2, Some("test2"), TaskContent::IndexDeletion));
queue.insert(gen_task(3, Some("test2"), content.clone()));
queue.insert(gen_task(4, Some("test1"), content.clone()));
queue.insert(gen_task(5, Some("test1"), TaskContent::IndexDeletion));
queue.insert(gen_task(6, Some("test2"), content.clone()));
queue.insert(gen_task(7, Some("test1"), content));
queue.insert(gen_task(
8,
None,
TaskContent::Dump {
uid: "adump".to_owned(),
},
));
let config = SchedulerConfig::default();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[0, 4]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[1]);
// Make sure that the dump is processed before everybody else.
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::Dump(8));
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[2]);
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::DocumentAdditions(vec![0, 4]));
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[3, 6]);
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::DocumentAdditions(vec![1]));
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[5]);
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::IndexUpdate(2));
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[7]);
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::DocumentAdditions(vec![3, 6]));
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::IndexUpdate(5));
let batch = make_batch(&mut queue, &config);
assert_eq!(batch, Processing::DocumentAdditions(vec![7]));
assert!(queue.is_empty());
}

View File

@ -1,17 +1,13 @@
use std::path::PathBuf;
use meilisearch_error::ResponseError;
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tokio::sync::oneshot;
use uuid::Uuid;
use super::batch::BatchId;
use crate::{
index::{Settings, Unchecked},
index_resolver::{error::IndexResolverError, IndexUid},
snapshot::SnapshotJob,
index_resolver::IndexUid,
};
pub type TaskId = u64;
@ -66,6 +62,22 @@ pub enum TaskEvent {
},
}
impl TaskEvent {
pub fn succeeded(result: TaskResult) -> Self {
Self::Succeded {
result,
timestamp: OffsetDateTime::now_utc(),
}
}
pub fn failed(error: ResponseError) -> Self {
Self::Failed {
error,
timestamp: OffsetDateTime::now_utc(),
}
}
}
/// A task represents an operation that Meilisearch must do.
/// It's stored on disk and executed from the lowest to highest Task id.
/// Every time a new task is created, it has a higher Task id than the previous one.
@ -74,7 +86,17 @@ pub enum TaskEvent {
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub struct Task {
pub id: TaskId,
pub index_uid: IndexUid,
/// The name of the index the task is targeting. If it isn't targeting any index (i.e. a Dump
/// task), then this is None.
// TODO: on the next breaking change to the dump format, it would be a good idea to move this
// field inside the TaskContent.
#[cfg_attr(
test,
proptest(
strategy = "proptest::option::weighted(proptest::option::Probability::new(0.99), IndexUid::arbitrary())"
)
)]
pub index_uid: Option<IndexUid>,
pub content: TaskContent,
pub events: Vec<TaskEvent>,
}
@ -100,33 +122,6 @@ impl Task {
}
}
/// A job is like a volatile priority `Task`.
/// It should be processed as fast as possible and is not stored on disk.
/// This means that when Meilisearch is closed, all your unprocessed jobs will disappear.
#[derive(Debug, derivative::Derivative)]
#[derivative(PartialEq)]
pub enum Job {
Dump {
#[derivative(PartialEq = "ignore")]
ret: oneshot::Sender<Result<oneshot::Sender<()>, IndexResolverError>>,
path: PathBuf,
},
Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob),
Empty,
}
impl Default for Job {
fn default() -> Self {
Self::Empty
}
}
impl Job {
pub fn take(&mut self) -> Self {
std::mem::take(self)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
pub enum DocumentDeletion {
@ -161,6 +156,9 @@ pub enum TaskContent {
IndexUpdate {
primary_key: Option<String>,
},
Dump {
uid: String,
},
}
#[cfg(test)]

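To make the new `Option<IndexUid>` field and the `Dump` content variant concrete, a minimal sketch with stand-in types (`IndexUid` reduced to a plain `String`, and only the two relevant content variants): an index-scoped task keeps the uid of its target index, while a dump task targets no index and carries its dump uid in the content.

// Illustrative stand-ins only; not part of the patch.
type TaskId = u64;

#[derive(Debug)]
enum TaskContent {
    IndexCreation { primary_key: Option<String> },
    Dump { uid: String },
}

#[derive(Debug)]
struct Task {
    id: TaskId,
    index_uid: Option<String>,
    content: TaskContent,
}

fn main() {
    // An index-scoped task keeps the uid of the index it targets...
    let create = Task {
        id: 0,
        index_uid: Some("movies".to_string()),
        content: TaskContent::IndexCreation { primary_key: None },
    };
    // ...while a dump task targets no index: its identity lives in the content.
    let dump = Task {
        id: 1,
        index_uid: None,
        content: TaskContent::Dump { uid: "my-dump".to_string() },
    };
    println!("{:?}\n{:?}", create, dump);
}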
View File

@ -9,7 +9,9 @@ use log::debug;
use milli::heed::{Env, RwTxn};
use time::OffsetDateTime;
use super::batch::BatchContent;
use super::error::TaskError;
use super::scheduler::Processing;
use super::task::{Task, TaskContent, TaskId};
use super::Result;
use crate::index_resolver::IndexUid;
@ -30,10 +32,13 @@ pub struct TaskFilter {
impl TaskFilter {
fn pass(&self, task: &Task) -> bool {
self.indexes
.as_ref()
.map(|indexes| indexes.contains(&*task.index_uid))
.unwrap_or(true)
match task.index_uid {
Some(ref index_uid) => self
.indexes
.as_ref()
.map_or(true, |indexes| indexes.contains(index_uid.as_str())),
None => false,
}
}
/// Adds an index to the filter, so the filter must match this index.
@ -66,7 +71,11 @@ impl TaskStore {
Ok(Self { store })
}
pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result<Task> {
pub async fn register(
&self,
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
debug!("registering update: {:?}", content);
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<Task> {
@ -114,19 +123,44 @@ impl TaskStore {
}
}
pub async fn get_pending_tasks(&self, ids: Vec<TaskId>) -> Result<(Vec<TaskId>, Vec<Task>)> {
/// This method takes a `Processing` which contains the next task ids to process, and returns
/// the corresponding tasks along with ownership of the passed processing.
///
/// We need `get_processing_tasks` to take ownership of `Processing` because we need it to be
/// valid for 'static.
pub async fn get_processing_tasks(
&self,
processing: Processing,
) -> Result<(Processing, BatchContent)> {
let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let mut tasks = Vec::new();
let txn = store.rtxn()?;
for id in ids.iter() {
let task = store
.get(&txn, *id)?
.ok_or(TaskError::UnexistingTask(*id))?;
tasks.push(task);
}
Ok((ids, tasks))
let content = match processing {
Processing::DocumentAdditions(ref ids) => {
let mut tasks = Vec::new();
for id in ids.iter() {
let task = store
.get(&txn, *id)?
.ok_or(TaskError::UnexistingTask(*id))?;
tasks.push(task);
}
BatchContent::DocumentsAdditionBatch(tasks)
}
Processing::IndexUpdate(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
BatchContent::IndexUpdate(task)
}
Processing::Dump(id) => {
let task = store.get(&txn, id)?.ok_or(TaskError::UnexistingTask(id))?;
debug_assert!(matches!(task.content, TaskContent::Dump { .. }));
BatchContent::Dump(task)
}
Processing::Nothing => BatchContent::Empty,
};
Ok((processing, content))
})
.await??;
@ -169,13 +203,14 @@ impl TaskStore {
}
pub async fn dump(
&self,
env: Arc<Env>,
dir_path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
let store = Self::new(env)?;
let update_dir = dir_path.as_ref().join("updates");
let updates_file = update_dir.join("data.jsonl");
let tasks = self.list_tasks(None, None, None).await?;
let tasks = store.list_tasks(None, None, None).await?;
let dir_path = dir_path.as_ref().to_path_buf();
tokio::task::spawn_blocking(move || -> Result<()> {
@ -223,7 +258,7 @@ impl TaskStore {
#[cfg(test)]
pub mod test {
use crate::tasks::task_store::store::test::tmp_env;
use crate::tasks::{scheduler::Processing, task_store::store::test::tmp_env};
use super::*;
@ -252,6 +287,14 @@ pub mod test {
Ok(Self::Real(TaskStore::new(env)?))
}
pub async fn dump(
env: Arc<milli::heed::Env>,
path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
TaskStore::dump(env, path, update_file_store).await
}
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(Arc::new(mocker))
}
@ -272,12 +315,12 @@ pub mod test {
}
}
pub async fn get_pending_tasks(
pub async fn get_processing_tasks(
&self,
tasks: Vec<TaskId>,
) -> Result<(Vec<TaskId>, Vec<Task>)> {
tasks: Processing,
) -> Result<(Processing, BatchContent)> {
match self {
Self::Real(s) => s.get_pending_tasks(tasks).await,
Self::Real(s) => s.get_processing_tasks(tasks).await,
Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) },
}
}
@ -294,18 +337,11 @@ pub mod test {
}
}
pub async fn dump(
pub async fn register(
&self,
path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
match self {
Self::Real(s) => s.dump(path, update_file_store).await,
Self::Mock(m) => unsafe { m.get("dump").call((path, update_file_store)) },
}
}
pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result<Task> {
index_uid: Option<IndexUid>,
content: TaskContent,
) -> Result<Task> {
match self {
Self::Real(s) => s.register(index_uid, content).await,
Self::Mock(_m) => todo!(),
@ -335,7 +371,7 @@ pub mod test {
let gen_task = |id: TaskId| Task {
id,
index_uid: IndexUid::new_unchecked("test"),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexCreation { primary_key: None },
events: Vec::new(),
};

View File

@ -108,8 +108,10 @@ impl Store {
pub fn put(&self, txn: &mut RwTxn, task: &Task) -> Result<()> {
self.tasks.put(txn, &BEU64::new(task.id), task)?;
self.uids_task_ids
.put(txn, &(&task.index_uid, task.id), &())?;
// Only add the task to the index uid -> task id mapping if it has an index_uid.
if let Some(ref index_uid) = task.index_uid {
self.uids_task_ids.put(txn, &(index_uid, task.id), &())?;
}
Ok(())
}
@ -325,7 +327,7 @@ pub mod test {
let tasks = (0..100)
.map(|_| Task {
id: rand::random(),
index_uid: IndexUid::new_unchecked("test"),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
events: vec![],
})
@ -356,14 +358,14 @@ pub mod test {
let task_1 = Task {
id: 1,
index_uid: IndexUid::new_unchecked("test"),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
events: vec![],
};
let task_2 = Task {
id: 0,
index_uid: IndexUid::new_unchecked("test1"),
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
events: vec![],
};
@ -379,18 +381,28 @@ pub mod test {
txn.abort().unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(&*tasks.first().unwrap().index_uid, "test");
assert_eq!(
tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
// same thing but invert the ids
let task_1 = Task {
id: 0,
index_uid: IndexUid::new_unchecked("test"),
index_uid: Some(IndexUid::new_unchecked("test")),
content: TaskContent::IndexDeletion,
events: vec![],
};
let task_2 = Task {
id: 1,
index_uid: IndexUid::new_unchecked("test1"),
index_uid: Some(IndexUid::new_unchecked("test1")),
content: TaskContent::IndexDeletion,
events: vec![],
};
@ -405,7 +417,17 @@ pub mod test {
let tasks = store.list_tasks(&txn, None, Some(filter), None).unwrap();
assert_eq!(tasks.len(), 1);
assert_eq!(&*tasks.first().unwrap().index_uid, "test");
assert_eq!(
&*tasks
.first()
.as_ref()
.unwrap()
.index_uid
.as_ref()
.unwrap()
.as_str(),
"test"
);
}
proptest! {

View File

@ -7,33 +7,29 @@ use tokio::time::interval_at;
use super::batch::Batch;
use super::error::Result;
use super::scheduler::Pending;
use super::{Scheduler, TaskPerformer};
use super::{BatchHandler, Scheduler};
use crate::tasks::task::TaskEvent;
/// The update loop sequentially performs batches of updates by asking the scheduler for a batch,
/// and handing it to the `TaskPerformer`.
pub struct UpdateLoop<P: TaskPerformer> {
pub struct UpdateLoop {
scheduler: Arc<RwLock<Scheduler>>,
performer: Arc<P>,
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
notifier: Option<watch::Receiver<()>>,
debounce_duration: Option<Duration>,
}
impl<P> UpdateLoop<P>
where
P: TaskPerformer + Send + Sync + 'static,
{
impl UpdateLoop {
pub fn new(
scheduler: Arc<RwLock<Scheduler>>,
performer: Arc<P>,
performers: Vec<Arc<dyn BatchHandler + Send + Sync + 'static>>,
debuf_duration: Option<Duration>,
notifier: watch::Receiver<()>,
) -> Self {
Self {
scheduler,
performer,
performers,
debounce_duration: debuf_duration,
notifier: Some(notifier),
}
@ -59,34 +55,29 @@ where
}
async fn process_next_batch(&self) -> Result<()> {
let pending = { self.scheduler.write().await.prepare().await? };
match pending {
Pending::Batch(mut batch) => {
for task in &mut batch.tasks {
task.events
.push(TaskEvent::Processing(OffsetDateTime::now_utc()));
}
let mut batch = { self.scheduler.write().await.prepare().await? };
let performer = self
.performers
.iter()
.find(|p| p.accept(&batch))
.expect("No performer found for batch")
.clone();
batch.tasks = {
self.scheduler
.read()
.await
.update_tasks(batch.tasks)
.await?
};
batch
.content
.push_event(TaskEvent::Processing(OffsetDateTime::now_utc()));
let performer = self.performer.clone();
batch.content = {
self.scheduler
.read()
.await
.update_tasks(batch.content)
.await?
};
let batch = performer.process_batch(batch).await;
let batch = performer.process_batch(batch).await;
self.handle_batch_result(batch).await?;
}
Pending::Job(job) => {
let performer = self.performer.clone();
performer.process_job(job).await;
}
Pending::Nothing => (),
}
self.handle_batch_result(batch, performer).await?;
Ok(())
}
@ -96,13 +87,17 @@ where
/// When a task is processed, the result of the process is pushed to its event list. The
/// `handle_batch_result` make sure that the new state is saved to the store.
/// The tasks are then removed from the processing queue.
async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> {
async fn handle_batch_result(
&self,
mut batch: Batch,
performer: Arc<dyn BatchHandler + Sync + Send + 'static>,
) -> Result<()> {
let mut scheduler = self.scheduler.write().await;
let tasks = scheduler.update_tasks(batch.tasks).await?;
let content = scheduler.update_tasks(batch.content).await?;
scheduler.finish();
drop(scheduler);
batch.tasks = tasks;
self.performer.finish(&batch).await;
batch.content = content;
performer.finish(&batch).await;
Ok(())
}
}

View File

@ -26,7 +26,7 @@ pub struct UpdateFile {
#[error("Error while persisting update to disk: {0}")]
pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);
type Result<T> = std::result::Result<T, UpdateFileStoreError>;
pub type Result<T> = std::result::Result<T, UpdateFileStoreError>;
macro_rules! into_update_store_error {
($($other:path),*) => {
@ -249,7 +249,7 @@ mod test {
pub async fn delete(&self, uuid: Uuid) -> Result<()> {
match self {
MockUpdateFileStore::Real(s) => s.delete(uuid).await,
MockUpdateFileStore::Mock(_) => todo!(),
MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) },
}
}
}