From fc098022c705ddf55fbe680d22b50622b39309bc Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 14 Sep 2022 16:16:53 +0200 Subject: [PATCH] start integrating the index-scheduler in the meilisearch codebase --- Cargo.lock | 3 + index-scheduler/src/batch.rs | 8 +- index-scheduler/src/error.rs | 4 +- index-scheduler/src/index_mapper.rs | 21 +- index-scheduler/src/index_scheduler.rs | 435 +++++++++++ index-scheduler/src/lib.rs | 378 +--------- index-scheduler/src/task.rs | 141 +++- index-scheduler/src/utils.rs | 4 +- meilisearch-lib/Cargo.toml | 3 + meilisearch-lib/src/index/error.rs | 61 -- meilisearch-lib/src/index_controller/error.rs | 34 +- meilisearch-lib/src/index_controller/mod.rs | 224 ++---- meilisearch-lib/src/index_resolver/error.rs | 71 -- .../src/index_resolver/meta_store.rs | 223 ------ meilisearch-lib/src/index_resolver/mod.rs | 685 ------------------ meilisearch-lib/src/lib.rs | 13 +- meilisearch-lib/src/snapshot.rs | 7 +- meilisearch-lib/src/tasks/batch.rs | 75 -- meilisearch-lib/src/tasks/error.rs | 34 - .../src/tasks/handlers/dump_handler.rs | 132 ---- .../src/tasks/handlers/empty_handler.rs | 18 - .../tasks/handlers/index_resolver_handler.rs | 199 ----- meilisearch-lib/src/tasks/handlers/mod.rs | 34 - .../src/tasks/handlers/snapshot_handler.rs | 26 - meilisearch-lib/src/tasks/mod.rs | 56 -- meilisearch-lib/src/tasks/scheduler.rs | 609 ---------------- meilisearch-lib/src/tasks/update_loop.rs | 93 --- meilisearch-lib/src/update_file_store.rs | 258 ------- 28 files changed, 679 insertions(+), 3170 deletions(-) create mode 100644 index-scheduler/src/index_scheduler.rs delete mode 100644 meilisearch-lib/src/index/error.rs delete mode 100644 meilisearch-lib/src/index_resolver/error.rs delete mode 100644 meilisearch-lib/src/index_resolver/meta_store.rs delete mode 100644 meilisearch-lib/src/index_resolver/mod.rs delete mode 100644 meilisearch-lib/src/tasks/batch.rs delete mode 100644 meilisearch-lib/src/tasks/error.rs delete mode 100644 meilisearch-lib/src/tasks/handlers/dump_handler.rs delete mode 100644 meilisearch-lib/src/tasks/handlers/empty_handler.rs delete mode 100644 meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs delete mode 100644 meilisearch-lib/src/tasks/handlers/mod.rs delete mode 100644 meilisearch-lib/src/tasks/handlers/snapshot_handler.rs delete mode 100644 meilisearch-lib/src/tasks/mod.rs delete mode 100644 meilisearch-lib/src/tasks/scheduler.rs delete mode 100644 meilisearch-lib/src/tasks/update_loop.rs delete mode 100644 meilisearch-lib/src/update_file_store.rs diff --git a/Cargo.lock b/Cargo.lock index 43e15d05f..5495075e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2360,12 +2360,15 @@ dependencies = [ "csv", "derivative", "either", + "file-store", "flate2", "fs_extra", "fst", "futures", "futures-util", "http", + "index", + "index-scheduler", "indexmap", "itertools", "lazy_static", diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 851fba1e6..9742116fb 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -1,6 +1,6 @@ use crate::{ autobatcher::BatchKind, - task::{KindWithContent, Status}, + task::{Kind, KindWithContent, Status, Task}, Error, IndexScheduler, Result, }; use index::{Settings, Unchecked}; @@ -10,8 +10,6 @@ use milli::{ }; use uuid::Uuid; -use crate::{task::Kind, Task}; - pub(crate) enum Batch { Cancel(Task), Snapshot(Vec), @@ -230,8 +228,8 @@ impl IndexScheduler { for (ret, mut task) in ret.iter().zip(document_addition_tasks.into_iter()) { match ret { - Ok(ret) => task.info = 
Some(format!("{:?}", ret)), - Err(err) => task.error = Some(err.to_string()), + Ok(ret) => todo!(), // task.info = Some(format!("{:?}", ret)), + Err(err) => todo!(), // task.error = Some(err.to_string()), } updated_tasks.push(task); } diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 10bf90974..1caeff33d 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -13,9 +13,11 @@ pub enum Error { Heed(#[from] heed::Error), #[error(transparent)] Milli(#[from] milli::Error), - #[error("{0}")] + #[error(transparent)] IndexError(#[from] index::error::IndexError), #[error(transparent)] + FileStore(#[from] file_store::Error), + #[error(transparent)] IoError(#[from] std::io::Error), #[error(transparent)] diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 43d72b51d..e57c5d00b 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -8,11 +8,13 @@ use index::Index; use milli::heed::types::SerdeBincode; use milli::heed::types::Str; use milli::heed::Database; +use milli::heed::Env; use milli::heed::RoTxn; use milli::heed::RwTxn; use milli::update::IndexerConfig; use uuid::Uuid; +use crate::index_scheduler::db_name; use crate::Error; use crate::Result; @@ -31,9 +33,24 @@ pub struct IndexMapper { } impl IndexMapper { + pub fn new( + env: &Env, + base_path: PathBuf, + index_size: usize, + indexer_config: IndexerConfig, + ) -> Result { + Ok(Self { + index_map: Arc::default(), + index_mapping: env.create_database(Some(db_name::INDEX_MAPPING))?, + base_path, + index_size, + indexer_config: Arc::new(indexer_config), + }) + } + /// Get or create the index. - pub fn create_index(&self, rwtxn: &mut RwTxn, name: &str) -> Result { - let index = match self.index(rwtxn, name) { + pub fn create_index(&self, wtxn: &mut RwTxn, name: &str) -> Result { + let index = match self.index(wtxn, name) { Ok(index) => index, Err(Error::IndexNotFound(_)) => { let uuid = Uuid::new_v4(); diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs new file mode 100644 index 000000000..752808d88 --- /dev/null +++ b/index-scheduler/src/index_scheduler.rs @@ -0,0 +1,435 @@ +use crate::index_mapper::IndexMapper; +use crate::task::{Kind, KindWithContent, Status, Task, TaskView}; +use crate::Result; +use file_store::FileStore; +use index::Index; +use milli::update::IndexerConfig; +use synchronoise::SignalEvent; + +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::RwLock; + +use milli::heed::types::{OwnedType, SerdeBincode, Str}; +use milli::heed::{self, Database, Env}; + +use milli::{RoaringBitmapCodec, BEU32}; +use roaring::RoaringBitmap; +use serde::Deserialize; + +const DEFAULT_LIMIT: fn() -> u32 = || 20; + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Query { + #[serde(default = "DEFAULT_LIMIT")] + limit: u32, + from: Option, + status: Option>, + #[serde(rename = "type")] + kind: Option>, + index_uid: Option>, +} + +pub mod db_name { + pub const ALL_TASKS: &str = "all-tasks"; + pub const STATUS: &str = "status"; + pub const KIND: &str = "kind"; + pub const INDEX_TASKS: &str = "index-tasks"; + + pub const INDEX_MAPPING: &str = "index-mapping"; +} + +/// This module is responsible for two things; +/// 1. Resolve the name of the indexes. +/// 2. Schedule the tasks. +#[derive(Clone)] +pub struct IndexScheduler { + /// The list of tasks currently processing. 
+ pub(crate) processing_tasks: Arc>, + + pub(crate) file_store: FileStore, + + /// The LMDB environment which the DBs are associated with. + pub(crate) env: Env, + + // The main database, it contains all the tasks accessible by their Id. + pub(crate) all_tasks: Database, SerdeBincode>, + + /// All the tasks ids grouped by their status. + pub(crate) status: Database, RoaringBitmapCodec>, + /// All the tasks ids grouped by their kind. + pub(crate) kind: Database, RoaringBitmapCodec>, + /// Store the tasks associated to an index. + pub(crate) index_tasks: Database, + + /// In charge of creating, opening, storing and returning indexes. + pub(crate) index_mapper: IndexMapper, + + // set to true when there is work to do. + pub(crate) wake_up: Arc, +} + +impl IndexScheduler { + pub fn new( + db_path: PathBuf, + update_file_path: PathBuf, + indexes_path: PathBuf, + index_size: usize, + indexer_config: IndexerConfig, + ) -> Result { + std::fs::create_dir_all(&db_path)?; + std::fs::create_dir_all(&update_file_path)?; + std::fs::create_dir_all(&indexes_path)?; + + let mut options = heed::EnvOpenOptions::new(); + options.max_dbs(6); + + let env = options.open(db_path)?; + // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things + let wake_up = SignalEvent::auto(true); + + Ok(Self { + // by default there is no processing tasks + processing_tasks: Arc::default(), + file_store: FileStore::new(update_file_path)?, + all_tasks: env.create_database(Some(db_name::ALL_TASKS))?, + status: env.create_database(Some(db_name::STATUS))?, + kind: env.create_database(Some(db_name::KIND))?, + index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?, + index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?, + env, + wake_up: Arc::new(wake_up), + }) + } + + /// Return the index corresponding to the name. If it wasn't opened before + /// it'll be opened. But if it doesn't exist on disk it'll throw an + /// `IndexNotFound` error. + pub fn index(&self, name: &str) -> Result { + let rtxn = self.env.read_txn()?; + self.index_mapper.index(&rtxn, name) + } + + /// Returns the tasks corresponding to the query. + pub fn get_tasks(&self, query: Query) -> Result> { + let rtxn = self.env.read_txn()?; + let last_task_id = match self.last_task_id(&rtxn)? { + Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid), + None => return Ok(Vec::new()), + }; + + // This is the list of all the tasks. + let mut tasks = RoaringBitmap::from_iter(0..last_task_id); + + if let Some(status) = query.status { + let mut status_tasks = RoaringBitmap::new(); + for status in status { + status_tasks |= self.get_status(&rtxn, status)?; + } + tasks &= status_tasks; + } + + if let Some(kind) = query.kind { + let mut kind_tasks = RoaringBitmap::new(); + for kind in kind { + kind_tasks |= self.get_kind(&rtxn, kind)?; + } + tasks &= kind_tasks; + } + + if let Some(index) = query.index_uid { + let mut index_tasks = RoaringBitmap::new(); + for index in index { + index_tasks |= self.get_index(&rtxn, &index)?; + } + tasks &= index_tasks; + } + + let tasks = + self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?; + Ok(tasks.into_iter().map(|task| task.as_task_view()).collect()) + } + + /// Register a new task in the scheduler. If it fails and data was associated with the task + /// it tries to delete the file. 
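    // Editor's note: a hedged example of the `TaskView` JSON produced by
    // `register()` right below for a freshly enqueued document addition.
    // Field names follow the serde attributes on `TaskView` in task.rs; the
    // concrete values are made up for illustration:
    //
    //     {
    //       "uid": 0,
    //       "indexUid": "movies",
    //       "status": "enqueued",
    //       "type": "documentAddition",
    //       "details": null,
    //       "error": null,
    //       "duration": null,
    //       "enqueuedAt": "2022-09-14T14:16:53.000000Z",
    //       "startedAt": null,
    //       "finishedAt": null
    //     }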
+ pub fn register(&self, task: KindWithContent) -> Result { + let mut wtxn = self.env.write_txn()?; + + let task = Task { + uid: self.next_task_id(&wtxn)?, + enqueued_at: time::OffsetDateTime::now_utc(), + started_at: None, + finished_at: None, + error: None, + details: None, + status: Status::Enqueued, + kind: task, + }; + + self.all_tasks + .append(&mut wtxn, &BEU32::new(task.uid), &task)?; + + if let Some(indexes) = task.indexes() { + for index in indexes { + self.update_index(&mut wtxn, index, |bitmap| drop(bitmap.insert(task.uid)))?; + } + } + + self.update_status(&mut wtxn, Status::Enqueued, |bitmap| { + bitmap.insert(task.uid); + })?; + + self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| { + (bitmap.insert(task.uid)); + })?; + + // we persist the file in last to be sure everything before was applied successfuly + task.persist()?; + + match wtxn.commit() { + Ok(()) => (), + e @ Err(_) => { + task.remove_data()?; + e?; + } + } + + self.notify(); + + Ok(task.as_task_view()) + } + + /// This worker function must be run in a different thread and must be run only once. + fn run(&self) { + loop { + self.wake_up.wait(); + + let mut wtxn = match self.env.write_txn() { + Ok(wtxn) => wtxn, + Err(e) => { + log::error!("{}", e); + continue; + } + }; + let batch = match self.create_next_batch(&wtxn) { + Ok(Some(batch)) => batch, + Ok(None) => continue, + Err(e) => { + log::error!("{}", e); + continue; + } + }; + // 1. store the starting date with the bitmap of processing tasks + // 2. update the tasks with a starting date *but* do not write anything on disk + + // 3. process the tasks + let _res = self.process_batch(&mut wtxn, batch); + + // 4. store the updated tasks on disk + + // TODO: TAMO: do this later + // must delete the file on disk + // in case of error, must update the tasks with the error + // in case of « success » we must update all the task on disk + // self.handle_batch_result(res); + + match wtxn.commit() { + Ok(()) => log::info!("A batch of tasks was successfully completed."), + Err(e) => { + log::error!("{}", e); + continue; + } + } + } + } + + #[cfg(truc)] + fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> { + match batch { + Batch::One(task) => match &task.kind { + KindWithContent::ClearAllDocuments { index_name } => { + self.index(&index_name)?.clear_documents()?; + } + KindWithContent::RenameIndex { + index_name: _, + new_name, + } => { + if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) { + return Err(Error::IndexAlreadyExists(new_name.to_string())); + } + todo!("wait for @guigui insight"); + } + KindWithContent::CreateIndex { + index_name, + primary_key, + } => { + if self + .available_index + .get(wtxn, &index_name)? + .unwrap_or(false) + { + return Err(Error::IndexAlreadyExists(index_name.to_string())); + } + + self.available_index.put(wtxn, &index_name, &true)?; + // TODO: TAMO: give real info to the index + let index = Index::open( + index_name.to_string(), + index_name.to_string(), + 100_000_000, + Arc::default(), + )?; + if let Some(primary_key) = primary_key { + index.update_primary_key(primary_key.to_string())?; + } + self.index_map + .write() + .map_err(|_| Error::CorruptedTaskQueue)? + .insert(index_name.to_string(), index.clone()); + } + KindWithContent::DeleteIndex { index_name } => { + if !self.available_index.delete(wtxn, &index_name)? { + return Err(Error::IndexNotFound(index_name.to_string())); + } + if let Some(index) = self + .index_map + .write() + .map_err(|_| Error::CorruptedTaskQueue)? 
+ .remove(index_name) + { + index.delete()?; + } else { + // TODO: TAMO: fix the path + std::fs::remove_file(index_name)?; + } + } + KindWithContent::SwapIndex { lhs, rhs } => { + if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) { + return Err(Error::IndexNotFound(lhs.to_string())); + } + if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) { + return Err(Error::IndexNotFound(rhs.to_string())); + } + + let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?; + let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?; + // the bitmap are lazily created and thus may not exists. + if let Some(bitmap) = rhs_bitmap { + self.index_tasks.put(wtxn, lhs, &bitmap)?; + } + if let Some(bitmap) = lhs_bitmap { + self.index_tasks.put(wtxn, rhs, &bitmap)?; + } + + let mut index_map = self + .index_map + .write() + .map_err(|_| Error::CorruptedTaskQueue)?; + + let lhs_index = index_map.remove(lhs).unwrap(); + let rhs_index = index_map.remove(rhs).unwrap(); + + index_map.insert(lhs.to_string(), rhs_index); + index_map.insert(rhs.to_string(), lhs_index); + } + _ => unreachable!(), + }, + Batch::Cancel(_) => todo!(), + Batch::Snapshot(_) => todo!(), + Batch::Dump(_) => todo!(), + Batch::Contiguous { tasks, kind } => { + // it's safe because you can't batch 0 contiguous tasks. + let first_task = &tasks[0]; + // and the two kind of tasks we batch MUST have ONE index name. + let index_name = first_task.indexes().unwrap()[0]; + let index = self.index(index_name)?; + + match kind { + Kind::DocumentAddition => { + let content_files = tasks.iter().map(|task| match &task.kind { + KindWithContent::DocumentAddition { content_file, .. } => { + content_file.clone() + } + k => unreachable!( + "Internal error, `{:?}` is not supposed to be reachable here", + k.as_kind() + ), + }); + let results = index.update_documents( + IndexDocumentsMethod::UpdateDocuments, + None, + self.file_store.clone(), + content_files, + )?; + + for (task, result) in tasks.iter_mut().zip(results) { + task.finished_at = Some(OffsetDateTime::now_utc()); + match result { + Ok(_) => task.status = Status::Succeeded, + Err(_) => task.status = Status::Succeeded, + } + } + } + Kind::DocumentDeletion => { + let ids: Vec<_> = tasks + .iter() + .flat_map(|task| match &task.kind { + KindWithContent::DocumentDeletion { documents_ids, .. } => { + documents_ids.clone() + } + k => unreachable!( + "Internal error, `{:?}` is not supposed to be reachable here", + k.as_kind() + ), + }) + .collect(); + + let result = index.delete_documents(&ids); + + for task in tasks.iter_mut() { + task.finished_at = Some(OffsetDateTime::now_utc()); + match result { + Ok(_) => task.status = Status::Succeeded, + Err(_) => task.status = Status::Succeeded, + } + } + } + _ => unreachable!(), + } + } + Batch::Empty => todo!(), + } + + Ok(()) + } + + /// Notify the scheduler there is or may be work to do. 
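    // Editor's note: a hedged sketch (not part of this patch) of how the pieces
    // above are meant to be wired together: `run()` blocks on `wake_up.wait()`,
    // so it lives on its own thread, and every successful `register()` ends with
    // the `notify()` call below to wake it up. `run` is private here, so this
    // assumes it gets exposed or wrapped once the integration is finished:
    //
    //     let scheduler = IndexScheduler::new(db_path, files_path, indexes_path, size, config)?;
    //     let worker = scheduler.clone();
    //     std::thread::spawn(move || worker.run());
    //     // ... later, from an HTTP handler:
    //     // scheduler.register(some_kind_with_content)?; // ends by calling notify()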
+ pub fn notify(&self) { + self.wake_up.signal() + } +} + +#[cfg(test)] +mod tests { + use tempfile::TempDir; + + use super::*; + + fn new() -> IndexScheduler { + let dir = TempDir::new().unwrap(); + IndexScheduler::new( + dir.path().join("db_path"), + dir.path().join("file_store"), + dir.path().join("indexes"), + 100_000_000, + IndexerConfig::default(), + ) + .unwrap() + } + + #[test] + fn simple_new() { + new(); + } +} diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 011b8ea38..3503c1ca9 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -2,380 +2,16 @@ mod autobatcher; mod batch; pub mod error; mod index_mapper; +mod index_scheduler; pub mod task; mod utils; - -pub use error::Error; -use file_store::FileStore; -use index::Index; -use index_mapper::IndexMapper; -use synchronoise::SignalEvent; -pub use task::Task; -use task::{Kind, Status}; - - - - - -use std::sync::Arc; -use std::{sync::RwLock}; - -use milli::heed::types::{OwnedType, SerdeBincode, Str}; -use milli::heed::{Database, Env}; - -use milli::{RoaringBitmapCodec, BEU32}; -use roaring::RoaringBitmap; -use serde::Deserialize; - pub type Result = std::result::Result; pub type TaskId = u32; -type IndexName = String; -type IndexUuid = String; -const DEFAULT_LIMIT: fn() -> u32 = || 20; - -#[derive(Debug, Clone, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Query { - #[serde(default = "DEFAULT_LIMIT")] - limit: u32, - from: Option, - status: Option>, - #[serde(rename = "type")] - kind: Option>, - index_uid: Option>, -} - -/// This module is responsible for two things; -/// 1. Resolve the name of the indexes. -/// 2. Schedule the tasks. -#[derive(Clone)] -pub struct IndexScheduler { - /// The list of tasks currently processing. - processing_tasks: Arc>, - - file_store: FileStore, - - /// The LMDB environment which the DBs are associated with. - env: Env, - - // The main database, it contains all the tasks accessible by their Id. - all_tasks: Database, SerdeBincode>, - - /// All the tasks ids grouped by their status. - status: Database, RoaringBitmapCodec>, - /// All the tasks ids grouped by their kind. - kind: Database, RoaringBitmapCodec>, - /// Store the tasks associated to an index. - index_tasks: Database, - - /// In charge of creating and returning indexes. - index_mapper: IndexMapper, - - // set to true when there is work to do. - wake_up: Arc, -} - -impl IndexScheduler { - pub fn new() -> Self { - // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things - let _wake_up = SignalEvent::auto(true); - todo!() - } - - /// Return the index corresponding to the name. If it wasn't opened before - /// it'll be opened. But if it doesn't exist on disk it'll throw an - /// `IndexNotFound` error. - pub fn index(&self, name: &str) -> Result { - let rtxn = self.env.read_txn()?; - self.index_mapper.index(&rtxn, name) - } - - /// Returns the tasks corresponding to the query. - pub fn get_tasks(&self, query: Query) -> Result> { - let rtxn = self.env.read_txn()?; - let last_task_id = match self.last_task_id(&rtxn)? { - Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid), - None => return Ok(Vec::new()), - }; - - // This is the list of all the tasks. 
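        // Editor's note: the task filtering here (and in its new home in
        // index_scheduler.rs above) is plain set intersection over task ids.
        // A small illustrative sketch with made-up ids:
        //
        //     let mut tasks = RoaringBitmap::from_iter(0..10);         // every known task
        //     let enqueued  = RoaringBitmap::from_iter([1u32, 4, 7]);  // ids with the requested status
        //     let on_movies = RoaringBitmap::from_iter([4u32, 5, 7]);  // ids touching the requested index
        //     tasks &= enqueued;
        //     tasks &= on_movies;
        //     assert_eq!(tasks.iter().collect::<Vec<u32>>(), vec![4, 7]);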
- let mut tasks = RoaringBitmap::from_iter(0..last_task_id); - - if let Some(status) = query.status { - let mut status_tasks = RoaringBitmap::new(); - for status in status { - status_tasks |= self.get_status(&rtxn, status)?; - } - tasks &= status_tasks; - } - - if let Some(kind) = query.kind { - let mut kind_tasks = RoaringBitmap::new(); - for kind in kind { - kind_tasks |= self.get_kind(&rtxn, kind)?; - } - tasks &= kind_tasks; - } - - if let Some(index) = query.index_uid { - let mut index_tasks = RoaringBitmap::new(); - for index in index { - index_tasks |= self.get_index(&rtxn, &index)?; - } - tasks &= index_tasks; - } - - self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize)) - } - - /// Register a new task in the scheduler. If it fails and data was associated with the task - /// it tries to delete the file. - pub fn register(&self, task: Task) -> Result<()> { - let mut wtxn = self.env.write_txn()?; - - let task_id = self.next_task_id(&wtxn)?; - - self.all_tasks - .append(&mut wtxn, &BEU32::new(task_id), &task)?; - - if let Some(indexes) = task.indexes() { - for index in indexes { - self.update_index(&mut wtxn, index, |bitmap| drop(bitmap.insert(task_id)))?; - } - } - - self.update_status(&mut wtxn, Status::Enqueued, |bitmap| { - bitmap.insert(task_id); - })?; - - self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| { - (bitmap.insert(task_id)); - })?; - - // we persist the file in last to be sure everything before was applied successfuly - task.persist()?; - - match wtxn.commit() { - Ok(()) => (), - e @ Err(_) => { - task.remove_data()?; - e?; - } - } - - self.notify(); - - Ok(()) - } - - /// This worker function must be run in a different thread and must be run only once. - fn run(&self) { - loop { - self.wake_up.wait(); - - let mut wtxn = match self.env.write_txn() { - Ok(wtxn) => wtxn, - Err(e) => { - log::error!("{}", e); - continue; - } - }; - let batch = match self.create_next_batch(&wtxn) { - Ok(Some(batch)) => batch, - Ok(None) => continue, - Err(e) => { - log::error!("{}", e); - continue; - } - }; - // 1. store the starting date with the bitmap of processing tasks - // 2. update the tasks with a starting date *but* do not write anything on disk - - // 3. process the tasks - let _res = self.process_batch(&mut wtxn, batch); - - // 4. store the updated tasks on disk - - // TODO: TAMO: do this later - // must delete the file on disk - // in case of error, must update the tasks with the error - // in case of « success » we must update all the task on disk - // self.handle_batch_result(res); - - match wtxn.commit() { - Ok(()) => log::info!("A batch of tasks was successfully completed."), - Err(e) => { - log::error!("{}", e); - continue; - } - } - } - } - - #[cfg(truc)] - fn process_batch(&self, wtxn: &mut RwTxn, batch: &mut Batch) -> Result<()> { - match batch { - Batch::One(task) => match &task.kind { - KindWithContent::ClearAllDocuments { index_name } => { - self.index(&index_name)?.clear_documents()?; - } - KindWithContent::RenameIndex { - index_name: _, - new_name, - } => { - if self.available_index.get(wtxn, &new_name)?.unwrap_or(false) { - return Err(Error::IndexAlreadyExists(new_name.to_string())); - } - todo!("wait for @guigui insight"); - } - KindWithContent::CreateIndex { - index_name, - primary_key, - } => { - if self - .available_index - .get(wtxn, &index_name)? 
- .unwrap_or(false) - { - return Err(Error::IndexAlreadyExists(index_name.to_string())); - } - - self.available_index.put(wtxn, &index_name, &true)?; - // TODO: TAMO: give real info to the index - let index = Index::open( - index_name.to_string(), - index_name.to_string(), - 100_000_000, - Arc::default(), - )?; - if let Some(primary_key) = primary_key { - index.update_primary_key(primary_key.to_string())?; - } - self.index_map - .write() - .map_err(|_| Error::CorruptedTaskQueue)? - .insert(index_name.to_string(), index.clone()); - } - KindWithContent::DeleteIndex { index_name } => { - if !self.available_index.delete(wtxn, &index_name)? { - return Err(Error::IndexNotFound(index_name.to_string())); - } - if let Some(index) = self - .index_map - .write() - .map_err(|_| Error::CorruptedTaskQueue)? - .remove(index_name) - { - index.delete()?; - } else { - // TODO: TAMO: fix the path - std::fs::remove_file(index_name)?; - } - } - KindWithContent::SwapIndex { lhs, rhs } => { - if !self.available_index.get(wtxn, &lhs)?.unwrap_or(false) { - return Err(Error::IndexNotFound(lhs.to_string())); - } - if !self.available_index.get(wtxn, &rhs)?.unwrap_or(false) { - return Err(Error::IndexNotFound(rhs.to_string())); - } - - let lhs_bitmap = self.index_tasks.get(wtxn, lhs)?; - let rhs_bitmap = self.index_tasks.get(wtxn, rhs)?; - // the bitmap are lazily created and thus may not exists. - if let Some(bitmap) = rhs_bitmap { - self.index_tasks.put(wtxn, lhs, &bitmap)?; - } - if let Some(bitmap) = lhs_bitmap { - self.index_tasks.put(wtxn, rhs, &bitmap)?; - } - - let mut index_map = self - .index_map - .write() - .map_err(|_| Error::CorruptedTaskQueue)?; - - let lhs_index = index_map.remove(lhs).unwrap(); - let rhs_index = index_map.remove(rhs).unwrap(); - - index_map.insert(lhs.to_string(), rhs_index); - index_map.insert(rhs.to_string(), lhs_index); - } - _ => unreachable!(), - }, - Batch::Cancel(_) => todo!(), - Batch::Snapshot(_) => todo!(), - Batch::Dump(_) => todo!(), - Batch::Contiguous { tasks, kind } => { - // it's safe because you can't batch 0 contiguous tasks. - let first_task = &tasks[0]; - // and the two kind of tasks we batch MUST have ONE index name. - let index_name = first_task.indexes().unwrap()[0]; - let index = self.index(index_name)?; - - match kind { - Kind::DocumentAddition => { - let content_files = tasks.iter().map(|task| match &task.kind { - KindWithContent::DocumentAddition { content_file, .. } => { - content_file.clone() - } - k => unreachable!( - "Internal error, `{:?}` is not supposed to be reachable here", - k.as_kind() - ), - }); - let results = index.update_documents( - IndexDocumentsMethod::UpdateDocuments, - None, - self.file_store.clone(), - content_files, - )?; - - for (task, result) in tasks.iter_mut().zip(results) { - task.finished_at = Some(OffsetDateTime::now_utc()); - match result { - Ok(_) => task.status = Status::Succeeded, - Err(_) => task.status = Status::Succeeded, - } - } - } - Kind::DocumentDeletion => { - let ids: Vec<_> = tasks - .iter() - .flat_map(|task| match &task.kind { - KindWithContent::DocumentDeletion { documents_ids, .. 
} => { - documents_ids.clone() - } - k => unreachable!( - "Internal error, `{:?}` is not supposed to be reachable here", - k.as_kind() - ), - }) - .collect(); - - let result = index.delete_documents(&ids); - - for task in tasks.iter_mut() { - task.finished_at = Some(OffsetDateTime::now_utc()); - match result { - Ok(_) => task.status = Status::Succeeded, - Err(_) => task.status = Status::Succeeded, - } - } - } - _ => unreachable!(), - } - } - Batch::Empty => todo!(), - } - - Ok(()) - } - - /// Notify the scheduler there is or may be work to do. - pub fn notify(&self) { - self.wake_up.signal() - } -} +pub use crate::index_scheduler::IndexScheduler; +pub use error::Error; +/// from the exterior you don't need to know there is multiple type of `Kind` +pub use task::KindWithContent as TaskKind; +/// from the exterior you don't need to know there is multiple type of `Task` +pub use task::TaskView as Task; diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs index 37ffa0e78..6d51d33cb 100644 --- a/index-scheduler/src/task.rs +++ b/index-scheduler/src/task.rs @@ -1,9 +1,9 @@ use anyhow::Result; use index::{Settings, Unchecked}; -use serde::{Deserialize, Serialize}; -use std::path::PathBuf; -use time::OffsetDateTime; +use serde::{Deserialize, Serialize, Serializer}; +use std::{fmt::Write, path::PathBuf}; +use time::{Duration, OffsetDateTime}; use uuid::Uuid; use crate::TaskId; @@ -17,6 +17,38 @@ pub enum Status { Failed, } +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Error { + message: String, + code: String, + #[serde(rename = "type")] + kind: String, + link: String, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TaskView { + pub uid: TaskId, + pub index_uid: Option, + pub status: Status, + #[serde(rename = "type")] + pub kind: Kind, + + pub details: Option
, + pub error: Option, + + #[serde(serialize_with = "serialize_duration")] + pub duration: Option, + #[serde(with = "time::serde::rfc3339")] + pub enqueued_at: OffsetDateTime, + #[serde(with = "time::serde::rfc3339::option")] + pub started_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub finished_at: Option, +} + #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Task { @@ -29,8 +61,8 @@ pub struct Task { #[serde(with = "time::serde::rfc3339::option")] pub finished_at: Option, - pub error: Option, - pub info: Option, + pub error: Option, + pub details: Option
, pub status: Status, pub kind: KindWithContent, @@ -51,6 +83,27 @@ impl Task { pub fn indexes(&self) -> Option> { self.kind.indexes() } + + /// Convert a Task to a TaskView + pub fn as_task_view(&self) -> TaskView { + TaskView { + uid: self.uid, + index_uid: self + .indexes() + .and_then(|vec| vec.first().map(|i| i.to_string())), + status: self.status, + kind: self.kind.as_kind(), + details: self.details.clone(), + error: self.error.clone(), + duration: self + .started_at + .zip(self.finished_at) + .map(|(start, end)| end - start), + enqueued_at: self.enqueued_at, + started_at: self.started_at, + finished_at: self.finished_at, + } + } } #[derive(Debug, Serialize, Deserialize)] @@ -215,3 +268,81 @@ pub enum Kind { DumpExport, Snapshot, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +#[allow(clippy::large_enum_variant)] +pub enum Details { + #[serde(rename_all = "camelCase")] + DocumentAddition { + received_documents: usize, + indexed_documents: Option, + }, + #[serde(rename_all = "camelCase")] + Settings { + #[serde(flatten)] + settings: Settings, + }, + #[serde(rename_all = "camelCase")] + IndexInfo { primary_key: Option }, + #[serde(rename_all = "camelCase")] + DocumentDeletion { + received_document_ids: usize, + deleted_documents: Option, + }, + #[serde(rename_all = "camelCase")] + ClearAll { deleted_documents: Option }, + #[serde(rename_all = "camelCase")] + Dump { dump_uid: String }, +} + +/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for +/// https://github.com/time-rs/time/issues/378. +/// This code is a port of the old code of time that was removed in 0.2. +fn serialize_duration( + duration: &Option, + serializer: S, +) -> Result { + match duration { + Some(duration) => { + // technically speaking, negative duration is not valid ISO 8601 + if duration.is_negative() { + return serializer.serialize_none(); + } + + const SECS_PER_DAY: i64 = Duration::DAY.whole_seconds(); + let secs = duration.whole_seconds(); + let days = secs / SECS_PER_DAY; + let secs = secs - days * SECS_PER_DAY; + let hasdate = days != 0; + let nanos = duration.subsec_nanoseconds(); + let hastime = (secs != 0 || nanos != 0) || !hasdate; + + // all the following unwrap can't fail + let mut res = String::new(); + write!(&mut res, "P").unwrap(); + + if hasdate { + write!(&mut res, "{}D", days).unwrap(); + } + + const NANOS_PER_MILLI: i32 = Duration::MILLISECOND.subsec_nanoseconds(); + const NANOS_PER_MICRO: i32 = Duration::MICROSECOND.subsec_nanoseconds(); + + if hastime { + if nanos == 0 { + write!(&mut res, "T{}S", secs).unwrap(); + } else if nanos % NANOS_PER_MILLI == 0 { + write!(&mut res, "T{}.{:03}S", secs, nanos / NANOS_PER_MILLI).unwrap(); + } else if nanos % NANOS_PER_MICRO == 0 { + write!(&mut res, "T{}.{:06}S", secs, nanos / NANOS_PER_MICRO).unwrap(); + } else { + write!(&mut res, "T{}.{:09}S", secs, nanos).unwrap(); + } + } + + serializer.serialize_str(&res) + } + None => serializer.serialize_none(), + } +} diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index ca52de038..effb81a33 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -7,8 +7,8 @@ use milli::{ use roaring::RoaringBitmap; use crate::{ - task::{Kind, Status}, - Error, IndexScheduler, Result, Task, TaskId, + task::{Kind, Status, Task}, + Error, IndexScheduler, Result, TaskId, }; impl IndexScheduler { diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index dbaf8faa2..7883b6490 100644 --- a/meilisearch-lib/Cargo.toml +++ 
b/meilisearch-lib/Cargo.toml @@ -55,6 +55,9 @@ tokio = { version = "1.21.2", features = ["full"] } uuid = { version = "1.1.2", features = ["serde", "v4"] } walkdir = "2.3.2" whoami = { version = "1.2.3", optional = true } +index-scheduler = { path = "../index-scheduler" } +index = { path = "../index" } +file-store = { path = "../file-store" } [dev-dependencies] actix-rt = "2.7.0" diff --git a/meilisearch-lib/src/index/error.rs b/meilisearch-lib/src/index/error.rs deleted file mode 100644 index f795ceaa4..000000000 --- a/meilisearch-lib/src/index/error.rs +++ /dev/null @@ -1,61 +0,0 @@ -use std::error::Error; - -use meilisearch_types::error::{Code, ErrorCode}; -use meilisearch_types::internal_error; -use serde_json::Value; - -use crate::{error::MilliError, update_file_store}; - -pub type Result = std::result::Result; - -#[derive(Debug, thiserror::Error)] -pub enum IndexError { - #[error("An internal error has occurred. `{0}`.")] - Internal(Box), - #[error("Document `{0}` not found.")] - DocumentNotFound(String), - #[error("{0}")] - Facet(#[from] FacetError), - #[error("{0}")] - Milli(#[from] milli::Error), -} - -internal_error!( - IndexError: std::io::Error, - milli::heed::Error, - fst::Error, - serde_json::Error, - update_file_store::UpdateFileStoreError, - milli::documents::Error -); - -impl ErrorCode for IndexError { - fn error_code(&self) -> Code { - match self { - IndexError::Internal(_) => Code::Internal, - IndexError::DocumentNotFound(_) => Code::DocumentNotFound, - IndexError::Facet(e) => e.error_code(), - IndexError::Milli(e) => MilliError(e).error_code(), - } - } -} - -impl From for IndexError { - fn from(error: milli::UserError) -> IndexError { - IndexError::Milli(error.into()) - } -} - -#[derive(Debug, thiserror::Error)] -pub enum FacetError { - #[error("Invalid syntax for the filter parameter: `expected {}, found: {1}`.", .0.join(", "))] - InvalidExpression(&'static [&'static str], Value), -} - -impl ErrorCode for FacetError { - fn error_code(&self) -> Code { - match self { - FacetError::InvalidExpression(_, _) => Code::Filter, - } - } -} diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index ab2dd142d..2e74298b6 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -7,12 +7,8 @@ use tokio::task::JoinError; use super::DocumentAdditionFormat; use crate::document_formats::DocumentFormatError; -use crate::dump::error::DumpError; -use crate::index::error::IndexError; -use crate::tasks::error::TaskError; -use crate::update_file_store::UpdateFileStoreError; - -use crate::index_resolver::error::IndexResolverError; +// use crate::dump::error::DumpError; +use index::error::IndexError; pub type Result = std::result::Result; @@ -20,17 +16,15 @@ pub type Result = std::result::Result; pub enum IndexControllerError { #[error("Index creation must have an uid")] MissingUid, - #[error("{0}")] - IndexResolver(#[from] IndexResolverError), - #[error("{0}")] + #[error(transparent)] + IndexResolver(#[from] index_scheduler::Error), + #[error(transparent)] IndexError(#[from] IndexError), #[error("An internal error has occurred. 
`{0}`.")] Internal(Box), - #[error("{0}")] - TaskError(#[from] TaskError), - #[error("{0}")] - DumpError(#[from] DumpError), - #[error("{0}")] + // #[error("{0}")] + // DumpError(#[from] DumpError), + #[error(transparent)] DocumentFormatError(#[from] DocumentFormatError), #[error("A {0} payload is missing.")] MissingPayload(DocumentAdditionFormat), @@ -38,7 +32,7 @@ pub enum IndexControllerError { PayloadTooLarge, } -internal_error!(IndexControllerError: JoinError, UpdateFileStoreError); +internal_error!(IndexControllerError: JoinError, file_store::Error); impl From for IndexControllerError { fn from(other: actix_web::error::PayloadError) -> Self { @@ -53,20 +47,20 @@ impl ErrorCode for IndexControllerError { fn error_code(&self) -> Code { match self { IndexControllerError::MissingUid => Code::BadRequest, - IndexControllerError::IndexResolver(e) => e.error_code(), - IndexControllerError::IndexError(e) => e.error_code(), IndexControllerError::Internal(_) => Code::Internal, - IndexControllerError::TaskError(e) => e.error_code(), IndexControllerError::DocumentFormatError(e) => e.error_code(), IndexControllerError::MissingPayload(_) => Code::MissingPayload, IndexControllerError::PayloadTooLarge => Code::PayloadTooLarge, - IndexControllerError::DumpError(e) => e.error_code(), + IndexControllerError::IndexResolver(_) => todo!(), + IndexControllerError::IndexError(_) => todo!(), } } } +/* impl From for IndexControllerError { fn from(err: IndexUidFormatError) -> Self { - IndexResolverError::from(err).into() + index_scheduler::Error::from(err).into() } } +*/ diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index 87644a44a..ab5372908 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -1,4 +1,3 @@ -use meilisearch_auth::SearchRules; use std::collections::BTreeMap; use std::fmt; use std::io::Cursor; @@ -9,10 +8,14 @@ use std::time::Duration; use actix_web::error::PayloadError; use bytes::Bytes; +use file_store::FileStore; use futures::Stream; use futures::StreamExt; +use index_scheduler::IndexScheduler; +use index_scheduler::TaskKind; +use meilisearch_auth::SearchRules; use meilisearch_types::index_uid::IndexUid; -use milli::update::IndexDocumentsMethod; +use milli::update::{IndexDocumentsMethod, IndexerConfig}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use tokio::sync::RwLock; @@ -21,32 +24,19 @@ use tokio::time::sleep; use uuid::Uuid; use crate::document_formats::{read_csv, read_json, read_ndjson}; -use crate::dump::{self, load_dump, DumpHandler}; -use crate::index::{ - Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, -}; -use crate::index_resolver::error::IndexResolverError; +// use crate::dump::{self, load_dump, DumpHandler}; use crate::options::{IndexerOpts, SchedulerConfig}; use crate::snapshot::{load_snapshot, SnapshotService}; -use crate::tasks::error::TaskError; -use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId}; -use crate::tasks::{ - BatchHandler, EmptyBatchHandler, Scheduler, SnapshotHandler, TaskFilter, TaskStore, -}; use error::Result; +use index::{ + Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, +}; use self::error::IndexControllerError; -use crate::index_resolver::index_store::{IndexStore, MapIndexStore}; -use crate::index_resolver::meta_store::{HeedMetaStore, IndexMetaStore}; -use crate::index_resolver::{create_index_resolver, IndexResolver}; -use 
crate::update_file_store::UpdateFileStore; pub mod error; pub mod versioning; -/// Concrete implementation of the IndexController, exposed by meilisearch-lib -pub type MeiliSearch = IndexController; - pub type Payload = Box< dyn Stream> + Send + Sync + 'static + Unpin, >; @@ -74,23 +64,9 @@ pub struct IndexSettings { pub primary_key: Option, } -pub struct IndexController { - pub index_resolver: Arc>, - scheduler: Arc>, - task_store: TaskStore, - pub update_file_store: UpdateFileStore, -} - -/// Need a custom implementation for clone because deriving require that U and I are clone. -impl Clone for IndexController { - fn clone(&self) -> Self { - Self { - index_resolver: self.index_resolver.clone(), - scheduler: self.scheduler.clone(), - update_file_store: self.update_file_store.clone(), - task_store: self.task_store.clone(), - } - } +#[derive(Clone)] +pub struct Meilisearch { + index_scheduler: IndexScheduler, } #[derive(Debug)] @@ -170,7 +146,7 @@ impl IndexControllerBuilder { db_path: impl AsRef, indexer_options: IndexerOpts, scheduler_config: SchedulerConfig, - ) -> anyhow::Result { + ) -> anyhow::Result { let index_size = self .max_index_size .ok_or_else(|| anyhow::anyhow!("Missing index size"))?; @@ -178,6 +154,8 @@ impl IndexControllerBuilder { .max_task_store_size .ok_or_else(|| anyhow::anyhow!("Missing update database size"))?; + /* + TODO: TAMO: enable dumps and snapshots to happens if let Some(ref path) = self.import_snapshot { log::info!("Loading from snapshot {:?}", path); load_snapshot( @@ -203,47 +181,35 @@ impl IndexControllerBuilder { versioning::check_version_file(db_path.as_ref())?; } } + */ std::fs::create_dir_all(db_path.as_ref())?; let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?); - let update_file_store = UpdateFileStore::new(&db_path)?; + let file_store = FileStore::new(&db_path)?; // Create or overwrite the version file for this DB versioning::create_version_file(db_path.as_ref())?; - let index_resolver = Arc::new(create_index_resolver( - &db_path, + let indexer_config = IndexerConfig { + log_every_n: Some(indexer_options.log_every_n), + max_nb_chunks: indexer_options.max_nb_chunks, + documents_chunk_size: None, + // TODO: TAMO: Fix this thing + max_memory: None, // Some(indexer_options.max_indexing_memory.into()), + chunk_compression_type: milli::CompressionType::None, + chunk_compression_level: None, + // TODO: TAMO: do something with the indexing_config.max_indexing_threads + thread_pool: None, + max_positions_per_attributes: None, + }; + + let scheduler = IndexScheduler::new( + db_path.as_ref().to_path_buf(), index_size, - &indexer_options, - meta_env.clone(), - update_file_store.clone(), - )?); - - let dump_path = self - .dump_dst - .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; - - let dump_handler = Arc::new(DumpHandler::new( - dump_path, - db_path.as_ref().into(), - update_file_store.clone(), - task_store_size, - index_size, - meta_env.clone(), - index_resolver.clone(), - )); - let task_store = TaskStore::new(meta_env)?; - - // register all the batch handlers for use with the scheduler. 
- let handlers: Vec> = vec![ - index_resolver.clone(), - dump_handler, - Arc::new(SnapshotHandler), - // dummy handler to catch all empty batches - Arc::new(EmptyBatchHandler), - ]; - let scheduler = Scheduler::new(task_store.clone(), handlers, scheduler_config)?; + indexer_config, + file_store, + ); if self.schedule_snapshot { let snapshot_period = self @@ -265,11 +231,8 @@ impl IndexControllerBuilder { tokio::task::spawn_local(snapshot_service.run()); } - Ok(IndexController { - index_resolver, - scheduler, - update_file_store, - task_store, + Ok(Meilisearch { + index_scheduler: scheduler, }) } @@ -350,100 +313,13 @@ impl IndexControllerBuilder { } } -impl IndexController -where - U: IndexMetaStore, - I: IndexStore, -{ +impl Meilisearch { pub fn builder() -> IndexControllerBuilder { IndexControllerBuilder::default() } - pub async fn register_update(&self, uid: String, update: Update) -> Result { - let index_uid = IndexUid::from_str(&uid).map_err(IndexResolverError::from)?; - let content = match update { - Update::DeleteDocuments(ids) => TaskContent::DocumentDeletion { - index_uid, - deletion: DocumentDeletion::Ids(ids), - }, - Update::ClearDocuments => TaskContent::DocumentDeletion { - index_uid, - deletion: DocumentDeletion::Clear, - }, - Update::Settings { - settings, - is_deletion, - allow_index_creation, - } => TaskContent::SettingsUpdate { - settings, - is_deletion, - allow_index_creation, - index_uid, - }, - Update::DocumentAddition { - mut payload, - primary_key, - format, - method, - allow_index_creation, - } => { - let mut buffer = Vec::new(); - while let Some(bytes) = payload.next().await { - let bytes = bytes?; - buffer.extend_from_slice(&bytes); - } - let (content_uuid, mut update_file) = self.update_file_store.new_update()?; - let documents_count = tokio::task::spawn_blocking(move || -> Result<_> { - // check if the payload is empty, and return an error - if buffer.is_empty() { - return Err(IndexControllerError::MissingPayload(format)); - } - - let reader = Cursor::new(buffer); - let count = match format { - DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?, - DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?, - DocumentAdditionFormat::Ndjson => read_ndjson(reader, &mut *update_file)?, - }; - - update_file.persist()?; - - Ok(count) - }) - .await??; - - TaskContent::DocumentAddition { - content_uuid, - merge_strategy: method, - primary_key, - documents_count, - allow_index_creation, - index_uid, - } - } - Update::DeleteIndex => TaskContent::IndexDeletion { index_uid }, - Update::CreateIndex { primary_key } => TaskContent::IndexCreation { - primary_key, - index_uid, - }, - Update::UpdateIndex { primary_key } => TaskContent::IndexUpdate { - primary_key, - index_uid, - }, - }; - - let task = self.task_store.register(content).await?; - self.scheduler.read().await.notify(); - - Ok(task) - } - - pub async fn register_dump_task(&self) -> Result { - let uid = dump::generate_uid(); - let content = TaskContent::Dump { uid }; - let task = self.task_store.register(content).await?; - self.scheduler.read().await.notify(); - Ok(task) + pub async fn register_task(&self, task: TaskKind) -> Result { + Ok(self.index_scheduler.register(task).await?) 
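    // Editor's note: `IndexScheduler::register` as introduced in this patch is a
    // synchronous method returning `Result<TaskView>`, so the `.await` above does
    // not match that signature yet -- expected in a work-in-progress integration
    // commit. A hedged sketch of one way to bridge it from async code, relying on
    // the error conversions already declared in error.rs:
    //
    //     let scheduler = self.index_scheduler.clone();
    //     let task_view = tokio::task::spawn_blocking(move || scheduler.register(task)).await??;
    //     Ok(task_view)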
} pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { @@ -652,6 +528,9 @@ fn clamp_to_page_size(size: usize) -> usize { size / page_size::get() * page_size::get() } +/* +TODO: TAMO: uncomment this test + #[cfg(test)] mod test { use futures::future::ok; @@ -669,22 +548,6 @@ mod test { use super::*; - impl IndexController { - pub fn mock( - index_resolver: Arc>, - task_store: TaskStore, - update_file_store: UpdateFileStore, - scheduler: Arc>, - ) -> Self { - IndexController { - index_resolver, - task_store, - update_file_store, - scheduler, - } - } - } - #[actix_rt::test] async fn test_search_simple() { let index_uid = "test"; @@ -781,3 +644,4 @@ mod test { assert_eq!(r, result); } } +*/ diff --git a/meilisearch-lib/src/index_resolver/error.rs b/meilisearch-lib/src/index_resolver/error.rs deleted file mode 100644 index d973d2229..000000000 --- a/meilisearch-lib/src/index_resolver/error.rs +++ /dev/null @@ -1,71 +0,0 @@ -use std::fmt; - -use meilisearch_types::error::{Code, ErrorCode}; -use meilisearch_types::index_uid::IndexUidFormatError; -use meilisearch_types::internal_error; -use tokio::sync::mpsc::error::SendError as MpscSendError; -use tokio::sync::oneshot::error::RecvError as OneshotRecvError; -use uuid::Uuid; - -use crate::{error::MilliError, index::error::IndexError, update_file_store::UpdateFileStoreError}; - -pub type Result = std::result::Result; - -#[derive(thiserror::Error, Debug)] -pub enum IndexResolverError { - #[error("{0}")] - IndexError(#[from] IndexError), - #[error("Index `{0}` already exists.")] - IndexAlreadyExists(String), - #[error("Index `{0}` not found.")] - UnexistingIndex(String), - #[error("A primary key is already present. It's impossible to update it")] - ExistingPrimaryKey, - #[error("An internal error has occurred. 
`{0}`.")] - Internal(Box), - #[error("The creation of the `{0}` index has failed due to `Index uuid is already assigned`.")] - UuidAlreadyExists(Uuid), - #[error("{0}")] - Milli(#[from] milli::Error), - #[error("{0}")] - BadlyFormatted(#[from] IndexUidFormatError), -} - -impl From> for IndexResolverError -where - T: Send + Sync + 'static + fmt::Debug, -{ - fn from(other: tokio::sync::mpsc::error::SendError) -> Self { - Self::Internal(Box::new(other)) - } -} - -impl From for IndexResolverError { - fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { - Self::Internal(Box::new(other)) - } -} - -internal_error!( - IndexResolverError: milli::heed::Error, - uuid::Error, - std::io::Error, - tokio::task::JoinError, - serde_json::Error, - UpdateFileStoreError -); - -impl ErrorCode for IndexResolverError { - fn error_code(&self) -> Code { - match self { - IndexResolverError::IndexError(e) => e.error_code(), - IndexResolverError::IndexAlreadyExists(_) => Code::IndexAlreadyExists, - IndexResolverError::UnexistingIndex(_) => Code::IndexNotFound, - IndexResolverError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent, - IndexResolverError::Internal(_) => Code::Internal, - IndexResolverError::UuidAlreadyExists(_) => Code::CreateIndex, - IndexResolverError::Milli(e) => MilliError(e).error_code(), - IndexResolverError::BadlyFormatted(_) => Code::InvalidIndexUid, - } - } -} diff --git a/meilisearch-lib/src/index_resolver/meta_store.rs b/meilisearch-lib/src/index_resolver/meta_store.rs deleted file mode 100644 index f335d9923..000000000 --- a/meilisearch-lib/src/index_resolver/meta_store.rs +++ /dev/null @@ -1,223 +0,0 @@ -use std::collections::HashSet; -use std::fs::{create_dir_all, File}; -use std::io::{BufRead, BufReader, Write}; -use std::path::{Path, PathBuf}; -use std::sync::Arc; -use walkdir::WalkDir; - -use milli::heed::types::{SerdeBincode, Str}; -use milli::heed::{CompactionOption, Database, Env}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use super::error::{IndexResolverError, Result}; -use crate::tasks::task::TaskId; - -#[derive(Serialize, Deserialize)] -pub struct DumpEntry { - pub uid: String, - pub index_meta: IndexMeta, -} - -const UUIDS_DB_PATH: &str = "index_uuids"; - -#[async_trait::async_trait] -#[cfg_attr(test, mockall::automock)] -pub trait IndexMetaStore: Sized { - // Create a new entry for `name`. Return an error if `err` and the entry already exists, return - // the uuid otherwise. - async fn get(&self, uid: String) -> Result<(String, Option)>; - async fn delete(&self, uid: String) -> Result>; - async fn list(&self) -> Result>; - async fn insert(&self, name: String, meta: IndexMeta) -> Result<()>; - async fn snapshot(&self, path: PathBuf) -> Result>; - async fn get_size(&self) -> Result; - async fn dump(&self, path: PathBuf) -> Result<()>; -} - -#[derive(Serialize, Deserialize, Debug, Clone)] -pub struct IndexMeta { - pub uuid: Uuid, - pub creation_task_id: TaskId, -} - -#[derive(Clone)] -pub struct HeedMetaStore { - env: Arc, - db: Database>, -} - -impl Drop for HeedMetaStore { - fn drop(&mut self) { - if Arc::strong_count(&self.env) == 1 { - self.env.as_ref().clone().prepare_for_closing(); - } - } -} - -impl HeedMetaStore { - pub fn new(env: Arc) -> Result { - let db = env.create_database(Some("uuids"))?; - Ok(Self { env, db }) - } - - fn get(&self, name: &str) -> Result> { - let env = self.env.clone(); - let db = self.db; - let txn = env.read_txn()?; - match db.get(&txn, name)? 
{ - Some(meta) => Ok(Some(meta)), - None => Ok(None), - } - } - - fn delete(&self, uid: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - let mut txn = env.write_txn()?; - match db.get(&txn, &uid)? { - Some(meta) => { - db.delete(&mut txn, &uid)?; - txn.commit()?; - Ok(Some(meta)) - } - None => Ok(None), - } - } - - fn list(&self) -> Result> { - let env = self.env.clone(); - let db = self.db; - let txn = env.read_txn()?; - let mut entries = Vec::new(); - for entry in db.iter(&txn)? { - let (name, meta) = entry?; - entries.push((name.to_string(), meta)) - } - Ok(entries) - } - - pub(crate) fn insert(&self, name: String, meta: IndexMeta) -> Result<()> { - let env = self.env.clone(); - let db = self.db; - let mut txn = env.write_txn()?; - - if db.get(&txn, &name)?.is_some() { - return Err(IndexResolverError::IndexAlreadyExists(name)); - } - - db.put(&mut txn, &name, &meta)?; - txn.commit()?; - Ok(()) - } - - fn snapshot(&self, mut path: PathBuf) -> Result> { - // Write transaction to acquire a lock on the database. - let txn = self.env.write_txn()?; - let mut entries = HashSet::new(); - for entry in self.db.iter(&txn)? { - let (_, IndexMeta { uuid, .. }) = entry?; - entries.insert(uuid); - } - - // only perform snapshot if there are indexes - if !entries.is_empty() { - path.push(UUIDS_DB_PATH); - create_dir_all(&path).unwrap(); - path.push("data.mdb"); - self.env.copy_to_path(path, CompactionOption::Enabled)?; - } - Ok(entries) - } - - fn get_size(&self) -> Result { - Ok(WalkDir::new(self.env.path()) - .into_iter() - .filter_map(|entry| entry.ok()) - .filter_map(|entry| entry.metadata().ok()) - .filter(|metadata| metadata.is_file()) - .fold(0, |acc, m| acc + m.len())) - } - - pub fn dump(&self, path: PathBuf) -> Result<()> { - let dump_path = path.join(UUIDS_DB_PATH); - create_dir_all(&dump_path)?; - let dump_file_path = dump_path.join("data.jsonl"); - let mut dump_file = File::create(&dump_file_path)?; - - let txn = self.env.read_txn()?; - for entry in self.db.iter(&txn)? { - let (uid, index_meta) = entry?; - let uid = uid.to_string(); - - let entry = DumpEntry { uid, index_meta }; - serde_json::to_writer(&mut dump_file, &entry)?; - dump_file.write_all(b"\n").unwrap(); - } - - Ok(()) - } - - pub fn load_dump(src: impl AsRef, env: Arc) -> Result<()> { - let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); - let indexes = File::open(&src_indexes)?; - let mut indexes = BufReader::new(indexes); - let mut line = String::new(); - - let db = Self::new(env)?; - let mut txn = db.env.write_txn()?; - - loop { - match indexes.read_line(&mut line) { - Ok(0) => break, - Ok(_) => { - let DumpEntry { uid, index_meta } = serde_json::from_str(&line)?; - db.db.put(&mut txn, &uid, &index_meta)?; - } - Err(e) => return Err(e.into()), - } - - line.clear(); - } - txn.commit()?; - - Ok(()) - } -} - -#[async_trait::async_trait] -impl IndexMetaStore for HeedMetaStore { - async fn get(&self, name: String) -> Result<(String, Option)> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.get(&name).map(|res| (name, res))).await? - } - - async fn delete(&self, uid: String) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.delete(uid)).await? - } - - async fn list(&self) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.list()).await? 
- } - - async fn insert(&self, name: String, meta: IndexMeta) -> Result<()> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.insert(name, meta)).await? - } - - async fn snapshot(&self, path: PathBuf) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.snapshot(path)).await? - } - - async fn get_size(&self) -> Result { - self.get_size() - } - - async fn dump(&self, path: PathBuf) -> Result<()> { - let this = self.clone(); - Ok(tokio::task::spawn_blocking(move || this.dump(path)).await??) - } -} diff --git a/meilisearch-lib/src/index_resolver/mod.rs b/meilisearch-lib/src/index_resolver/mod.rs deleted file mode 100644 index 284f64942..000000000 --- a/meilisearch-lib/src/index_resolver/mod.rs +++ /dev/null @@ -1,685 +0,0 @@ -pub mod error; -pub mod index_store; -pub mod meta_store; - -use std::convert::TryFrom; -use std::path::Path; -use std::sync::Arc; - -use error::{IndexResolverError, Result}; -use index_store::{IndexStore, MapIndexStore}; -use meilisearch_types::error::ResponseError; -use meilisearch_types::index_uid::IndexUid; -use meta_store::{HeedMetaStore, IndexMetaStore}; -use milli::heed::Env; -use milli::update::{DocumentDeletionResult, IndexerConfig}; -use time::OffsetDateTime; -use tokio::task::spawn_blocking; -use uuid::Uuid; - -use crate::index::{error::Result as IndexResult, Index}; -use crate::options::IndexerOpts; -use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult}; -use crate::update_file_store::UpdateFileStore; - -use self::meta_store::IndexMeta; - -pub type HardStateIndexResolver = IndexResolver; - -#[cfg(not(test))] -pub use real::IndexResolver; - -#[cfg(test)] -pub use test::MockIndexResolver as IndexResolver; - -pub fn create_index_resolver( - path: impl AsRef, - index_size: usize, - indexer_opts: &IndexerOpts, - meta_env: Arc, - file_store: UpdateFileStore, -) -> anyhow::Result { - let uuid_store = HeedMetaStore::new(meta_env)?; - let index_store = MapIndexStore::new(&path, index_size, indexer_opts)?; - Ok(IndexResolver::new(uuid_store, index_store, file_store)) -} - -mod real { - use super::*; - - pub struct IndexResolver { - pub(super) index_uuid_store: U, - pub(super) index_store: I, - pub(super) file_store: UpdateFileStore, - } - - impl IndexResolver { - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - env: Arc, - indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { - HeedMetaStore::load_dump(&src, env)?; - let indexes_path = src.as_ref().join("indexes"); - let indexes = indexes_path.read_dir()?; - let indexer_config = IndexerConfig::try_from(indexer_opts)?; - for index in indexes { - Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?; - } - - Ok(()) - } - } - - impl IndexResolver - where - U: IndexMetaStore, - I: IndexStore, - { - pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { - Self { - index_uuid_store, - index_store, - file_store, - } - } - - pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) { - fn get_content_uuid(task: &Task) -> Uuid { - match task { - Task { - content: TaskContent::DocumentAddition { content_uuid, .. }, - .. 
- } => *content_uuid, - _ => panic!("unexpected task in the document addition batch"), - } - } - - let content_uuids = tasks.iter().map(get_content_uuid).collect::>(); - - match tasks.first() { - Some(Task { - id, - content: - TaskContent::DocumentAddition { - merge_strategy, - primary_key, - allow_index_creation, - index_uid, - .. - }, - .. - }) => { - let primary_key = primary_key.clone(); - let method = *merge_strategy; - - let index = if *allow_index_creation { - self.get_or_create_index(index_uid.clone(), *id).await - } else { - self.get_index(index_uid.as_str().to_string()).await - }; - - // If the index doesn't exist and we are not allowed to create it with the first - // task, we must fails the whole batch. - let now = OffsetDateTime::now_utc(); - let index = match index { - Ok(index) => index, - Err(e) => { - let error = ResponseError::from(e); - for task in tasks.iter_mut() { - task.events.push(TaskEvent::Failed { - error: error.clone(), - timestamp: now, - }); - } - - return; - } - }; - - let file_store = self.file_store.clone(); - let result = spawn_blocking(move || { - index.update_documents( - method, - primary_key, - file_store, - content_uuids.into_iter(), - ) - }) - .await; - - match result { - Ok(Ok(results)) => { - for (task, result) in tasks.iter_mut().zip(results) { - let event = match result { - Ok(addition) => { - TaskEvent::succeeded(TaskResult::DocumentAddition { - indexed_documents: addition.indexed_documents, - }) - } - Err(error) => { - TaskEvent::failed(IndexResolverError::from(error)) - } - }; - task.events.push(event); - } - } - Ok(Err(e)) => { - let event = TaskEvent::failed(e); - for task in tasks.iter_mut() { - task.events.push(event.clone()); - } - } - Err(e) => { - let event = TaskEvent::failed(IndexResolverError::from(e)); - for task in tasks.iter_mut() { - task.events.push(event.clone()); - } - } - } - } - _ => panic!("invalid batch!"), - } - } - - pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> { - self.file_store.delete(content_uuid).await?; - Ok(()) - } - - async fn process_task_inner(&self, task: &Task) -> Result { - match &task.content { - TaskContent::DocumentAddition { .. } => { - panic!("updates should be handled by batch") - } - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Ids(ids), - index_uid, - } => { - let ids = ids.clone(); - let index = self.get_index(index_uid.clone().into_inner()).await?; - - let DocumentDeletionResult { - deleted_documents, .. - } = spawn_blocking(move || index.delete_documents(&ids)).await??; - - Ok(TaskResult::DocumentDeletion { deleted_documents }) - } - TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Clear, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; - let deleted_documents = spawn_blocking(move || -> IndexResult { - let number_documents = index.stats()?.number_of_documents; - index.clear_documents()?; - Ok(number_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::SettingsUpdate { - settings, - is_deletion, - allow_index_creation, - index_uid, - } => { - let index = if *is_deletion || !*allow_index_creation { - self.get_index(index_uid.clone().into_inner()).await? - } else { - self.get_or_create_index(index_uid.clone(), task.id).await? 
- }; - - let settings = settings.clone(); - spawn_blocking(move || index.update_settings(&settings.check())).await??; - - Ok(TaskResult::Other) - } - TaskContent::IndexDeletion { index_uid } => { - let index = self.delete_index(index_uid.clone().into_inner()).await?; - - let deleted_documents = spawn_blocking(move || -> IndexResult { - Ok(index.stats()?.number_of_documents) - }) - .await??; - - Ok(TaskResult::ClearAll { deleted_documents }) - } - TaskContent::IndexCreation { - primary_key, - index_uid, - } => { - let index = self.create_index(index_uid.clone(), task.id).await?; - - if let Some(primary_key) = primary_key { - let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } - - Ok(TaskResult::Other) - } - TaskContent::IndexUpdate { - primary_key, - index_uid, - } => { - let index = self.get_index(index_uid.clone().into_inner()).await?; - - if let Some(primary_key) = primary_key { - let primary_key = primary_key.clone(); - spawn_blocking(move || index.update_primary_key(primary_key)).await??; - } - - Ok(TaskResult::Other) - } - _ => unreachable!("Invalid task for index resolver"), - } - } - - pub async fn process_task(&self, task: &mut Task) { - match self.process_task_inner(task).await { - Ok(res) => task.events.push(TaskEvent::succeeded(res)), - Err(e) => task.events.push(TaskEvent::failed(e)), - } - } - - pub async fn dump(&self, path: impl AsRef) -> Result<()> { - for (_, index) in self.list().await? { - index.dump(&path)?; - } - self.index_uuid_store.dump(path.as_ref().to_owned()).await?; - Ok(()) - } - - async fn create_index(&self, uid: IndexUid, creation_task_id: TaskId) -> Result { - match self.index_uuid_store.get(uid.into_inner()).await? { - (uid, Some(_)) => Err(IndexResolverError::IndexAlreadyExists(uid)), - (uid, None) => { - let uuid = Uuid::new_v4(); - let index = self.index_store.create(uuid).await?; - match self - .index_uuid_store - .insert( - uid, - IndexMeta { - uuid, - creation_task_id, - }, - ) - .await - { - Err(e) => { - match self.index_store.delete(uuid).await { - Ok(Some(index)) => { - index.close(); - } - Ok(None) => (), - Err(e) => log::error!("Error while deleting index: {:?}", e), - } - Err(e) - } - Ok(()) => Ok(index), - } - } - } - } - - /// Get or create an index with name `uid`. - pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { - match self.create_index(uid, task_id).await { - Ok(index) => Ok(index), - Err(IndexResolverError::IndexAlreadyExists(uid)) => self.get_index(uid).await, - Err(e) => Err(e), - } - } - - pub async fn list(&self) -> Result> { - let uuids = self.index_uuid_store.list().await?; - let mut indexes = Vec::new(); - for (name, IndexMeta { uuid, .. }) in uuids { - match self.index_store.get(uuid).await? { - Some(index) => indexes.push((name, index)), - None => { - // we found an unexisting index, we remove it from the uuid store - let _ = self.index_uuid_store.delete(name).await; - } - } - } - - Ok(indexes) - } - - pub async fn delete_index(&self, uid: String) -> Result { - match self.index_uuid_store.delete(uid.clone()).await? { - Some(IndexMeta { uuid, .. }) => match self.index_store.delete(uuid).await? { - Some(index) => { - index.clone().close(); - Ok(index) - } - None => Err(IndexResolverError::UnexistingIndex(uid)), - }, - None => Err(IndexResolverError::UnexistingIndex(uid)), - } - } - - pub async fn get_index(&self, uid: String) -> Result { - match self.index_uuid_store.get(uid).await? { - (name, Some(IndexMeta { uuid, .. 
})) => { - match self.index_store.get(uuid).await? { - Some(index) => Ok(index), - None => { - // For some reason we got a uuid to an unexisting index, we return an error, - // and remove the uuid from the uuid store. - let _ = self.index_uuid_store.delete(name.clone()).await; - Err(IndexResolverError::UnexistingIndex(name)) - } - } - } - (name, _) => Err(IndexResolverError::UnexistingIndex(name)), - } - } - - pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { - let (uid, meta) = self.index_uuid_store.get(index_uid).await?; - meta.map( - |IndexMeta { - creation_task_id, .. - }| creation_task_id, - ) - .ok_or(IndexResolverError::UnexistingIndex(uid)) - } - } -} - -#[cfg(test)] -mod test { - use crate::index::IndexStats; - - use super::index_store::MockIndexStore; - use super::meta_store::MockIndexMetaStore; - use super::*; - - use futures::future::ok; - use milli::FieldDistribution; - use nelson::Mocker; - - pub enum MockIndexResolver { - Real(super::real::IndexResolver), - Mock(Mocker), - } - - impl MockIndexResolver { - pub fn load_dump( - src: impl AsRef, - dst: impl AsRef, - index_db_size: usize, - env: Arc, - indexer_opts: &IndexerOpts, - ) -> anyhow::Result<()> { - super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts) - } - } - - impl MockIndexResolver - where - U: IndexMetaStore, - I: IndexStore, - { - pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self { - Self::Real(super::real::IndexResolver { - index_uuid_store, - index_store, - file_store, - }) - } - - pub fn mock(mocker: Mocker) -> Self { - Self::Mock(mocker) - } - - pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) { - match self { - IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await, - IndexResolver::Mock(m) => unsafe { - m.get("process_document_addition_batch").call(tasks) - }, - } - } - - pub async fn process_task(&self, task: &mut Task) { - match self { - IndexResolver::Real(r) => r.process_task(task).await, - IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) }, - } - } - - pub async fn dump(&self, path: impl AsRef) -> Result<()> { - match self { - IndexResolver::Real(r) => r.dump(path).await, - IndexResolver::Mock(_) => todo!(), - } - } - - /// Get or create an index with name `uid`. 
- pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result { - match self { - IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn list(&self) -> Result> { - match self { - IndexResolver::Real(r) => r.list().await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn delete_index(&self, uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.delete_index(uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn get_index(&self, uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.get_index(uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result { - match self { - IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await, - IndexResolver::Mock(_) => todo!(), - } - } - - pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> { - match self { - IndexResolver::Real(r) => r.delete_content_file(content_uuid).await, - IndexResolver::Mock(m) => unsafe { - m.get("delete_content_file").call(content_uuid) - }, - } - } - } - - #[actix_rt::test] - async fn test_remove_unknown_index() { - let mut meta_store = MockIndexMetaStore::new(); - meta_store - .expect_delete() - .once() - .returning(|_| Box::pin(ok(None))); - - let index_store = MockIndexStore::new(); - - let mocker = Mocker::default(); - let file_store = UpdateFileStore::mock(mocker); - - let index_resolver = IndexResolver::new(meta_store, index_store, file_store); - - let mut task = Task { - id: 1, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - index_resolver.process_task(&mut task).await; - - assert!(matches!(task.events[0], TaskEvent::Failed { .. })); - } - - #[actix_rt::test] - async fn test_remove_index() { - let mut meta_store = MockIndexMetaStore::new(); - meta_store.expect_delete().once().returning(|_| { - Box::pin(ok(Some(IndexMeta { - uuid: Uuid::new_v4(), - creation_task_id: 1, - }))) - }); - - let mut index_store = MockIndexStore::new(); - index_store.expect_delete().once().returning(|_| { - let mocker = Mocker::default(); - mocker.when::<(), ()>("close").then(|_| ()); - mocker - .when::<(), IndexResult>("stats") - .then(|_| { - Ok(IndexStats { - size: 10, - number_of_documents: 10, - is_indexing: None, - field_distribution: FieldDistribution::default(), - }) - }); - Box::pin(ok(Some(Index::mock(mocker)))) - }); - - let mocker = Mocker::default(); - let file_store = UpdateFileStore::mock(mocker); - - let index_resolver = IndexResolver::new(meta_store, index_store, file_store); - - let mut task = Task { - id: 1, - content: TaskContent::IndexDeletion { - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - index_resolver.process_task(&mut task).await; - - assert!(matches!(task.events[0], TaskEvent::Succeeded { .. 
})); - } - - #[actix_rt::test] - async fn test_delete_documents() { - let mut meta_store = MockIndexMetaStore::new(); - meta_store.expect_get().once().returning(|_| { - Box::pin(ok(( - "test".to_string(), - Some(IndexMeta { - uuid: Uuid::new_v4(), - creation_task_id: 1, - }), - ))) - }); - - let mut index_store = MockIndexStore::new(); - index_store.expect_get().once().returning(|_| { - let mocker = Mocker::default(); - mocker - .when::<(), IndexResult<()>>("clear_documents") - .once() - .then(|_| Ok(())); - mocker - .when::<(), IndexResult>("stats") - .once() - .then(|_| { - Ok(IndexStats { - size: 10, - number_of_documents: 10, - is_indexing: None, - field_distribution: FieldDistribution::default(), - }) - }); - Box::pin(ok(Some(Index::mock(mocker)))) - }); - - let mocker = Mocker::default(); - let file_store = UpdateFileStore::mock(mocker); - - let index_resolver = IndexResolver::new(meta_store, index_store, file_store); - - let mut task = Task { - id: 1, - content: TaskContent::DocumentDeletion { - deletion: DocumentDeletion::Clear, - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - index_resolver.process_task(&mut task).await; - - assert!(matches!(task.events[0], TaskEvent::Succeeded { .. })); - } - - #[actix_rt::test] - async fn test_index_update() { - let mut meta_store = MockIndexMetaStore::new(); - meta_store.expect_get().once().returning(|_| { - Box::pin(ok(( - "test".to_string(), - Some(IndexMeta { - uuid: Uuid::new_v4(), - creation_task_id: 1, - }), - ))) - }); - - let mut index_store = MockIndexStore::new(); - index_store.expect_get().once().returning(|_| { - let mocker = Mocker::default(); - - mocker - .when::>("update_primary_key") - .once() - .then(|_| { - Ok(crate::index::IndexMeta { - created_at: OffsetDateTime::now_utc(), - updated_at: OffsetDateTime::now_utc(), - primary_key: Some("key".to_string()), - }) - }); - Box::pin(ok(Some(Index::mock(mocker)))) - }); - - let mocker = Mocker::default(); - let file_store = UpdateFileStore::mock(mocker); - - let index_resolver = IndexResolver::new(meta_store, index_store, file_store); - - let mut task = Task { - id: 1, - content: TaskContent::IndexUpdate { - primary_key: Some("key".to_string()), - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - index_resolver.process_task(&mut task).await; - - assert!(matches!(task.events[0], TaskEvent::Succeeded { .. })); - } -} diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs index 7fe0984dc..264d42050 100644 --- a/meilisearch-lib/src/lib.rs +++ b/meilisearch-lib/src/lib.rs @@ -3,24 +3,23 @@ pub mod error; pub mod options; mod analytics; +mod document_formats; +// TODO: TAMO: reenable the dumps +#[cfg(todo)] mod dump; -pub mod index; -pub mod index_controller; -mod index_resolver; +mod index_controller; mod snapshot; -pub mod tasks; -mod update_file_store; use std::env::VarError; use std::ffi::OsStr; use std::path::Path; -pub use index_controller::MeiliSearch; +// TODO: TAMO: rename the MeiliSearch in Meilisearch +pub use index_controller::Meilisearch as MeiliSearch; pub use milli; pub use milli::heed; mod compression; -pub mod document_formats; /// Check if a db is empty. It does not provide any information on the /// validity of the data in it. 
diff --git a/meilisearch-lib/src/snapshot.rs b/meilisearch-lib/src/snapshot.rs
index 4566a627e..5d68907c8 100644
--- a/meilisearch-lib/src/snapshot.rs
+++ b/meilisearch-lib/src/snapshot.rs
@@ -15,7 +15,7 @@ use walkdir::WalkDir;
 use crate::compression::from_tar_gz;
 use crate::index_controller::open_meta_env;
 use crate::index_controller::versioning::VERSION_FILE_NAME;
-use crate::tasks::Scheduler;
+use index_scheduler::IndexScheduler;
 
 pub struct SnapshotService {
     pub(crate) db_path: PathBuf,
@@ -23,7 +23,7 @@ pub struct SnapshotService {
     pub(crate) snapshot_path: PathBuf,
     pub(crate) index_size: usize,
     pub(crate) meta_env_size: usize,
-    pub(crate) scheduler: Arc<RwLock<Scheduler>>,
+    pub(crate) scheduler: IndexScheduler,
 }
 
 impl SnapshotService {
@@ -39,7 +39,8 @@ impl SnapshotService {
                 meta_env_size: self.meta_env_size,
                 index_size: self.index_size,
             };
-            self.scheduler.write().await.schedule_snapshot(snapshot_job);
+            // TODO: TAMO: reenable the snapshots
+            // self.scheduler.write().await.schedule_snapshot(snapshot_job);
             sleep(self.snapshot_period).await;
         }
     }
diff --git a/meilisearch-lib/src/tasks/batch.rs b/meilisearch-lib/src/tasks/batch.rs
deleted file mode 100644
index 5fa2e224a..000000000
--- a/meilisearch-lib/src/tasks/batch.rs
+++ /dev/null
@@ -1,75 +0,0 @@
-use time::OffsetDateTime;
-
-use crate::snapshot::SnapshotJob;
-
-use super::task::{Task, TaskEvent};
-
-pub type BatchId = u32;
-
-#[derive(Debug)]
-pub enum BatchContent {
-    DocumentsAdditionBatch(Vec<Task>),
-    IndexUpdate(Task),
-    Dump(Task),
-    Snapshot(SnapshotJob),
-    // Symbolizes a empty batch. This can occur when we were woken, but there wasn't any work to do.
-    Empty,
-}
-
-impl BatchContent {
-    pub fn first(&self) -> Option<&Task> {
-        match self {
-            BatchContent::DocumentsAdditionBatch(ts) => ts.first(),
-            BatchContent::Dump(t) | BatchContent::IndexUpdate(t) => Some(t),
-            BatchContent::Snapshot(_) | BatchContent::Empty => None,
-        }
-    }
-
-    pub fn push_event(&mut self, event: TaskEvent) {
-        match self {
-            BatchContent::DocumentsAdditionBatch(ts) => {
-                ts.iter_mut().for_each(|t| t.events.push(event.clone()))
-            }
-            BatchContent::IndexUpdate(t) | BatchContent::Dump(t) => t.events.push(event),
-            BatchContent::Snapshot(_) | BatchContent::Empty => (),
-        }
-    }
-}
-
-#[derive(Debug)]
-pub struct Batch {
-    // Only batches that contains a persistent tasks are given an id. Snapshot batches don't have
-    // an id.
- pub id: Option, - pub created_at: OffsetDateTime, - pub content: BatchContent, -} - -impl Batch { - pub fn new(id: Option, content: BatchContent) -> Self { - Self { - id, - created_at: OffsetDateTime::now_utc(), - content, - } - } - pub fn len(&self) -> usize { - match self.content { - BatchContent::DocumentsAdditionBatch(ref ts) => ts.len(), - BatchContent::IndexUpdate(_) | BatchContent::Dump(_) | BatchContent::Snapshot(_) => 1, - BatchContent::Empty => 0, - } - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn empty() -> Self { - Self { - id: None, - created_at: OffsetDateTime::now_utc(), - content: BatchContent::Empty, - } - } -} diff --git a/meilisearch-lib/src/tasks/error.rs b/meilisearch-lib/src/tasks/error.rs deleted file mode 100644 index 75fd7a591..000000000 --- a/meilisearch-lib/src/tasks/error.rs +++ /dev/null @@ -1,34 +0,0 @@ -use meilisearch_types::error::{Code, ErrorCode}; -use meilisearch_types::internal_error; -use tokio::task::JoinError; - -use crate::update_file_store::UpdateFileStoreError; - -use super::task::TaskId; - -pub type Result = std::result::Result; - -#[derive(Debug, thiserror::Error)] -pub enum TaskError { - #[error("Task `{0}` not found.")] - UnexistingTask(TaskId), - #[error("Internal error: {0}")] - Internal(Box), -} - -internal_error!( - TaskError: milli::heed::Error, - JoinError, - std::io::Error, - serde_json::Error, - UpdateFileStoreError -); - -impl ErrorCode for TaskError { - fn error_code(&self) -> Code { - match self { - TaskError::UnexistingTask(_) => Code::TaskNotFound, - TaskError::Internal(_) => Code::Internal, - } - } -} diff --git a/meilisearch-lib/src/tasks/handlers/dump_handler.rs b/meilisearch-lib/src/tasks/handlers/dump_handler.rs deleted file mode 100644 index c0833e4c7..000000000 --- a/meilisearch-lib/src/tasks/handlers/dump_handler.rs +++ /dev/null @@ -1,132 +0,0 @@ -use crate::dump::DumpHandler; -use crate::index_resolver::index_store::IndexStore; -use crate::index_resolver::meta_store::IndexMetaStore; -use crate::tasks::batch::{Batch, BatchContent}; -use crate::tasks::task::{Task, TaskContent, TaskEvent, TaskResult}; -use crate::tasks::BatchHandler; - -#[async_trait::async_trait] -impl BatchHandler for DumpHandler -where - U: IndexMetaStore + Sync + Send + 'static, - I: IndexStore + Sync + Send + 'static, -{ - fn accept(&self, batch: &Batch) -> bool { - matches!(batch.content, BatchContent::Dump { .. }) - } - - async fn process_batch(&self, mut batch: Batch) -> Batch { - match &batch.content { - BatchContent::Dump(Task { - content: TaskContent::Dump { uid }, - .. - }) => { - match self.run(uid.clone()).await { - Ok(_) => { - batch - .content - .push_event(TaskEvent::succeeded(TaskResult::Other)); - } - Err(e) => batch.content.push_event(TaskEvent::failed(e)), - } - batch - } - _ => unreachable!("invalid batch content for dump"), - } - } - - async fn finish(&self, _: &Batch) {} -} - -#[cfg(test)] -mod test { - use crate::dump::error::{DumpError, Result as DumpResult}; - use crate::index_resolver::{index_store::MockIndexStore, meta_store::MockIndexMetaStore}; - use crate::tasks::handlers::test::task_to_batch; - - use super::*; - - use nelson::Mocker; - use proptest::prelude::*; - - proptest! 
{ - #[test] - fn finish_does_nothing( - task in any::(), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let handle = rt.spawn(async { - let batch = task_to_batch(task); - - let mocker = Mocker::default(); - let dump_handler = DumpHandler::::mock(mocker); - - dump_handler.finish(&batch).await; - }); - - rt.block_on(handle).unwrap(); - } - - #[test] - fn test_handle_dump_success( - task in any::(), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let handle = rt.spawn(async { - let batch = task_to_batch(task); - let should_accept = matches!(batch.content, BatchContent::Dump { .. }); - - let mocker = Mocker::default(); - if should_accept { - mocker.when::>("run") - .once() - .then(|_| Ok(())); - } - - let dump_handler = DumpHandler::::mock(mocker); - - let accept = dump_handler.accept(&batch); - assert_eq!(accept, should_accept); - - if accept { - let batch = dump_handler.process_batch(batch).await; - let last_event = batch.content.first().unwrap().events.last().unwrap(); - assert!(matches!(last_event, TaskEvent::Succeeded { .. })); - } - }); - - rt.block_on(handle).unwrap(); - } - - #[test] - fn test_handle_dump_error( - task in any::(), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let handle = rt.spawn(async { - let batch = task_to_batch(task); - let should_accept = matches!(batch.content, BatchContent::Dump { .. }); - - let mocker = Mocker::default(); - if should_accept { - mocker.when::>("run") - .once() - .then(|_| Err(DumpError::Internal("error".into()))); - } - - let dump_handler = DumpHandler::::mock(mocker); - - let accept = dump_handler.accept(&batch); - assert_eq!(accept, should_accept); - - if accept { - let batch = dump_handler.process_batch(batch).await; - let last_event = batch.content.first().unwrap().events.last().unwrap(); - assert!(matches!(last_event, TaskEvent::Failed { .. })); - } - }); - - rt.block_on(handle).unwrap(); - } - } -} diff --git a/meilisearch-lib/src/tasks/handlers/empty_handler.rs b/meilisearch-lib/src/tasks/handlers/empty_handler.rs deleted file mode 100644 index d800e1965..000000000 --- a/meilisearch-lib/src/tasks/handlers/empty_handler.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::tasks::batch::{Batch, BatchContent}; -use crate::tasks::BatchHandler; - -/// A sink handler for empty tasks. 
-pub struct EmptyBatchHandler; - -#[async_trait::async_trait] -impl BatchHandler for EmptyBatchHandler { - fn accept(&self, batch: &Batch) -> bool { - matches!(batch.content, BatchContent::Empty) - } - - async fn process_batch(&self, batch: Batch) -> Batch { - batch - } - - async fn finish(&self, _: &Batch) {} -} diff --git a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs b/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs deleted file mode 100644 index 22c57e2fd..000000000 --- a/meilisearch-lib/src/tasks/handlers/index_resolver_handler.rs +++ /dev/null @@ -1,199 +0,0 @@ -use crate::index_resolver::IndexResolver; -use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore}; -use crate::tasks::batch::{Batch, BatchContent}; -use crate::tasks::BatchHandler; - -#[async_trait::async_trait] -impl BatchHandler for IndexResolver -where - U: IndexMetaStore + Send + Sync + 'static, - I: IndexStore + Send + Sync + 'static, -{ - fn accept(&self, batch: &Batch) -> bool { - matches!( - batch.content, - BatchContent::DocumentsAdditionBatch(_) | BatchContent::IndexUpdate(_) - ) - } - - async fn process_batch(&self, mut batch: Batch) -> Batch { - match batch.content { - BatchContent::DocumentsAdditionBatch(ref mut tasks) => { - self.process_document_addition_batch(tasks).await; - } - BatchContent::IndexUpdate(ref mut task) => { - self.process_task(task).await; - } - _ => unreachable!(), - } - - batch - } - - async fn finish(&self, batch: &Batch) { - if let BatchContent::DocumentsAdditionBatch(ref tasks) = batch.content { - for task in tasks { - if let Some(content_uuid) = task.get_content_uuid() { - if let Err(e) = self.delete_content_file(content_uuid).await { - log::error!("error deleting update file: {}", e); - } - } - } - } - } -} - -#[cfg(test)] -mod test { - use crate::index_resolver::index_store::MapIndexStore; - use crate::index_resolver::meta_store::HeedMetaStore; - use crate::index_resolver::{ - error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore, - }; - use crate::tasks::{ - handlers::test::task_to_batch, - task::{Task, TaskContent}, - }; - use crate::update_file_store::{Result as FileStoreResult, UpdateFileStore}; - - use super::*; - use meilisearch_types::index_uid::IndexUid; - use milli::update::IndexDocumentsMethod; - use nelson::Mocker; - use proptest::prelude::*; - use uuid::Uuid; - - proptest! 
{ - #[test] - fn test_accept_task( - task in any::(), - ) { - let batch = task_to_batch(task); - - let index_store = MockIndexStore::new(); - let meta_store = MockIndexMetaStore::new(); - let mocker = Mocker::default(); - let update_file_store = UpdateFileStore::mock(mocker); - let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); - - match batch.content { - BatchContent::DocumentsAdditionBatch(_) - | BatchContent::IndexUpdate(_) => assert!(index_resolver.accept(&batch)), - BatchContent::Dump(_) - | BatchContent::Snapshot(_) - | BatchContent::Empty => assert!(!index_resolver.accept(&batch)), - } - } - } - - #[actix_rt::test] - async fn finisher_called_on_document_update() { - let index_store = MockIndexStore::new(); - let meta_store = MockIndexMetaStore::new(); - let mocker = Mocker::default(); - let content_uuid = Uuid::new_v4(); - mocker - .when::>("delete") - .once() - .then(move |uuid| { - assert_eq!(uuid, content_uuid); - Ok(()) - }); - let update_file_store = UpdateFileStore::mock(mocker); - let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); - - let task = Task { - id: 1, - content: TaskContent::DocumentAddition { - content_uuid, - merge_strategy: IndexDocumentsMethod::ReplaceDocuments, - primary_key: None, - documents_count: 100, - allow_index_creation: true, - index_uid: IndexUid::new_unchecked("test"), - }, - events: Vec::new(), - }; - - let batch = task_to_batch(task); - - index_resolver.finish(&batch).await; - } - - #[actix_rt::test] - #[should_panic] - async fn panic_when_passed_unsupported_batch() { - let index_store = MockIndexStore::new(); - let meta_store = MockIndexMetaStore::new(); - let mocker = Mocker::default(); - let update_file_store = UpdateFileStore::mock(mocker); - let index_resolver = IndexResolver::new(meta_store, index_store, update_file_store); - - let task = Task { - id: 1, - content: TaskContent::Dump { - uid: String::from("hello"), - }, - events: Vec::new(), - }; - - let batch = task_to_batch(task); - - index_resolver.process_batch(batch).await; - } - - proptest! { - #[test] - fn index_document_task_deletes_update_file( - task in any::(), - ) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let handle = rt.spawn(async { - let mocker = Mocker::default(); - - if let TaskContent::DocumentAddition{ .. } = task.content { - mocker.when::>("delete_content_file").then(|_| Ok(())); - } - - let index_resolver: IndexResolver = IndexResolver::mock(mocker); - - let batch = task_to_batch(task); - - index_resolver.finish(&batch).await; - }); - - rt.block_on(handle).unwrap(); - } - - #[test] - fn test_handle_batch(task in any::()) { - let rt = tokio::runtime::Runtime::new().unwrap(); - let handle = rt.spawn(async { - let mocker = Mocker::default(); - match task.content { - TaskContent::DocumentAddition { .. } => { - mocker.when::<&mut [Task], ()>("process_document_addition_batch").then(|_| ()); - } - TaskContent::Dump { .. 
} => (), - _ => { - mocker.when::<&mut Task, ()>("process_task").then(|_| ()); - } - } - let index_resolver: IndexResolver = IndexResolver::mock(mocker); - - - let batch = task_to_batch(task); - - if index_resolver.accept(&batch) { - index_resolver.process_batch(batch).await; - } - }); - - if let Err(e) = rt.block_on(handle) { - if e.is_panic() { - std::panic::resume_unwind(e.into_panic()); - } - } - } - } -} diff --git a/meilisearch-lib/src/tasks/handlers/mod.rs b/meilisearch-lib/src/tasks/handlers/mod.rs deleted file mode 100644 index 8f02de8b9..000000000 --- a/meilisearch-lib/src/tasks/handlers/mod.rs +++ /dev/null @@ -1,34 +0,0 @@ -pub mod dump_handler; -pub mod empty_handler; -mod index_resolver_handler; -pub mod snapshot_handler; - -#[cfg(test)] -mod test { - use time::OffsetDateTime; - - use crate::tasks::{ - batch::{Batch, BatchContent}, - task::{Task, TaskContent}, - }; - - pub fn task_to_batch(task: Task) -> Batch { - let content = match task.content { - TaskContent::DocumentAddition { .. } => { - BatchContent::DocumentsAdditionBatch(vec![task]) - } - TaskContent::DocumentDeletion { .. } - | TaskContent::SettingsUpdate { .. } - | TaskContent::IndexDeletion { .. } - | TaskContent::IndexCreation { .. } - | TaskContent::IndexUpdate { .. } => BatchContent::IndexUpdate(task), - TaskContent::Dump { .. } => BatchContent::Dump(task), - }; - - Batch { - id: Some(1), - created_at: OffsetDateTime::now_utc(), - content, - } - } -} diff --git a/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs b/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs deleted file mode 100644 index 32fe6d746..000000000 --- a/meilisearch-lib/src/tasks/handlers/snapshot_handler.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::tasks::batch::{Batch, BatchContent}; -use crate::tasks::BatchHandler; - -pub struct SnapshotHandler; - -#[async_trait::async_trait] -impl BatchHandler for SnapshotHandler { - fn accept(&self, batch: &Batch) -> bool { - matches!(batch.content, BatchContent::Snapshot(_)) - } - - async fn process_batch(&self, batch: Batch) -> Batch { - match batch.content { - BatchContent::Snapshot(job) => { - if let Err(e) = job.run().await { - log::error!("snapshot error: {e}"); - } - } - _ => unreachable!(), - } - - Batch::empty() - } - - async fn finish(&self, _: &Batch) {} -} diff --git a/meilisearch-lib/src/tasks/mod.rs b/meilisearch-lib/src/tasks/mod.rs deleted file mode 100644 index fe722a987..000000000 --- a/meilisearch-lib/src/tasks/mod.rs +++ /dev/null @@ -1,56 +0,0 @@ -use async_trait::async_trait; - -pub use handlers::empty_handler::EmptyBatchHandler; -pub use handlers::snapshot_handler::SnapshotHandler; -pub use scheduler::Scheduler; -pub use task_store::TaskFilter; - -#[cfg(test)] -pub use task_store::test::MockTaskStore as TaskStore; -#[cfg(not(test))] -pub use task_store::TaskStore; - -use batch::Batch; -use error::Result; - -pub mod batch; -pub mod error; -mod handlers; -mod scheduler; -pub mod task; -mod task_store; -pub mod update_loop; - -#[cfg_attr(test, mockall::automock(type Error=test::DebugError;))] -#[async_trait] -pub trait BatchHandler: Sync + Send + 'static { - /// return whether this handler can accept this batch - fn accept(&self, batch: &Batch) -> bool; - - /// Processes the `Task` batch returning the batch with the `Task` updated. - /// - /// It is ok for this function to panic if a batch is handed that hasn't been verified by - /// `accept` beforehand. 
- async fn process_batch(&self, batch: Batch) -> Batch; - - /// `finish` is called when the result of `process` has been committed to the task store. This - /// method can be used to perform cleanup after the update has been completed for example. - async fn finish(&self, batch: &Batch); -} - -#[cfg(test)] -mod test { - use serde::{Deserialize, Serialize}; - use std::fmt::Display; - - #[derive(Debug, Serialize, Deserialize)] - pub struct DebugError; - - impl Display for DebugError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str("an error") - } - } - - impl std::error::Error for DebugError {} -} diff --git a/meilisearch-lib/src/tasks/scheduler.rs b/meilisearch-lib/src/tasks/scheduler.rs deleted file mode 100644 index c592b71fa..000000000 --- a/meilisearch-lib/src/tasks/scheduler.rs +++ /dev/null @@ -1,609 +0,0 @@ -use std::cmp::Ordering; -use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque}; -use std::ops::{Deref, DerefMut}; -use std::slice; -use std::sync::Arc; - -use atomic_refcell::AtomicRefCell; -use milli::update::IndexDocumentsMethod; -use time::OffsetDateTime; -use tokio::sync::{watch, RwLock}; - -use crate::options::SchedulerConfig; -use crate::snapshot::SnapshotJob; - -use super::batch::{Batch, BatchContent}; -use super::error::Result; -use super::task::{Task, TaskContent, TaskEvent, TaskId}; -use super::update_loop::UpdateLoop; -use super::{BatchHandler, TaskFilter, TaskStore}; - -#[derive(Eq, Debug, Clone, Copy)] -enum TaskType { - DocumentAddition { number: usize }, - DocumentUpdate { number: usize }, - IndexUpdate, - Dump, -} - -/// Two tasks are equal if they have the same type. -impl PartialEq for TaskType { - fn eq(&self, other: &Self) -> bool { - matches!( - (self, other), - (Self::DocumentAddition { .. }, Self::DocumentAddition { .. }) - | (Self::DocumentUpdate { .. }, Self::DocumentUpdate { .. 
}) - ) - } -} - -#[derive(Eq, Debug, Clone, Copy)] -struct PendingTask { - kind: TaskType, - id: TaskId, -} - -impl PartialEq for PendingTask { - fn eq(&self, other: &Self) -> bool { - self.id.eq(&other.id) - } -} - -impl PartialOrd for PendingTask { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PendingTask { - fn cmp(&self, other: &Self) -> Ordering { - self.id.cmp(&other.id).reverse() - } -} - -#[derive(Debug)] -struct TaskList { - id: TaskListIdentifier, - tasks: BinaryHeap, -} - -impl Deref for TaskList { - type Target = BinaryHeap; - - fn deref(&self) -> &Self::Target { - &self.tasks - } -} - -impl DerefMut for TaskList { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.tasks - } -} - -impl TaskList { - fn new(id: TaskListIdentifier) -> Self { - Self { - id, - tasks: Default::default(), - } - } -} - -impl PartialEq for TaskList { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for TaskList {} - -impl Ord for TaskList { - fn cmp(&self, other: &Self) -> Ordering { - match (&self.id, &other.id) { - (TaskListIdentifier::Index(_), TaskListIdentifier::Index(_)) => { - match (self.peek(), other.peek()) { - (None, None) => Ordering::Equal, - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - (Some(lhs), Some(rhs)) => lhs.cmp(rhs), - } - } - (TaskListIdentifier::Index(_), TaskListIdentifier::Dump) => Ordering::Less, - (TaskListIdentifier::Dump, TaskListIdentifier::Index(_)) => Ordering::Greater, - (TaskListIdentifier::Dump, TaskListIdentifier::Dump) => { - unreachable!("There should be only one Dump task list") - } - } - } -} - -impl PartialOrd for TaskList { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -#[derive(PartialEq, Eq, Hash, Debug, Clone)] -enum TaskListIdentifier { - Index(String), - Dump, -} - -impl From<&Task> for TaskListIdentifier { - fn from(task: &Task) -> Self { - match &task.content { - TaskContent::DocumentAddition { index_uid, .. } - | TaskContent::DocumentDeletion { index_uid, .. } - | TaskContent::SettingsUpdate { index_uid, .. } - | TaskContent::IndexDeletion { index_uid } - | TaskContent::IndexCreation { index_uid, .. } - | TaskContent::IndexUpdate { index_uid, .. } => { - TaskListIdentifier::Index(index_uid.as_str().to_string()) - } - TaskContent::Dump { .. } => TaskListIdentifier::Dump, - } - } -} - -#[derive(Default)] -struct TaskQueue { - /// Maps index uids to their TaskList, for quick access - index_tasks: HashMap>>, - /// A queue that orders TaskList by the priority of their fist update - queue: BinaryHeap>>, -} - -impl TaskQueue { - fn insert(&mut self, task: Task) { - let id = task.id; - let uid = TaskListIdentifier::from(&task); - - let kind = match task.content { - TaskContent::DocumentAddition { - documents_count, - merge_strategy: IndexDocumentsMethod::ReplaceDocuments, - .. - } => TaskType::DocumentAddition { - number: documents_count, - }, - TaskContent::DocumentAddition { - documents_count, - merge_strategy: IndexDocumentsMethod::UpdateDocuments, - .. - } => TaskType::DocumentUpdate { - number: documents_count, - }, - TaskContent::Dump { .. } => TaskType::Dump, - TaskContent::DocumentDeletion { .. } - | TaskContent::SettingsUpdate { .. } - | TaskContent::IndexDeletion { .. } - | TaskContent::IndexCreation { .. } - | TaskContent::IndexUpdate { .. 
} => TaskType::IndexUpdate, - _ => unreachable!("unhandled task type"), - }; - let task = PendingTask { kind, id }; - - match self.index_tasks.entry(uid) { - Entry::Occupied(entry) => { - // A task list already exists for this index, all we have to to is to push the new - // update to the end of the list. This won't change the order since ids are - // monotonically increasing. - let mut list = entry.get().borrow_mut(); - - // We only need the first element to be lower than the one we want to - // insert to preserve the order in the queue. - assert!(list.peek().map(|old_id| id >= old_id.id).unwrap_or(true)); - - list.push(task); - } - Entry::Vacant(entry) => { - let mut task_list = TaskList::new(entry.key().clone()); - task_list.push(task); - let task_list = Arc::new(AtomicRefCell::new(task_list)); - entry.insert(task_list.clone()); - self.queue.push(task_list); - } - } - } - - /// Passes a context with a view to the task list of the next index to schedule. It is - /// guaranteed that the first id from task list will be the lowest pending task id. - fn head_mut(&mut self, mut f: impl FnMut(&mut TaskList) -> R) -> Option { - let head = self.queue.pop()?; - let result = { - let mut ref_head = head.borrow_mut(); - f(&mut *ref_head) - }; - if !head.borrow().tasks.is_empty() { - // After being mutated, the head is reinserted to the correct position. - self.queue.push(head); - } else { - self.index_tasks.remove(&head.borrow().id); - } - - Some(result) - } - - pub fn is_empty(&self) -> bool { - self.queue.is_empty() && self.index_tasks.is_empty() - } -} - -pub struct Scheduler { - // TODO: currently snapshots are non persistent tasks, and are treated differently. - snapshots: VecDeque, - tasks: TaskQueue, - - store: TaskStore, - processing: Processing, - next_fetched_task_id: TaskId, - config: SchedulerConfig, - /// Notifies the update loop that a new task was received - notifier: watch::Sender<()>, -} - -impl Scheduler { - pub fn new( - store: TaskStore, - performers: Vec>, - config: SchedulerConfig, - ) -> Result>> { - let (notifier, rcv) = watch::channel(()); - - let this = Self { - snapshots: VecDeque::new(), - tasks: TaskQueue::default(), - - store, - processing: Processing::Nothing, - next_fetched_task_id: 0, - config, - notifier, - }; - - // Notify update loop to start processing pending updates immediately after startup. - this.notify(); - - let this = Arc::new(RwLock::new(this)); - - let update_loop = UpdateLoop::new(this.clone(), performers, rcv); - - tokio::task::spawn_local(update_loop.run()); - - Ok(this) - } - - fn register_task(&mut self, task: Task) { - assert!(!task.is_finished()); - self.tasks.insert(task); - } - - /// Clears the processing list, this method should be called when the processing of a batch is finished. 
- pub fn finish(&mut self) { - self.processing = Processing::Nothing; - } - - pub fn notify(&self) { - let _ = self.notifier.send(()); - } - - fn notify_if_not_empty(&self) { - if !self.snapshots.is_empty() || !self.tasks.is_empty() { - self.notify(); - } - } - - pub async fn update_tasks(&self, content: BatchContent) -> Result { - match content { - BatchContent::DocumentsAdditionBatch(tasks) => { - let tasks = self.store.update_tasks(tasks).await?; - Ok(BatchContent::DocumentsAdditionBatch(tasks)) - } - BatchContent::IndexUpdate(t) => { - let mut tasks = self.store.update_tasks(vec![t]).await?; - Ok(BatchContent::IndexUpdate(tasks.remove(0))) - } - BatchContent::Dump(t) => { - let mut tasks = self.store.update_tasks(vec![t]).await?; - Ok(BatchContent::Dump(tasks.remove(0))) - } - other => Ok(other), - } - } - - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { - self.store.get_task(id, filter).await - } - - pub async fn list_tasks( - &self, - offset: Option, - filter: Option, - limit: Option, - ) -> Result> { - self.store.list_tasks(offset, filter, limit).await - } - - pub async fn get_processing_tasks(&self) -> Result> { - let mut tasks = Vec::new(); - - for id in self.processing.ids() { - let task = self.store.get_task(id, None).await?; - tasks.push(task); - } - - Ok(tasks) - } - - pub fn schedule_snapshot(&mut self, job: SnapshotJob) { - self.snapshots.push_back(job); - self.notify(); - } - - async fn fetch_pending_tasks(&mut self) -> Result<()> { - self.store - .fetch_unfinished_tasks(Some(self.next_fetched_task_id)) - .await? - .into_iter() - .for_each(|t| { - self.next_fetched_task_id = t.id + 1; - self.register_task(t); - }); - - Ok(()) - } - - /// Prepare the next batch, and set `processing` to the ids in that batch. - pub async fn prepare(&mut self) -> Result { - // If there is a job to process, do it first. - if let Some(job) = self.snapshots.pop_front() { - // There is more work to do, notify the update loop - self.notify_if_not_empty(); - let batch = Batch::new(None, BatchContent::Snapshot(job)); - return Ok(batch); - } - - // Try to fill the queue with pending tasks. - self.fetch_pending_tasks().await?; - - self.processing = make_batch(&mut self.tasks, &self.config); - - log::debug!("prepared batch with {} tasks", self.processing.len()); - - if !self.processing.is_nothing() { - let (processing, mut content) = self - .store - .get_processing_tasks(std::mem::take(&mut self.processing)) - .await?; - - // The batch id is the id of the first update it contains. At this point we must have a - // valid batch that contains at least 1 task. - let id = match content.first() { - Some(Task { id, .. }) => *id, - _ => panic!("invalid batch"), - }; - - content.push_event(TaskEvent::Batched { - batch_id: id, - timestamp: OffsetDateTime::now_utc(), - }); - - self.processing = processing; - - let batch = Batch::new(Some(id), content); - - // There is more work to do, notify the update loop - self.notify_if_not_empty(); - - Ok(batch) - } else { - Ok(Batch::empty()) - } - } -} - -#[derive(Debug, PartialEq, Eq)] -pub enum Processing { - DocumentAdditions(Vec), - IndexUpdate(TaskId), - Dump(TaskId), - /// Variant used when there is nothing to process. 
- Nothing, -} - -impl Default for Processing { - fn default() -> Self { - Self::Nothing - } -} - -enum ProcessingIter<'a> { - Many(slice::Iter<'a, TaskId>), - Single(Option), -} - -impl<'a> Iterator for ProcessingIter<'a> { - type Item = TaskId; - - fn next(&mut self) -> Option { - match self { - ProcessingIter::Many(iter) => iter.next().copied(), - ProcessingIter::Single(val) => val.take(), - } - } -} - -impl Processing { - fn is_nothing(&self) -> bool { - matches!(self, Processing::Nothing) - } - - pub fn ids(&self) -> impl Iterator + '_ { - match self { - Processing::DocumentAdditions(v) => ProcessingIter::Many(v.iter()), - Processing::IndexUpdate(id) | Processing::Dump(id) => ProcessingIter::Single(Some(*id)), - Processing::Nothing => ProcessingIter::Single(None), - } - } - - pub fn len(&self) -> usize { - match self { - Processing::DocumentAdditions(v) => v.len(), - Processing::IndexUpdate(_) | Processing::Dump(_) => 1, - Processing::Nothing => 0, - } - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -fn make_batch(tasks: &mut TaskQueue, config: &SchedulerConfig) -> Processing { - let mut doc_count = 0; - tasks - .head_mut(|list| match list.peek().copied() { - Some(PendingTask { - kind: TaskType::IndexUpdate, - id, - }) => { - list.pop(); - Processing::IndexUpdate(id) - } - Some(PendingTask { - kind: TaskType::Dump, - id, - }) => { - list.pop(); - Processing::Dump(id) - } - Some(PendingTask { kind, .. }) => { - let mut task_list = Vec::new(); - loop { - match list.peek() { - Some(pending) if pending.kind == kind => { - // We always need to process at least one task for the scheduler to make progress. - if config.disable_auto_batching && !task_list.is_empty() { - break; - } - let pending = list.pop().unwrap(); - task_list.push(pending.id); - - // We add the number of documents to the count if we are scheduling document additions. 
- match pending.kind { - TaskType::DocumentUpdate { number } - | TaskType::DocumentAddition { number } => { - doc_count += number; - } - _ => (), - } - } - _ => break, - } - } - Processing::DocumentAdditions(task_list) - } - None => Processing::Nothing, - }) - .unwrap_or(Processing::Nothing) -} - -#[cfg(test)] -mod test { - use meilisearch_types::index_uid::IndexUid; - use milli::update::IndexDocumentsMethod; - use uuid::Uuid; - - use crate::tasks::task::TaskContent; - - use super::*; - - fn gen_task(id: TaskId, content: TaskContent) -> Task { - Task { - id, - content, - events: vec![], - } - } - - #[test] - #[rustfmt::skip] - fn register_updates_multiples_indexes() { - let mut queue = TaskQueue::default(); - queue.insert(gen_task(0, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); - queue.insert(gen_task(1, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); - queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); - queue.insert(gen_task(3, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); - queue.insert(gen_task(4, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); - queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1") })); - queue.insert(gen_task(6, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2") })); - - let test1_tasks = queue - .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) - .unwrap(); - - assert_eq!(test1_tasks, &[0, 4, 5]); - - let test2_tasks = queue - .head_mut(|tasks| tasks.drain().map(|t| t.id).collect::>()) - .unwrap(); - - assert_eq!(test2_tasks, &[1, 2, 3, 6]); - - assert!(queue.index_tasks.is_empty()); - assert!(queue.queue.is_empty()); - } - - fn gen_doc_addition_task_content(index_uid: &str) -> TaskContent { - TaskContent::DocumentAddition { - content_uuid: Uuid::new_v4(), - merge_strategy: IndexDocumentsMethod::ReplaceDocuments, - primary_key: Some("test".to_string()), - documents_count: 0, - allow_index_creation: true, - index_uid: IndexUid::new_unchecked(index_uid), - } - } - - #[test] - #[rustfmt::skip] - fn test_make_batch() { - let mut queue = TaskQueue::default(); - queue.insert(gen_task(0, gen_doc_addition_task_content("test1"))); - queue.insert(gen_task(1, gen_doc_addition_task_content("test2"))); - queue.insert(gen_task(2, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test2")})); - queue.insert(gen_task(3, gen_doc_addition_task_content("test2"))); - queue.insert(gen_task(4, gen_doc_addition_task_content("test1"))); - queue.insert(gen_task(5, TaskContent::IndexDeletion { index_uid: IndexUid::new_unchecked("test1")})); - queue.insert(gen_task(6, gen_doc_addition_task_content("test2"))); - queue.insert(gen_task(7, gen_doc_addition_task_content("test1"))); - queue.insert(gen_task(8, TaskContent::Dump { uid: "adump".to_owned() })); - - let config = SchedulerConfig::default(); - - // Make sure that the dump is processed before everybody else. 
- let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::Dump(8)); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::DocumentAdditions(vec![0, 4])); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::DocumentAdditions(vec![1])); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::IndexUpdate(2)); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::DocumentAdditions(vec![3, 6])); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::IndexUpdate(5)); - - let batch = make_batch(&mut queue, &config); - assert_eq!(batch, Processing::DocumentAdditions(vec![7])); - - assert!(queue.is_empty()); - } -} diff --git a/meilisearch-lib/src/tasks/update_loop.rs b/meilisearch-lib/src/tasks/update_loop.rs deleted file mode 100644 index b6e43e319..000000000 --- a/meilisearch-lib/src/tasks/update_loop.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::sync::Arc; - -use time::OffsetDateTime; -use tokio::sync::{watch, RwLock}; - -use super::batch::Batch; -use super::error::Result; -use super::{BatchHandler, Scheduler}; -use crate::tasks::task::TaskEvent; - -/// The update loop sequentially performs batches of updates by asking the scheduler for a batch, -/// and handing it to the `TaskPerformer`. -pub struct UpdateLoop { - scheduler: Arc>, - performers: Vec>, - - notifier: Option>, -} - -impl UpdateLoop { - pub fn new( - scheduler: Arc>, - performers: Vec>, - notifier: watch::Receiver<()>, - ) -> Self { - Self { - scheduler, - performers, - notifier: Some(notifier), - } - } - - pub async fn run(mut self) { - let mut notifier = self.notifier.take().unwrap(); - - loop { - if notifier.changed().await.is_err() { - break; - } - - if let Err(e) = self.process_next_batch().await { - log::error!("an error occurred while processing an update batch: {}", e); - } - } - } - - async fn process_next_batch(&self) -> Result<()> { - let mut batch = { self.scheduler.write().await.prepare().await? }; - let performer = self - .performers - .iter() - .find(|p| p.accept(&batch)) - .expect("No performer found for batch") - .clone(); - - batch - .content - .push_event(TaskEvent::Processing(OffsetDateTime::now_utc())); - - batch.content = { - self.scheduler - .read() - .await - .update_tasks(batch.content) - .await? - }; - - let batch = performer.process_batch(batch).await; - - self.handle_batch_result(batch, performer).await?; - - Ok(()) - } - - /// Handles the result from a processed batch. - /// - /// When a task is processed, the result of the process is pushed to its event list. The - /// `handle_batch_result` make sure that the new state is saved to the store. - /// The tasks are then removed from the processing queue. 
- async fn handle_batch_result( - &self, - mut batch: Batch, - performer: Arc, - ) -> Result<()> { - let mut scheduler = self.scheduler.write().await; - let content = scheduler.update_tasks(batch.content).await?; - scheduler.finish(); - drop(scheduler); - batch.content = content; - performer.finish(&batch).await; - Ok(()) - } -} diff --git a/meilisearch-lib/src/update_file_store.rs b/meilisearch-lib/src/update_file_store.rs deleted file mode 100644 index cb4eadf4d..000000000 --- a/meilisearch-lib/src/update_file_store.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::fs::{create_dir_all, File}; -use std::io::{self, BufReader, BufWriter, Write}; -use std::ops::{Deref, DerefMut}; -use std::path::{Path, PathBuf}; - -use milli::documents::DocumentsBatchReader; -use serde_json::Map; -use tempfile::{NamedTempFile, PersistError}; -use uuid::Uuid; - -#[cfg(not(test))] -pub use store::UpdateFileStore; -#[cfg(test)] -pub use test::MockUpdateFileStore as UpdateFileStore; - -const UPDATE_FILES_PATH: &str = "updates/updates_files"; - -use crate::document_formats::read_ndjson; - -pub struct UpdateFile { - path: PathBuf, - file: NamedTempFile, -} - -#[derive(Debug, thiserror::Error)] -#[error("Error while persisting update to disk: {0}")] -pub struct UpdateFileStoreError(Box); - -pub type Result = std::result::Result; - -macro_rules! into_update_store_error { - ($($other:path),*) => { - $( - impl From<$other> for UpdateFileStoreError { - fn from(other: $other) -> Self { - Self(Box::new(other)) - } - } - )* - }; -} - -into_update_store_error!( - PersistError, - io::Error, - serde_json::Error, - milli::documents::Error, - milli::documents::DocumentsBatchCursorError -); - -impl UpdateFile { - pub fn persist(self) -> Result<()> { - self.file.persist(&self.path)?; - Ok(()) - } -} - -impl Deref for UpdateFile { - type Target = NamedTempFile; - - fn deref(&self) -> &Self::Target { - &self.file - } -} - -impl DerefMut for UpdateFile { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.file - } -} - -mod store { - use super::*; - - #[derive(Clone, Debug)] - pub struct UpdateFileStore { - path: PathBuf, - } - - impl UpdateFileStore { - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH); - let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH); - - // No update files to load - if !src_update_files_path.exists() { - return Ok(()); - } - - create_dir_all(&dst_update_files_path)?; - - let entries = std::fs::read_dir(src_update_files_path)?; - - for entry in entries { - let entry = entry?; - let update_file = BufReader::new(File::open(entry.path())?); - let file_uuid = entry.file_name(); - let file_uuid = file_uuid - .to_str() - .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?; - let dst_path = dst_update_files_path.join(file_uuid); - let dst_file = BufWriter::new(File::create(dst_path)?); - read_ndjson(update_file, dst_file)?; - } - - Ok(()) - } - - pub fn new(path: impl AsRef) -> Result { - let path = path.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&path)?; - Ok(Self { path }) - } - - /// Creates a new temporary update file. - /// A call to `persist` is needed to persist the file in the database. 
- pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { - let file = NamedTempFile::new_in(&self.path)?; - let uuid = Uuid::new_v4(); - let path = self.path.join(uuid.to_string()); - let update_file = UpdateFile { file, path }; - - Ok((uuid, update_file)) - } - - /// Returns the file corresponding to the requested uuid. - pub fn get_update(&self, uuid: Uuid) -> Result { - let path = self.path.join(uuid.to_string()); - let file = File::open(path)?; - Ok(file) - } - - /// Copies the content of the update file pointed to by `uuid` to the `dst` directory. - pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { - let src = self.path.join(uuid.to_string()); - let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&dst)?; - dst.push(uuid.to_string()); - std::fs::copy(src, dst)?; - Ok(()) - } - - /// Peforms a dump of the given update file uuid into the provided dump path. - pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { - let uuid_string = uuid.to_string(); - let update_file_path = self.path.join(&uuid_string); - let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&dst)?; - dst.push(&uuid_string); - - let update_file = File::open(update_file_path)?; - let mut dst_file = NamedTempFile::new_in(&dump_path)?; - let (mut document_cursor, index) = - DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index(); - - let mut document_buffer = Map::new(); - // TODO: we need to find a way to do this more efficiently. (create a custom serializer - // for jsonl for example...) - while let Some(document) = document_cursor.next_document()? { - for (field_id, content) in document.iter() { - if let Some(field_name) = index.name(field_id) { - let content = serde_json::from_slice(content)?; - document_buffer.insert(field_name.to_string(), content); - } - } - - serde_json::to_writer(&mut dst_file, &document_buffer)?; - dst_file.write_all(b"\n")?; - document_buffer.clear(); - } - - dst_file.persist(dst)?; - - Ok(()) - } - - pub fn get_size(&self, uuid: Uuid) -> Result { - Ok(self.get_update(uuid)?.metadata()?.len()) - } - - pub async fn delete(&self, uuid: Uuid) -> Result<()> { - let path = self.path.join(uuid.to_string()); - tokio::fs::remove_file(path).await?; - Ok(()) - } - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use nelson::Mocker; - - use super::*; - - #[derive(Clone)] - pub enum MockUpdateFileStore { - Real(store::UpdateFileStore), - Mock(Arc), - } - - impl MockUpdateFileStore { - pub fn mock(mocker: Mocker) -> Self { - Self::Mock(Arc::new(mocker)) - } - - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - store::UpdateFileStore::load_dump(src, dst) - } - - pub fn new(path: impl AsRef) -> Result { - store::UpdateFileStore::new(path).map(Self::Real) - } - - pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { - match self { - MockUpdateFileStore::Real(s) => s.new_update(), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn get_update(&self, uuid: Uuid) -> Result { - match self { - MockUpdateFileStore::Real(s) => s.get_update(uuid), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { - match self { - MockUpdateFileStore::Real(s) => s.snapshot(uuid, dst), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { - match self { - MockUpdateFileStore::Real(s) => s.dump(uuid, dump_path), - 
MockUpdateFileStore::Mock(_) => todo!(),
-            }
-        }
-
-        pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
-            match self {
-                MockUpdateFileStore::Real(s) => s.get_size(uuid),
-                MockUpdateFileStore::Mock(_) => todo!(),
-            }
-        }
-
-        pub async fn delete(&self, uuid: Uuid) -> Result<()> {
-            match self {
-                MockUpdateFileStore::Real(s) => s.delete(uuid).await,
-                MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) },
-            }
-        }
-    }
-}
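
For reference, the batching rule implemented by the `make_batch` deleted above boils down to the following minimal sketch (simplified names, a single per-index queue, no `SchedulerConfig` handling; this is illustrative code, not code from this patch): consecutive document additions targeting one index are merged into a single batch, while an index update or a dump is always processed on its own. In the full version, the `Ord` implementation on `TaskList` additionally lets a pending dump jump ahead of every index queue.

use std::collections::VecDeque;

#[derive(Clone, Copy, Debug)]
enum Kind {
    DocumentAddition,
    IndexUpdate,
    Dump,
}

#[derive(Debug, PartialEq)]
enum Batch {
    DocumentAdditions(Vec<u32>),
    IndexUpdate(u32),
    Dump(u32),
    Nothing,
}

// One queue per index, ordered by task id (lowest id first).
fn make_batch(queue: &mut VecDeque<(u32, Kind)>) -> Batch {
    match queue.pop_front() {
        // Dumps and index updates are never grouped with anything else.
        Some((id, Kind::Dump)) => Batch::Dump(id),
        Some((id, Kind::IndexUpdate)) => Batch::IndexUpdate(id),
        // A document addition absorbs every directly following addition on the same index.
        Some((id, Kind::DocumentAddition)) => {
            let mut ids = vec![id];
            while matches!(queue.front(), Some((_, Kind::DocumentAddition))) {
                ids.push(queue.pop_front().unwrap().0);
            }
            Batch::DocumentAdditions(ids)
        }
        None => Batch::Nothing,
    }
}

fn main() {
    let mut queue = VecDeque::from(vec![
        (0, Kind::DocumentAddition),
        (1, Kind::DocumentAddition),
        (2, Kind::IndexUpdate),
        (3, Kind::DocumentAddition),
    ]);
    assert_eq!(make_batch(&mut queue), Batch::DocumentAdditions(vec![0, 1]));
    assert_eq!(make_batch(&mut queue), Batch::IndexUpdate(2));
    assert_eq!(make_batch(&mut queue), Batch::DocumentAdditions(vec![3]));
    assert_eq!(make_batch(&mut queue), Batch::Nothing);
}

The deleted `TaskQueue` and `Processing` types carry the same idea across multiple indexes, with a binary heap deciding which index queue (or the dump) is served first.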