diff --git a/Cargo.lock b/Cargo.lock index cb51300ac..76fa73634 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1020,6 +1020,37 @@ dependencies = [ "syn 1.0.101", ] +[[package]] +name = "derive_builder" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d07adf7be193b71cc36b193d0f5fe60b918a3a9db4dad0449f57bcfd519704a3" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f91d4cfa921f1c05904dc3c57b4a32c38aed3340cce209f3a6fd1478babafc4" +dependencies = [ + "darling", + "proc-macro2 1.0.39", + "quote 1.0.18", + "syn 1.0.96", +] + +[[package]] +name = "derive_builder_macro" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f0314b72bed045f3a68671b3c86328386762c93f82d98c65c3cb5e5f573dd68" +dependencies = [ + "derive_builder_core", + "syn 1.0.96", +] + [[package]] name = "derive_more" version = "0.99.17" @@ -1796,6 +1827,7 @@ dependencies = [ "big_s", "bincode", "csv", + "derive_builder", "document-formats", "file-store", "index", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index baef2e6ca..6854dbee8 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -22,6 +22,7 @@ thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } uuid = { version = "1.1.2", features = ["serde", "v4"] } synchronoise = "1.0.1" +derive_builder = "0.11.2" [dev-dependencies] nelson = { git = "https://github.com/meilisearch/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"} diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs index 1caeff33d..66cac7e0e 100644 --- a/index-scheduler/src/error.rs +++ b/index-scheduler/src/error.rs @@ -1,6 +1,8 @@ use milli::heed; use thiserror::Error; +use crate::TaskId; + #[derive(Error, Debug)] pub enum Error { #[error("Index `{0}` not found")] @@ -9,6 +11,8 @@ pub enum Error { IndexAlreadyExists(String), #[error("Corrupted task queue.")] CorruptedTaskQueue, + #[error("Task `{0}` not found")] + TaskNotFound(TaskId), #[error(transparent)] Heed(#[from] heed::Error), #[error(transparent)] diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 4dd0f9093..767402fc0 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -107,6 +107,16 @@ impl IndexMapper { Ok(index) } + pub fn indexes(&self, rtxn: &RoTxn) -> Result> { + self.index_mapping + .iter(&rtxn)? + .map(|ret| { + ret.map_err(Error::from) + .and_then(|(name, _)| self.index(rtxn, name)) + }) + .collect() + } + /// Swap two index name. pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> { let lhs_uuid = self diff --git a/index-scheduler/src/index_scheduler.rs b/index-scheduler/src/index_scheduler.rs index 9c7f7838f..2c264bb1d 100644 --- a/index-scheduler/src/index_scheduler.rs +++ b/index-scheduler/src/index_scheduler.rs @@ -1,6 +1,6 @@ use crate::index_mapper::IndexMapper; use crate::task::{Kind, KindWithContent, Status, Task, TaskView}; -use crate::{Error, Result}; +use crate::{Error, Result, TaskId}; use file_store::FileStore; use index::Index; use milli::update::IndexerConfig; @@ -20,7 +20,7 @@ use serde::Deserialize; const DEFAULT_LIMIT: fn() -> u32 = || 20; -#[derive(Debug, Clone, Deserialize)] +#[derive(derive_builder::Builder, Debug, Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Query { #[serde(default = "DEFAULT_LIMIT")] @@ -32,6 +32,38 @@ pub struct Query { index_uid: Option>, } +impl Default for Query { + fn default() -> Self { + Self { + limit: DEFAULT_LIMIT(), + from: None, + status: None, + kind: None, + index_uid: None, + } + } +} + +impl Query { + pub fn with_status(self, status: Status) -> Self { + let mut status_vec = self.status.unwrap_or_default(); + status_vec.push(status); + Self { + status: Some(status_vec), + ..self + } + } + + pub fn with_kind(self, kind: Kind) -> Self { + let mut kind_vec = self.kind.unwrap_or_default(); + kind_vec.push(kind); + Self { + kind: Some(kind_vec), + ..self + } + } +} + pub mod db_name { pub const ALL_TASKS: &str = "all-tasks"; pub const STATUS: &str = "status"; @@ -73,20 +105,20 @@ pub struct IndexScheduler { impl IndexScheduler { pub fn new( - db_path: PathBuf, + tasks_path: PathBuf, update_file_path: PathBuf, indexes_path: PathBuf, index_size: usize, indexer_config: IndexerConfig, ) -> Result { - std::fs::create_dir_all(&db_path)?; + std::fs::create_dir_all(&tasks_path)?; std::fs::create_dir_all(&update_file_path)?; std::fs::create_dir_all(&indexes_path)?; let mut options = heed::EnvOpenOptions::new(); options.max_dbs(6); - let env = options.open(db_path)?; + let env = options.open(tasks_path)?; // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things let wake_up = SignalEvent::auto(true); @@ -115,6 +147,12 @@ impl IndexScheduler { self.index_mapper.index(&rtxn, name) } + /// Return and open all the indexes. + pub fn indexes(&self) -> Result> { + let rtxn = self.env.read_txn()?; + self.index_mapper.indexes(&rtxn) + } + /// Returns the tasks corresponding to the query. pub fn get_tasks(&self, query: Query) -> Result> { let rtxn = self.env.read_txn()?; @@ -155,6 +193,15 @@ impl IndexScheduler { Ok(tasks.into_iter().map(|task| task.as_task_view()).collect()) } + /// Returns the tasks corresponding to the query. + pub fn task(&self, uid: TaskId) -> Result { + let rtxn = self.env.read_txn()?; + self.get_task(&rtxn, uid).and_then(|opt| { + opt.ok_or(Error::TaskNotFound(uid)) + .map(|task| task.as_task_view()) + }) + } + /// Register a new task in the scheduler. If it fails and data was associated with the task /// it tries to delete the file. pub fn register(&self, task: KindWithContent) -> Result { diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index d90972174..3b599308b 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -9,12 +9,12 @@ mod utils; pub type Result = std::result::Result; pub type TaskId = u32; -pub use crate::index_scheduler::IndexScheduler; +pub use crate::index_scheduler::{IndexScheduler, Query}; pub use error::Error; /// from the exterior you don't need to know there is multiple type of `Kind` -pub use task::KindWithContent as TaskKind; +pub use task::KindWithContent; /// from the exterior you don't need to know there is multiple type of `Task` -pub use task::TaskView as Task; +pub use task::TaskView; #[cfg(test)] mod tests { diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index ab5372908..cab36ae65 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -1,39 +1,30 @@ use std::collections::BTreeMap; use std::fmt; -use std::io::Cursor; use std::path::{Path, PathBuf}; -use std::str::FromStr; use std::sync::Arc; use std::time::Duration; use actix_web::error::PayloadError; use bytes::Bytes; -use file_store::FileStore; use futures::Stream; -use futures::StreamExt; -use index_scheduler::IndexScheduler; -use index_scheduler::TaskKind; +use index_scheduler::task::{Status, Task}; +use index_scheduler::{IndexScheduler, KindWithContent, TaskId, TaskView}; use meilisearch_auth::SearchRules; -use meilisearch_types::index_uid::IndexUid; use milli::update::{IndexDocumentsMethod, IndexerConfig}; use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tokio::sync::RwLock; use tokio::task::spawn_blocking; use tokio::time::sleep; use uuid::Uuid; -use crate::document_formats::{read_csv, read_json, read_ndjson}; // use crate::dump::{self, load_dump, DumpHandler}; use crate::options::{IndexerOpts, SchedulerConfig}; -use crate::snapshot::{load_snapshot, SnapshotService}; +// use crate::snapshot::{load_snapshot, SnapshotService}; use error::Result; use index::{ - Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, + Checked, Document, Index, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked, }; -use self::error::IndexControllerError; - pub mod error; pub mod versioning; @@ -187,7 +178,6 @@ impl IndexControllerBuilder { let meta_env = Arc::new(open_meta_env(db_path.as_ref(), task_store_size)?); - let file_store = FileStore::new(&db_path)?; // Create or overwrite the version file for this DB versioning::create_version_file(db_path.as_ref())?; @@ -204,13 +194,15 @@ impl IndexControllerBuilder { max_positions_per_attributes: None, }; - let scheduler = IndexScheduler::new( - db_path.as_ref().to_path_buf(), + let index_scheduler = IndexScheduler::new( + db_path.as_ref().join("tasks"), + db_path.as_ref().join("update_files"), + db_path.as_ref().join("indexes"), index_size, indexer_config, - file_store, - ); + )?; + /* if self.schedule_snapshot { let snapshot_period = self .snapshot_interval @@ -230,10 +222,9 @@ impl IndexControllerBuilder { tokio::task::spawn_local(snapshot_service.run()); } + */ - Ok(Meilisearch { - index_scheduler: scheduler, - }) + Ok(Meilisearch { index_scheduler }) } /// Set the index controller builder's max update store size. @@ -318,100 +309,25 @@ impl Meilisearch { IndexControllerBuilder::default() } - pub async fn register_task(&self, task: TaskKind) -> Result { - Ok(self.index_scheduler.register(task).await?) + pub async fn register_task(&self, task: KindWithContent) -> Result { + let this = self.clone(); + Ok( + tokio::task::spawn_blocking(move || this.clone().index_scheduler.register(task)) + .await??, + ) } - pub async fn get_task(&self, id: TaskId, filter: Option) -> Result { - let task = self.scheduler.read().await.get_task(id, filter).await?; - Ok(task) + pub async fn get_task(&self, id: TaskId) -> Result { + Ok(self.index_scheduler.task(id)?) } - pub async fn get_index_task(&self, index_uid: String, task_id: TaskId) -> Result { - let creation_task_id = self - .index_resolver - .get_index_creation_task_id(index_uid.clone()) - .await?; - if task_id < creation_task_id { - return Err(TaskError::UnexistingTask(task_id).into()); - } - - let mut filter = TaskFilter::default(); - filter.filter_index(index_uid); - let task = self - .scheduler - .read() - .await - .get_task(task_id, Some(filter)) - .await?; - - Ok(task) + pub async fn list_tasks(&self, filter: index_scheduler::Query) -> Result> { + Ok(self.index_scheduler.get_tasks(filter)?) } - pub async fn list_tasks( - &self, - filter: Option, - limit: Option, - offset: Option, - ) -> Result> { - let tasks = self - .scheduler - .read() - .await - .list_tasks(offset, filter, limit) - .await?; - - Ok(tasks) - } - - pub async fn list_index_task( - &self, - index_uid: String, - limit: Option, - offset: Option, - ) -> Result> { - let task_id = self - .index_resolver - .get_index_creation_task_id(index_uid.clone()) - .await?; - - let mut filter = TaskFilter::default(); - filter.filter_index(index_uid); - - let tasks = self - .scheduler - .read() - .await - .list_tasks( - Some(offset.unwrap_or_default() + task_id), - Some(filter), - limit, - ) - .await?; - - Ok(tasks) - } - - pub async fn list_indexes(&self) -> Result> { - let indexes = self.index_resolver.list().await?; - let mut ret = Vec::new(); - for (uid, index) in indexes { - let meta = index.meta()?; - let meta = IndexMetadata { - uuid: index.uuid(), - uid, - meta, - }; - ret.push(meta); - } - - Ok(ret) - } - - pub async fn settings(&self, uid: String) -> Result> { - let index = self.index_resolver.get_index(uid).await?; - let settings = spawn_blocking(move || index.settings()).await??; - Ok(settings) + pub async fn list_indexes(&self) -> Result> { + let this = self.clone(); + Ok(spawn_blocking(move || this.index_scheduler.indexes()).await??) } /// Return the total number of documents contained in the index + the selected documents. @@ -422,11 +338,12 @@ impl Meilisearch { limit: usize, attributes_to_retrieve: Option>, ) -> Result<(u64, Vec)> { - let index = self.index_resolver.get_index(uid).await?; - let result = - spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) - .await??; - Ok(result) + let this = self.clone(); + spawn_blocking(move || -> Result<_> { + let index = this.index_scheduler.index(&uid)?; + Ok(index.retrieve_documents(offset, limit, attributes_to_retrieve)?) + }) + .await? } pub async fn document( @@ -435,35 +352,38 @@ impl Meilisearch { doc_id: String, attributes_to_retrieve: Option>, ) -> Result { - let index = self.index_resolver.get_index(uid).await?; - let document = - spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) - .await??; - Ok(document) + let this = self.clone(); + spawn_blocking(move || -> Result<_> { + let index = this.index_scheduler.index(&uid)?; + Ok(index.retrieve_document(doc_id, attributes_to_retrieve)?) + }) + .await? } pub async fn search(&self, uid: String, query: SearchQuery) -> Result { - let index = self.index_resolver.get_index(uid).await?; - let result = spawn_blocking(move || index.perform_search(query)).await??; - Ok(result) + let this = self.clone(); + spawn_blocking(move || -> Result<_> { + let index = this.index_scheduler.index(&uid)?; + Ok(index.perform_search(query)?) + }) + .await? } - pub async fn get_index(&self, uid: String) -> Result { - let index = self.index_resolver.get_index(uid.clone()).await?; - let uuid = index.uuid(); - let meta = spawn_blocking(move || index.meta()).await??; - let meta = IndexMetadata { uuid, uid, meta }; - Ok(meta) + pub async fn get_index(&self, uid: String) -> Result { + let this = self.clone(); + Ok(spawn_blocking(move || this.index_scheduler.index(&uid)).await??) } pub async fn get_index_stats(&self, uid: String) -> Result { - let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; + let processing_tasks = self + .index_scheduler + .get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?; // Check if the currently indexing update is from our index. - let is_indexing = processing_tasks - .first() - .map_or(false, |task| task.index_uid().map_or(false, |u| u == uid)); + let is_indexing = processing_tasks.first().map_or(false, |task| { + task.index_uid.as_ref().map_or(false, |u| u == &uid) + }); - let index = self.index_resolver.get_index(uid).await?; + let index = self.get_index(uid).await?; let mut stats = spawn_blocking(move || index.stats()).await??; stats.is_indexing = Some(is_indexing); @@ -474,12 +394,15 @@ impl Meilisearch { let mut last_task: Option = None; let mut indexes = BTreeMap::new(); let mut database_size = 0; - let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?; + let processing_tasks = self + .index_scheduler + .get_tasks(index_scheduler::Query::default().with_status(Status::Processing))?; - for (index_uid, index) in self.index_resolver.list().await? { - if !search_rules.is_index_authorized(&index_uid) { + for index in self.list_indexes().await? { + if !search_rules.is_index_authorized(&index.name) { continue; } + let index_name = index.name.clone(); let (mut stats, meta) = spawn_blocking::<_, Result<(IndexStats, IndexMeta)>>(move || { @@ -496,10 +419,10 @@ impl Meilisearch { // Check if the currently indexing update is from our index. stats.is_indexing = processing_tasks .first() - .and_then(|p| p.index_uid().map(|u| u == index_uid)) + .and_then(|p| p.index_uid.as_ref().map(|u| u == &index_name)) .or(Some(false)); - indexes.insert(index_uid, stats); + indexes.insert(index_name, stats); } Ok(Stats {