From 1a479490638c7cc5afba817949383c0df64ccdbf Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 6 Sep 2022 16:43:59 +0200 Subject: [PATCH] =?UTF-8?q?START=20THE=C2=A0REWRITE=C2=A0OF=C2=A0THE=C2=A0?= =?UTF-8?q?INDEX=C2=A0SCHEDULER:=20index=20&=20register=20has=20been=20imp?= =?UTF-8?q?lemented?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.lock | 127 ++++++++++++++++++++++++-- Cargo.toml | 1 + index-scheduler/Cargo.toml | 15 ++++ index-scheduler/src/error.rs | 7 ++ index-scheduler/src/lib.rs | 168 +++++++++++++++++++++++++++++++++++ index-scheduler/src/main.rs | 3 + index-scheduler/src/task.rs | 141 +++++++++++++++++++++++++++++ 7 files changed, 453 insertions(+), 9 deletions(-) create mode 100644 index-scheduler/Cargo.toml create mode 100644 index-scheduler/src/error.rs create mode 100644 index-scheduler/src/lib.rs create mode 100644 index-scheduler/src/main.rs create mode 100644 index-scheduler/src/task.rs diff --git a/Cargo.lock b/Cargo.lock index 69b0af37d..3ae4e2ac8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1181,6 +1181,15 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "filter-parser" +version = "0.33.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.33.0#a79ff8a1a98a807f40f970131c8de2ab11560de5" +dependencies = [ + "nom", + "nom_locate", +] + [[package]] name = "filter-parser" version = "0.34.0" @@ -1200,6 +1209,14 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "flatten-serde-json" +version = "0.33.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.33.0#a79ff8a1a98a807f40f970131c8de2ab11560de5" +dependencies = [ + "serde_json", +] + [[package]] name = "flatten-serde-json" version = "0.34.0" @@ -1358,6 +1375,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "geoutils" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e006f616a407d396ace1d2ebb3f43ed73189db8b098079bd129928d7645dd1e" + [[package]] name = "geoutils" version = "0.5.1" @@ -1631,6 +1654,19 @@ dependencies = [ "unicode-normalization", ] +[[package]] +name = "index-scheduler" +version = "0.1.0" +dependencies = [ + "anyhow", + "bincode", + "milli 0.33.0", + "roaring 0.9.0", + "serde", + "thiserror", + "time", +] + [[package]] name = "indexmap" version = "1.9.1" @@ -1711,6 +1747,14 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "json-depth-checker" +version = "0.33.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.33.0#a79ff8a1a98a807f40f970131c8de2ab11560de5" +dependencies = [ + "serde_json", +] + [[package]] name = "json-depth-checker" version = "0.34.0" @@ -2060,7 +2104,7 @@ dependencies = [ "enum-iterator", "hmac", "meilisearch-types", - "milli", + "milli 0.34.0", "rand", "serde", "serde_json", @@ -2178,7 +2222,7 @@ dependencies = [ "log", "meilisearch-auth", "meilisearch-types", - "milli", + "milli 0.34.0", "mime", "mockall", "nelson", @@ -2195,7 +2239,7 @@ dependencies = [ "rayon", "regex", "reqwest", - "roaring", + "roaring 0.10.1", "rustls", "serde", "serde_json", @@ -2247,6 +2291,51 @@ dependencies = [ "autocfg", ] +[[package]] +name = "milli" +version = "0.33.0" +source = "git+https://github.com/meilisearch/milli.git?tag=v0.33.0#a79ff8a1a98a807f40f970131c8de2ab11560de5" +dependencies = [ + "bimap", + "bincode", + "bstr 0.2.17", + "byteorder", + "charabia", + "concat-arrays", + "crossbeam-channel", + "csv", + "either", + "filter-parser 0.33.0", + "flatten-serde-json 0.33.0", + "fst", + "fxhash", + "geoutils 0.4.1", + "grenad", + "heed", + "itertools", + "json-depth-checker 0.33.0", + "levenshtein_automata", + "log", + "logging_timer", + "memmap2", + "obkv", + "once_cell", + "ordered-float 2.10.0", + "rayon", + "roaring 0.9.0", + "rstar", + "serde", + "serde_json", + "slice-group-by", + "smallstr", + "smallvec", + "smartstring", + "tempfile", + "thiserror", + "time", + "uuid", +] + [[package]] name = "milli" version = "0.34.0" @@ -2261,24 +2350,24 @@ dependencies = [ "crossbeam-channel", "csv", "either", - "filter-parser", - "flatten-serde-json", + "filter-parser 0.34.0", + "flatten-serde-json 0.34.0", "fst", "fxhash", - "geoutils", + "geoutils 0.5.1", "grenad", "heed", "itertools", - "json-depth-checker", + "json-depth-checker 0.34.0", "levenshtein_automata", "log", "logging_timer", "memmap2", "obkv", "once_cell", - "ordered-float", + "ordered-float 3.3.0", "rayon", - "roaring", + "roaring 0.10.1", "rstar", "serde", "serde_json", @@ -2504,6 +2593,15 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-float" version = "3.3.0" @@ -3055,6 +3153,17 @@ dependencies = [ "regex", ] +[[package]] +name = "roaring" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd539cab4e32019956fe7e0cf160bb6d4802f4be2b52c4253d76d3bb0f85a5f7" +dependencies = [ + "bytemuck", + "byteorder", + "retain_mut", +] + [[package]] name = "roaring" version = "0.10.1" diff --git a/Cargo.toml b/Cargo.toml index 678d1b78b..e4325adce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "meilisearch-types", "meilisearch-lib", "meilisearch-auth", + "index-scheduler", "permissive-json-pointer", ] diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml new file mode 100644 index 000000000..057e59324 --- /dev/null +++ b/index-scheduler/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "index-scheduler" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0.64" +bincode = "1.3.3" +milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.33.0" } +roaring = "0.9.0" +serde = { version = "1.0.136", features = ["derive"] } +thiserror = "1.0.30" +time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs new file mode 100644 index 000000000..bc9a2e4c7 --- /dev/null +++ b/index-scheduler/src/error.rs @@ -0,0 +1,7 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Index not found")] + IndexNotFound, +} diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs new file mode 100644 index 000000000..7dbf14623 --- /dev/null +++ b/index-scheduler/src/lib.rs @@ -0,0 +1,168 @@ +pub mod error; +pub mod task; + +use error::Error; +use milli::heed::types::{DecodeIgnore, OwnedType, SerdeBincode, Str}; +pub use task::Task; +use task::{Kind, Status}; + +use std::collections::hash_map::Entry; +use std::sync::atomic::AtomicBool; +use std::sync::Arc; +use std::{collections::HashMap, sync::RwLock}; + +use anyhow::Result; +use milli::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; +use milli::{Index, RoaringBitmapCodec, BEU32}; +use roaring::RoaringBitmap; + +pub type TaskId = u32; +type IndexName = String; +type IndexUuid = String; + +/// This module is responsible for two things; +/// 1. Resolve the name of the indexes. +/// 2. Schedule the tasks. + +#[derive(Clone)] +pub struct IndexScheduler { + // Keep track of the opened indexes and is used + // mainly by the index resolver. + index_map: Arc>>, + + /// The list of tasks currently processing. + processing_tasks: Arc>, + + /// The LMDB environment which the DBs are associated with. + env: Env, + + // The main database, it contains all the tasks accessible by their Id. + all_tasks: Database, SerdeBincode>, + + // All the tasks ids grouped by their status. + status: Database, RoaringBitmapCodec>, + // All the tasks ids grouped by their kind. + kind: Database, RoaringBitmapCodec>, + + // Map an index name with an indexuuid. + index_name_mapper: Database, + // Store the tasks associated to an index. + index_tasks: Database, + + // set to true when there is work to do. + wake_up: Arc, +} + +impl IndexScheduler { + pub fn index(&self, name: &str) -> Result { + let rtxn = self.env.read_txn()?; + let uuid = self + .index_name_mapper + .get(&rtxn, name)? + .ok_or(Error::IndexNotFound)?; + // we clone here to drop the lock before entering the match + let index = self.index_map.read().unwrap().get(&*uuid).cloned(); + let index = match index { + Some(index) => index, + // since we're lazy, it's possible that the index doesn't exist yet. + // We need to open it ourselves. + None => { + let mut index_map = self.index_map.write().unwrap(); + // between the read lock and the write lock it's not impossible + // that someone already opened the index (eg if two search happens + // at the same time), thus before opening it we check a second time + // if it's not already there. + // Since there is a good chance it's not already there we can use + // the entry method. + match index_map.entry(uuid.to_string()) { + Entry::Vacant(entry) => { + // TODO: TAMO: get the envopenoptions from somewhere + let index = milli::Index::new(EnvOpenOptions::new(), uuid)?; + entry.insert(index.clone()); + index + } + Entry::Occupied(entry) => entry.get().clone(), + } + } + }; + + Ok(index) + } + + fn next_task_id(&self, rtxn: &RoTxn) -> Result { + Ok(self + .all_tasks + .remap_data_type::() + .last(rtxn)? + .map(|(k, _)| k.get()) + .unwrap_or(0)) + } + + /// Register a new task in the scheduler. If it fails and data was associated with the task + /// it tries to delete the file. + pub fn register(&self, task: Task) -> Result<()> { + let mut wtxn = self.env.write_txn()?; + + let task_id = self.next_task_id(&wtxn)?; + + self.all_tasks + .append(&mut wtxn, &BEU32::new(task_id), &task)?; + + self.update_status(&mut wtxn, Status::Enqueued, |mut bitmap| { + bitmap.insert(task_id); + bitmap + })?; + + self.update_kind(&mut wtxn, &task.kind, |mut bitmap| { + bitmap.insert(task_id); + bitmap + })?; + + // we persist the file in last to be sure everything before was applied successfuly + task.persist()?; + + match wtxn.commit() { + Ok(()) => (), + e @ Err(_) => { + task.remove_data()?; + e?; + } + } + + self.notify(); + + Ok(()) + } + + pub fn notify(&self) { + self.wake_up + .store(true, std::sync::atomic::Ordering::Relaxed); + } + + fn update_status( + &self, + wtxn: &mut RwTxn, + status: Status, + f: impl Fn(RoaringBitmap) -> RoaringBitmap, + ) -> Result<()> { + let tasks = self.status.get(&wtxn, &status)?.unwrap_or_default(); + let tasks = f(tasks); + self.status.put(wtxn, &status, &tasks)?; + + Ok(()) + } + + fn update_kind( + &self, + wtxn: &mut RwTxn, + kind: &Kind, + f: impl Fn(RoaringBitmap) -> RoaringBitmap, + ) -> Result<()> { + let kind = BEU32::new(kind.to_u32()); + let tasks = self.kind.get(&wtxn, &kind)?.unwrap_or_default(); + let tasks = f(tasks); + self.kind.put(wtxn, &kind, &tasks)?; + + Ok(()) + } +} diff --git a/index-scheduler/src/main.rs b/index-scheduler/src/main.rs new file mode 100644 index 000000000..e7a11a969 --- /dev/null +++ b/index-scheduler/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/index-scheduler/src/task.rs b/index-scheduler/src/task.rs new file mode 100644 index 000000000..3c928c280 --- /dev/null +++ b/index-scheduler/src/task.rs @@ -0,0 +1,141 @@ +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use time::OffsetDateTime; + +use crate::TaskId; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Status { + Enqueued, + Processing, + Succeeded, + Failed, +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Task { + #[serde(with = "time::serde::rfc3339::option")] + pub enqueued_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub started_at: Option, + #[serde(with = "time::serde::rfc3339::option")] + pub finished_at: Option, + + pub status: Status, + pub kind: Kind, +} + +impl Task { + pub fn persist(&self) -> Result<()> { + self.kind.persist() + } + + pub fn remove_data(&self) -> Result<()> { + self.kind.remove_data() + } +} + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum Kind { + DumpExport { + output: PathBuf, + }, + DocumentAddition { + index_name: String, + content_file: String, + }, + DocumentDeletion { + index_name: String, + documents_ids: Vec, + }, + ClearAllDocuments { + index_name: String, + }, + // TODO: TAMO: uncomment the settings + // Settings { + // index_name: String, + // new_settings: Settings, + // }, + RenameIndex { + index_name: String, + new_name: String, + }, + CreateIndex { + index_name: String, + primary_key: Option, + }, + DeleteIndex { + index_name: String, + }, + SwapIndex { + lhs: String, + rhs: String, + }, + CancelTask { + tasks: Vec, + }, +} + +impl Kind { + pub fn persist(&self) -> Result<()> { + match self { + Kind::DocumentAddition { + index_name, + content_file, + } => { + // TODO: TAMO: persist the file + // content_file.persist(); + Ok(()) + } + // There is nothing to persist for all these tasks + Kind::DumpExport { .. } + | Kind::DocumentDeletion { .. } + | Kind::ClearAllDocuments { .. } + | Kind::RenameIndex { .. } + | Kind::CreateIndex { .. } + | Kind::DeleteIndex { .. } + | Kind::SwapIndex { .. } + | Kind::CancelTask { .. } => Ok(()), + } + } + + pub fn remove_data(&self) -> Result<()> { + match self { + Kind::DocumentAddition { + index_name, + content_file, + } => { + // TODO: TAMO: delete the file + // content_file.delete(); + Ok(()) + } + // There is no data associated with all these tasks + Kind::DumpExport { .. } + | Kind::DocumentDeletion { .. } + | Kind::ClearAllDocuments { .. } + | Kind::RenameIndex { .. } + | Kind::CreateIndex { .. } + | Kind::DeleteIndex { .. } + | Kind::SwapIndex { .. } + | Kind::CancelTask { .. } => Ok(()), + } + } + + pub fn to_u32(&self) -> u32 { + match self { + Kind::DumpExport { .. } => 0, + Kind::DocumentAddition { .. } => 1, + Kind::DocumentDeletion { .. } => 2, + Kind::ClearAllDocuments { .. } => 3, + Kind::RenameIndex { .. } => 4, + Kind::CreateIndex { .. } => 5, + Kind::DeleteIndex { .. } => 6, + Kind::SwapIndex { .. } => 7, + Kind::CancelTask { .. } => 8, + } + } +}