diff --git a/Cargo.lock b/Cargo.lock index cbb9e76f7..868d1c631 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -825,6 +825,12 @@ version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2c9736e15e7df1638a7f6eee92a6511615c738246a052af5ba86f039b65aede" +[[package]] +name = "difference" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524cbf6897b527295dff137cec09ecf3a05f4fddffd7dfcd1585403449e74198" + [[package]] name = "digest" version = "0.8.1" @@ -849,6 +855,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "downcast" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" + [[package]] name = "either" version = "1.6.1" @@ -933,6 +945,15 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "float-cmp" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1267f4ac4f343772758f7b1bdcbe767c218bbab93bb432acbf5162bbf85a6c4" +dependencies = [ + "num-traits", +] + [[package]] name = "fnv" version = "1.0.7" @@ -949,6 +970,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69a039c3498dc930fe810151a34ba0c1c70b02b8625035592e74432f678591f2" + [[package]] name = "fs_extra" version = "1.2.0" @@ -1688,6 +1715,7 @@ dependencies = [ "memmap", "milli", "mime", + "mockall", "num_cpus", "obkv", "once_cell", @@ -1847,6 +1875,39 @@ dependencies = [ "winapi", ] +[[package]] +name = "mockall" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ab571328afa78ae322493cacca3efac6a0f2e0a67305b4df31fd439ef129ac0" +dependencies = [ + "cfg-if 1.0.0", + "downcast", + "fragile", + "lazy_static", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033" +dependencies = [ + "cfg-if 1.0.0", + "proc-macro2 1.0.29", + "quote 1.0.9", + "syn 1.0.77", +] + +[[package]] +name = "normalize-line-endings" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61807f77802ff30975e01f4f071c8ba10c022052f98b3294119f3e615d13e5be" + [[package]] name = "ntapi" version = "0.3.6" @@ -2119,6 +2180,35 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" +[[package]] +name = "predicates" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f49cfaf7fdaa3bfacc6fa3e7054e65148878354a5cfddcf661df4c851f8021df" +dependencies = [ + "difference", + "float-cmp", + "normalize-line-endings", + "predicates-core", + "regex", +] + +[[package]] +name = "predicates-core" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57e35a3326b75e49aa85f5dc6ec15b41108cf5aee58eabb1f274dd18b73c2451" + +[[package]] +name = "predicates-tree" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7dd0fd014130206c9352efbdc92be592751b2b9274dff685348341082c6ea3d" +dependencies = [ + "predicates-core", + "treeline", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3044,6 +3134,12 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "treeline" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7f741b240f1a48843f9b8e0444fb55fb2a4ff67293b50a9179dfd5ea67f8d41" + [[package]] name = "try-lock" version = "0.2.3" diff --git a/meilisearch-lib/Cargo.toml b/meilisearch-lib/Cargo.toml index 713d07fc3..b741e80d8 100644 --- a/meilisearch-lib/Cargo.toml +++ b/meilisearch-lib/Cargo.toml @@ -60,4 +60,5 @@ derivative = "2.2.0" [dev-dependencies] actix-rt = "2.2.0" +mockall = "0.10.2" paste = "1.0.5" diff --git a/meilisearch-lib/src/index/index.rs b/meilisearch-lib/src/index/index.rs new file mode 100644 index 000000000..e7d36f62d --- /dev/null +++ b/meilisearch-lib/src/index/index.rs @@ -0,0 +1,287 @@ +use std::collections::{BTreeSet, HashSet}; +use std::fs::create_dir_all; +use std::marker::PhantomData; +use std::ops::Deref; +use std::path::Path; +use std::sync::Arc; + +use chrono::{DateTime, Utc}; +use heed::{EnvOpenOptions, RoTxn}; +use milli::update::Setting; +use milli::{obkv_to_json, FieldDistribution, FieldId}; +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; +use uuid::Uuid; + +use crate::index_controller::update_file_store::UpdateFileStore; +use crate::EnvSizer; + +use super::{Checked, Settings}; +use super::error::IndexError; +use super::update_handler::UpdateHandler; +use super::error::Result; + +pub type Document = Map; + +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + created_at: DateTime, + pub updated_at: DateTime, + pub primary_key: Option, +} + +impl IndexMeta { + pub fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + pub fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + let created_at = index.created_at(txn)?; + let updated_at = index.updated_at(txn)?; + let primary_key = index.primary_key(txn)?.map(String::from); + Ok(Self { + created_at, + updated_at, + primary_key, + }) + } +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct IndexStats { + #[serde(skip)] + pub size: u64, + pub number_of_documents: u64, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is + /// later set to either true or false, we we retrieve the information from the `UpdateStore` + pub is_indexing: Option, + pub field_distribution: FieldDistribution, +} + +#[derive(Clone, derivative::Derivative)] +#[derivative(Debug)] +pub struct Index { + pub uuid: Uuid, + #[derivative(Debug = "ignore")] + pub inner: Arc, + #[derivative(Debug = "ignore")] + pub update_file_store: Arc, + #[derivative(Debug = "ignore")] + pub update_handler: Arc, +} + +impl Deref for Index { + type Target = milli::Index; + + fn deref(&self) -> &Self::Target { + self.inner.as_ref() + } +} + +impl Index { + pub fn open( + path: impl AsRef, + size: usize, + update_file_store: Arc, + uuid: Uuid, + update_handler: Arc, + ) -> Result { + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let inner = Arc::new(milli::Index::new(options, &path)?); + Ok(Index { + inner, + update_file_store, + uuid, + update_handler, + }) + } + + pub fn inner(&self) -> &milli::Index { + &self.inner + } + + pub fn stats(&self) -> Result { + let rtxn = self.read_txn()?; + + Ok(IndexStats { + size: self.size(), + number_of_documents: self.number_of_documents(&rtxn)?, + is_indexing: None, + field_distribution: self.field_distribution(&rtxn)?, + }) + } + + pub fn meta(&self) -> Result { + IndexMeta::new(self) + } + pub fn settings(&self) -> Result> { + let txn = self.read_txn()?; + self.settings_txn(&txn) + } + + pub fn uuid(&self) -> Uuid { + self.uuid + } + + pub fn settings_txn(&self, txn: &RoTxn) -> Result> { + let displayed_attributes = self + .displayed_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let searchable_attributes = self + .searchable_fields(txn)? + .map(|fields| fields.into_iter().map(String::from).collect()); + + let filterable_attributes = self.filterable_fields(txn)?.into_iter().collect(); + + let sortable_attributes = self.sortable_fields(txn)?.into_iter().collect(); + + let criteria = self + .criteria(txn)? + .into_iter() + .map(|c| c.to_string()) + .collect(); + + let stop_words = self + .stop_words(txn)? + .map(|stop_words| -> Result> { + Ok(stop_words.stream().into_strs()?.into_iter().collect()) + }) + .transpose()? + .unwrap_or_else(BTreeSet::new); + let distinct_field = self.distinct_field(txn)?.map(String::from); + + // in milli each word in the synonyms map were split on their separator. Since we lost + // this information we are going to put space between words. + let synonyms = self + .synonyms(txn)? + .iter() + .map(|(key, values)| { + ( + key.join(" "), + values.iter().map(|value| value.join(" ")).collect(), + ) + }) + .collect(); + + Ok(Settings { + displayed_attributes: match displayed_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + searchable_attributes: match searchable_attributes { + Some(attrs) => Setting::Set(attrs), + None => Setting::Reset, + }, + filterable_attributes: Setting::Set(filterable_attributes), + sortable_attributes: Setting::Set(sortable_attributes), + ranking_rules: Setting::Set(criteria), + stop_words: Setting::Set(stop_words), + distinct_attribute: match distinct_field { + Some(field) => Setting::Set(field), + None => Setting::Reset, + }, + synonyms: Setting::Set(synonyms), + _kind: PhantomData, + }) + } + + pub fn retrieve_documents>( + &self, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result>> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + let fields_to_display = + self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + + let iter = self.documents.range(&txn, &(..))?.skip(offset).take(limit); + + let mut documents = Vec::new(); + + for entry in iter { + let (_id, obkv) = entry?; + let object = obkv_to_json(&fields_to_display, &fields_ids_map, obkv)?; + documents.push(object); + } + + Ok(documents) + } + + pub fn retrieve_document>( + &self, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result> { + let txn = self.read_txn()?; + + let fields_ids_map = self.fields_ids_map(&txn)?; + + let fields_to_display = + self.fields_to_display(&txn, &attributes_to_retrieve, &fields_ids_map)?; + + let internal_id = self + .external_documents_ids(&txn)? + .get(doc_id.as_bytes()) + .ok_or_else(|| IndexError::DocumentNotFound(doc_id.clone()))?; + + let document = self + .documents(&txn, std::iter::once(internal_id))? + .into_iter() + .next() + .map(|(_, d)| d) + .ok_or(IndexError::DocumentNotFound(doc_id))?; + + let document = obkv_to_json(&fields_to_display, &fields_ids_map, document)?; + + Ok(document) + } + + pub fn size(&self) -> u64 { + self.env.size() + } + + fn fields_to_display>( + &self, + txn: &heed::RoTxn, + attributes_to_retrieve: &Option>, + fields_ids_map: &milli::FieldsIdsMap, + ) -> Result> { + let mut displayed_fields_ids = match self.displayed_fields_ids(txn)? { + Some(ids) => ids.into_iter().collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + let attributes_to_retrieve_ids = match attributes_to_retrieve { + Some(attrs) => attrs + .iter() + .filter_map(|f| fields_ids_map.id(f.as_ref())) + .collect::>(), + None => fields_ids_map.iter().map(|(id, _)| id).collect(), + }; + + displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid)); + Ok(displayed_fields_ids) + } + + pub fn snapshot(&self, path: impl AsRef) -> Result<()> { + let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid)); + create_dir_all(&dst)?; + dst.push("data.mdb"); + let _txn = self.write_txn()?; + self.inner + .env + .copy_to_path(dst, heed::CompactionOption::Enabled)?; + Ok(()) + } +} + diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs index 008d0d219..e68bd46f8 100644 --- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -1,5 +1,5 @@ pub mod error; -mod index_store; +pub mod index_store; pub mod uuid_store; use std::path::Path; diff --git a/meilisearch-lib/src/index_controller/snapshot.rs b/meilisearch-lib/src/index_controller/snapshot.rs index 694360299..1394957f7 100644 --- a/meilisearch-lib/src/index_controller/snapshot.rs +++ b/meilisearch-lib/src/index_controller/snapshot.rs @@ -11,20 +11,26 @@ use tokio::time::sleep; use crate::compression::from_tar_gz; use crate::index_controller::updates::UpdateMsg; -use super::index_resolver::HardStateIndexResolver; +use super::index_resolver::IndexResolver; +use super::index_resolver::index_store::IndexStore; +use super::index_resolver::uuid_store::UuidStore; use super::updates::UpdateSender; -pub struct SnapshotService { - index_resolver: Arc, +pub struct SnapshotService { + index_resolver: Arc>, update_sender: UpdateSender, snapshot_period: Duration, snapshot_path: PathBuf, db_name: String, } -impl SnapshotService { +impl SnapshotService + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ pub fn new( - index_resolver: Arc, + index_resolver: Arc>, update_sender: UpdateSender, snapshot_period: Duration, snapshot_path: PathBuf, @@ -127,131 +133,161 @@ pub fn load_snapshot( #[cfg(test)] mod test { - //use std::iter::FromIterator; - //use std::{collections::HashSet, sync::Arc}; + use std::{collections::HashSet, sync::Arc}; - //use futures::future::{err, ok}; - //use rand::Rng; - //use tokio::time::timeout; - //use uuid::Uuid; + use futures::future::{err, ok}; + use once_cell::sync::Lazy; + use rand::Rng; + use uuid::Uuid; - //use super::*; + use crate::index::error::IndexError; + use crate::index::test::Mocker; + use crate::index::{Index, error::Result as IndexResult}; + use crate::index_controller::index_resolver::IndexResolver; + use crate::index_controller::index_resolver::error::IndexResolverError; + use crate::index_controller::index_resolver::uuid_store::MockUuidStore; + use crate::index_controller::index_resolver::index_store::MockIndexStore; + use crate::index_controller::updates::create_update_handler; - //#[actix_rt::test] - //async fn test_normal() { - //let mut rng = rand::thread_rng(); - //let uuids_num: usize = rng.gen_range(5..10); - //let uuids = (0..uuids_num) - //.map(|_| Uuid::new_v4()) - //.collect::>(); + use super::*; - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //let uuids_clone = uuids.clone(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - //.returning(move |_| Box::pin(ok(uuids_clone.clone()))); + fn setup() { + static SETUP: Lazy<()> = Lazy::new(|| { + if cfg!(windows) { + std::env::set_var("TMP", "."); + } else { + std::env::set_var("TMPDIR", "."); + } + }); - //let uuids_clone = uuids.clone(); - //let mut index_handle = MockIndexActorHandle::new(); - //index_handle - //.expect_snapshot() - //.withf(move |uuid, _path| uuids_clone.contains(uuid)) - //.times(uuids_num) - //.returning(move |_, _| Box::pin(ok(()))); + // just deref to make sure the env is setup + *SETUP + } - //let dir = tempfile::tempdir_in(".").unwrap(); - //let handle = Arc::new(index_handle); - //let update_handle = - //UpdateActorHandleImpl::>::new(handle.clone(), dir.path(), 4096 * 100).unwrap(); + #[actix_rt::test] + async fn test_normal() { + setup(); - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); + let mut rng = rand::thread_rng(); + let uuids_num: usize = rng.gen_range(5..10); + let uuids = (0..uuids_num) + .map(|_| Uuid::new_v4()) + .collect::>(); - //snapshot_service.perform_snapshot().await.unwrap(); - //} + let mut uuid_store = MockUuidStore::new(); + let uuids_clone = uuids.clone(); + uuid_store + .expect_snapshot() + .times(1) + .returning(move |_| Box::pin(ok(uuids_clone.clone()))); - //#[actix_rt::test] - //async fn error_performing_uuid_snapshot() { - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - ////abitrary error - //.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + let mut indexes = uuids.clone().into_iter().map(|uuid| { + let mocker = Mocker::default(); + mocker.when("snapshot").times(1).then(|_: &Path| -> IndexResult<()> { Ok(()) }); + mocker.when("uuid").then(move |_: ()| uuid); + Index::faux(mocker) + }); - //let update_handle = MockUpdateActorHandle::new(); + let uuids_clone = uuids.clone(); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .withf(move |uuid| uuids_clone.contains(uuid)) + .times(uuids_num) + .returning(move |_| Box::pin(ok(Some(indexes.next().unwrap())))); - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); - //assert!(snapshot_service.perform_snapshot().await.is_err()); - ////Nothing was written to the file - //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - //} + let dir = tempfile::tempdir().unwrap(); + let update_sender = create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); - //#[actix_rt::test] - //async fn error_performing_index_snapshot() { - //let uuid = Uuid::new_v4(); - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - //.times(1) - //.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid))))); + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); - //let mut update_handle = MockUpdateActorHandle::new(); - //update_handle - //.expect_snapshot() - ////abitrary error - //.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0)))); + snapshot_service.perform_snapshot().await.unwrap(); + } - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); + #[actix_rt::test] + async fn error_performing_uuid_snapshot() { + setup(); - //assert!(snapshot_service.perform_snapshot().await.is_err()); - ////Nothing was written to the file - //assert!(!snapshot_path.path().join("data.ms.snapshot").exists()); - //} + let mut uuid_store = MockUuidStore::new(); + uuid_store + .expect_snapshot() + .once() + .returning(move |_| Box::pin(err(IndexResolverError::IndexAlreadyExists))); - //#[actix_rt::test] - //async fn test_loop() { - //let mut uuid_resolver = MockUuidResolverHandle::new(); - //uuid_resolver - //.expect_snapshot() - ////we expect the funtion to be called between 2 and 3 time in the given interval. - //.times(2..4) - ////abitrary error, to short-circuit the function - //.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist))); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .never(); - //let update_handle = MockUpdateActorHandle::new(); + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); - //let snapshot_path = tempfile::tempdir_in(".").unwrap(); - //let snapshot_service = SnapshotService::new( - //uuid_resolver, - //update_handle, - //Duration::from_millis(100), - //snapshot_path.path().to_owned(), - //"data.ms".to_string(), - //); + let dir = tempfile::tempdir().unwrap(); + let update_sender = create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); - //let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await; - //} + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + } + + #[actix_rt::test] + async fn error_performing_index_snapshot() { + setup(); + + let uuids: HashSet = vec![Uuid::new_v4()].into_iter().collect(); + + let mut uuid_store = MockUuidStore::new(); + let uuids_clone = uuids.clone(); + uuid_store + .expect_snapshot() + .once() + .returning(move |_| Box::pin(ok(uuids_clone.clone()))); + + let mut indexes = uuids.clone().into_iter().map(|uuid| { + let mocker = Mocker::default(); + // index returns random error + mocker.when("snapshot").then(|_: &Path| -> IndexResult<()> { Err(IndexError::ExistingPrimaryKey) }); + mocker.when("uuid").then(move |_: ()| uuid); + Index::faux(mocker) + }); + + let uuids_clone = uuids.clone(); + let mut index_store = MockIndexStore::new(); + index_store + .expect_get() + .withf(move |uuid| uuids_clone.contains(uuid)) + .once() + .returning(move |_| Box::pin(ok(Some(indexes.next().unwrap())))); + + let index_resolver = Arc::new(IndexResolver::new(uuid_store, index_store)); + + let dir = tempfile::tempdir().unwrap(); + let update_sender = create_update_handler(index_resolver.clone(), dir.path(), 4096 * 100).unwrap(); + + let snapshot_path = tempfile::tempdir().unwrap(); + let snapshot_service = SnapshotService::new( + index_resolver, + update_sender, + Duration::from_millis(100), + snapshot_path.path().to_owned(), + "data.ms".to_string(), + ); + + assert!(snapshot_service.perform_snapshot().await.is_err()); + } } diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index eb539963e..097f564ab 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -3,10 +3,7 @@ use std::fmt; use meilisearch_error::{Code, ErrorCode}; -use crate::{ - document_formats::DocumentFormatError, - index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat}, -}; +use crate::{document_formats::DocumentFormatError, index::error::IndexError, index_controller::{update_file_store::UpdateFileStoreError, DocumentAdditionFormat}}; pub type Result = std::result::Result; @@ -28,6 +25,8 @@ pub enum UpdateLoopError { PayloadError(#[from] actix_web::error::PayloadError), #[error("A {0} payload is missing.")] MissingPayload(DocumentAdditionFormat), + #[error("{0}")] + IndexError(#[from] IndexError), } impl From> for UpdateLoopError @@ -58,7 +57,6 @@ impl ErrorCode for UpdateLoopError { match self { Self::UnexistingUpdate(_) => Code::NotFound, Self::Internal(_) => Code::Internal, - //Self::IndexActor(e) => e.error_code(), Self::FatalUpdateStoreError => Code::Internal, Self::DocumentFormatError(error) => error.error_code(), Self::PayloadError(error) => match error { @@ -66,6 +64,7 @@ impl ErrorCode for UpdateLoopError { _ => Code::Internal, }, Self::MissingPayload(_) => Code::MissingPayload, + Self::IndexError(e) => e.error_code(), } } } diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 037cf96b0..20d291c64 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -26,16 +26,22 @@ use crate::index::{Index, Settings, Unchecked}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; -use super::index_resolver::HardStateIndexResolver; +use super::index_resolver::index_store::IndexStore; +use super::index_resolver::uuid_store::UuidStore; +use super::index_resolver::IndexResolver; use super::{DocumentAdditionFormat, Update}; pub type UpdateSender = mpsc::Sender; -pub fn create_update_handler( - index_resolver: Arc, +pub fn create_update_handler( + index_resolver: Arc>, db_path: impl AsRef, update_store_size: usize, -) -> anyhow::Result { +) -> anyhow::Result + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, +{ let path = db_path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; @@ -95,12 +101,16 @@ pub struct UpdateLoop { } impl UpdateLoop { - pub fn new( + pub fn new( update_db_size: usize, inbox: mpsc::Receiver, path: impl AsRef, - index_resolver: Arc, - ) -> anyhow::Result { + index_resolver: Arc>, + ) -> anyhow::Result + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { let path = path.as_ref().to_owned(); std::fs::create_dir_all(&path)?; diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 0dd714a0e..74f20517a 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -29,6 +29,8 @@ use codec::*; use super::error::Result; use super::status::{Enqueued, Processing}; use crate::index::Index; +use crate::index_controller::index_resolver::index_store::IndexStore; +use crate::index_controller::index_resolver::uuid_store::UuidStore; use crate::index_controller::updates::*; use crate::EnvSizer; @@ -157,13 +159,17 @@ impl UpdateStore { )) } - pub fn open( + pub fn open( options: EnvOpenOptions, path: impl AsRef, - index_resolver: Arc, + index_resolver: Arc>, must_exit: Arc, update_file_store: UpdateFileStore, - ) -> anyhow::Result> { + ) -> anyhow::Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?; let update_store = Arc::new(update_store); @@ -296,10 +302,14 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update( + fn process_pending_update( &self, - index_resolver: Arc, - ) -> Result> { + index_resolver: Arc>, + ) -> Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -325,13 +335,17 @@ impl UpdateStore { } } - fn perform_update( + fn perform_update( &self, processing: Processing, - index_resolver: Arc, + index_resolver: Arc>, index_uuid: Uuid, global_id: u64, - ) -> Result> { + ) -> Result> + where + U: UuidStore + Sync + Send + 'static, + I: IndexStore + Sync + Send + 'static, + { // Process the pending update using the provided user function. let handle = Handle::current(); let update_id = processing.id(); @@ -519,8 +533,7 @@ impl UpdateStore { } = pending.decode()? { self.update_file_store - .snapshot(content_uuid, &path) - .unwrap(); + .snapshot(content_uuid, &path)?; } } } @@ -528,8 +541,7 @@ impl UpdateStore { let path = path.as_ref().to_owned(); indexes .par_iter() - .try_for_each(|index| index.snapshot(&path)) - .unwrap(); + .try_for_each(|index| index.snapshot(&path))?; Ok(()) }