diff --git a/meilisearch-lib/src/index/error.rs b/meilisearch-lib/src/index/error.rs index cfae11a1f..5899b9356 100644 --- a/meilisearch-lib/src/index/error.rs +++ b/meilisearch-lib/src/index/error.rs @@ -17,6 +17,8 @@ pub enum IndexError { Facet(#[from] FacetError), #[error("{0}")] Milli(#[from] milli::Error), + #[error("A primary key is already present. It's impossible to update it")] + ExistingPrimaryKey, } internal_error!( @@ -33,6 +35,7 @@ impl ErrorCode for IndexError { IndexError::DocumentNotFound(_) => Code::DocumentNotFound, IndexError::Facet(e) => e.error_code(), IndexError::Milli(e) => MilliError(e).error_code(), + IndexError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent, } } } diff --git a/meilisearch-lib/src/index/mod.rs b/meilisearch-lib/src/index/mod.rs index c05e337e2..911a22464 100644 --- a/meilisearch-lib/src/index/mod.rs +++ b/meilisearch-lib/src/index/mod.rs @@ -5,19 +5,23 @@ use std::ops::Deref; use std::path::Path; use std::sync::Arc; +use chrono::{DateTime, Utc}; use heed::{EnvOpenOptions, RoTxn}; use milli::update::Setting; -use milli::{obkv_to_json, FieldId}; +use milli::{FieldDistribution, FieldId, obkv_to_json}; use serde_json::{Map, Value}; +use serde::{Serialize, Deserialize}; use error::Result; pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use updates::{Checked, Facets, Settings, Unchecked}; +use uuid::Uuid; use crate::EnvSizer; use crate::index_controller::update_file_store::UpdateFileStore; use self::error::IndexError; +use self::update_handler::UpdateHandler; pub mod error; pub mod update_handler; @@ -28,10 +32,51 @@ mod updates; pub type Document = Map; +#[derive(Debug, Serialize, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct IndexMeta { + created_at: DateTime, + pub updated_at: DateTime, + pub primary_key: Option, +} + +#[derive(Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct IndexStats { + #[serde(skip)] + pub size: u64, + pub number_of_documents: u64, + /// Whether the current index is performing an update. It is initially `None` when the + /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. 
It is + /// later set to either true or false, when we retrieve the information from the `UpdateStore` + pub is_indexing: Option, + pub field_distribution: FieldDistribution, +} + +impl IndexMeta { + pub fn new(index: &Index) -> Result { + let txn = index.read_txn()?; + Self::new_txn(index, &txn) + } + + fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { + let created_at = index.created_at(txn)?; + let updated_at = index.updated_at(txn)?; + let primary_key = index.primary_key(txn)?.map(String::from); + Ok(Self { + created_at, + updated_at, + primary_key, + }) + } +} + #[derive(Clone)] pub struct Index { + pub uuid: Uuid, pub inner: Arc, update_file_store: Arc, + update_handler: Arc, } impl Deref for Index { @@ -43,14 +88,28 @@ } impl Index { - pub fn open(path: impl AsRef, size: usize, update_file_store: Arc) -> Result { + pub fn open(path: impl AsRef, size: usize, update_file_store: Arc, uuid: Uuid, update_handler: Arc) -> Result { create_dir_all(&path)?; let mut options = EnvOpenOptions::new(); options.map_size(size); let inner = Arc::new(milli::Index::new(options, &path)?); - Ok(Index { inner, update_file_store }) + Ok(Index { inner, update_file_store, uuid, update_handler }) } + pub fn stats(&self) -> Result { + let rtxn = self.read_txn()?; + + Ok(IndexStats { + size: self.size(), + number_of_documents: self.number_of_documents(&rtxn)?, + is_indexing: None, + field_distribution: self.field_distribution(&rtxn)?, + }) + } + + pub fn meta(&self) -> Result { + IndexMeta::new(self) + } pub fn settings(&self) -> Result> { let txn = self.read_txn()?; self.settings_txn(&txn) diff --git a/meilisearch-lib/src/index/update_handler.rs b/meilisearch-lib/src/index/update_handler.rs index 95ae2f556..6969b4567 100644 --- a/meilisearch-lib/src/index/update_handler.rs +++ b/meilisearch-lib/src/index/update_handler.rs @@ -52,7 +52,7 @@ impl UpdateHandler { pub fn handle_update( &self, - index: Index, + index: &Index, meta: Processing, ) -> Result { let update_id = meta.id(); diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs index c83862f9b..20cf6b2ec 100644 --- a/meilisearch-lib/src/index/updates.rs +++ b/meilisearch-lib/src/index/updates.rs @@ -8,10 +8,10 @@ use milli::update::{IndexDocumentsMethod, Setting, UpdateBuilder}; use serde::{Deserialize, Serialize, Serializer}; use uuid::Uuid; -use crate::index_controller::updates::status::UpdateResult; +use crate::index_controller::updates::status::{Failed, Processed, Processing, UpdateResult}; -use super::Index; -use super::error::Result; +use super::{Index, IndexMeta}; +use super::error::{IndexError, Result}; fn serialize_with_wildcard( field: &Setting>, @@ -163,6 +163,31 @@ pub struct Facets { } impl Index { + pub fn handle_update(&self, update: Processing) -> std::result::Result { + self.update_handler.handle_update(self, update) + } + + pub fn update_primary_key(&self, primary_key: Option) -> Result { + match primary_key { + Some(primary_key) => { + let mut txn = self.write_txn()?; + if self.primary_key(&txn)?.is_some() { + return Err(IndexError::ExistingPrimaryKey); + } + let mut builder = UpdateBuilder::new(0).settings(&mut txn, self); + builder.set_primary_key(primary_key); + builder.execute(|_, _| ())?; + let meta = IndexMeta::new_txn(self, &txn)?; + txn.commit()?; + Ok(meta) + } + None => { + let meta = IndexMeta::new(self)?; + Ok(meta) + } + } + } + pub fn update_documents( &self, method: IndexDocumentsMethod, diff --git a/meilisearch-lib/src/index_controller/dump_actor/actor.rs
b/meilisearch-lib/src/index_controller/dump_actor/actor.rs index 881f3e5b8..bfde3896c 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/actor.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/actor.rs @@ -10,14 +10,14 @@ use tokio::sync::{mpsc, oneshot, RwLock}; use super::error::{DumpActorError, Result}; use super::{DumpInfo, DumpMsg, DumpStatus, DumpTask}; -use crate::index_controller::uuid_resolver::UuidResolverSender; +use crate::index_controller::index_resolver::HardStateIndexResolver; use crate::index_controller::updates::UpdateSender; pub const CONCURRENT_DUMP_MSG: usize = 10; pub struct DumpActor { inbox: Option>, - uuid_resolver: UuidResolverSender, + index_resolver: Arc, update: UpdateSender, dump_path: PathBuf, lock: Arc>, @@ -34,7 +34,7 @@ fn generate_uid() -> String { impl DumpActor { pub fn new( inbox: mpsc::Receiver, - uuid_resolver: UuidResolverSender, + index_resolver: Arc, update: UpdateSender, dump_path: impl AsRef, index_db_size: usize, @@ -44,7 +44,7 @@ impl DumpActor { let lock = Arc::new(Mutex::new(())); Self { inbox: Some(inbox), - uuid_resolver, + index_resolver, update, dump_path: dump_path.as_ref().into(), dump_infos, @@ -113,7 +113,7 @@ impl DumpActor { let task = DumpTask { path: self.dump_path.clone(), - uuid_resolver: self.uuid_resolver.clone(), + index_resolver: self.index_resolver.clone(), update_handle: self.update.clone(), uid: uid.clone(), update_db_size: self.update_db_size, diff --git a/meilisearch-lib/src/index_controller/dump_actor/error.rs b/meilisearch-lib/src/index_controller/dump_actor/error.rs index eb6f08c00..9831f3931 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/error.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/error.rs @@ -1,7 +1,7 @@ use meilisearch_error::{Code, ErrorCode}; -use crate::index_controller::updates::error::UpdateActorError; -use crate::index_controller::uuid_resolver::error::UuidResolverError; +use crate::index_controller::index_resolver::error::IndexResolverError; +use crate::index_controller::updates::error::UpdateLoopError; pub type Result = std::result::Result; @@ -14,9 +14,9 @@ pub enum DumpActorError { #[error("Internal error: {0}")] Internal(Box), #[error("{0}")] - UuidResolver(#[from] UuidResolverError), + IndexResolver(#[from] IndexResolverError), #[error("{0}")] - UpdateActor(#[from] UpdateActorError), + UpdateLoop(#[from] UpdateLoopError), } macro_rules! 
internal_error { @@ -45,8 +45,8 @@ impl ErrorCode for DumpActorError { DumpActorError::DumpAlreadyRunning => Code::DumpAlreadyInProgress, DumpActorError::DumpDoesNotExist(_) => Code::NotFound, DumpActorError::Internal(_) => Code::Internal, - DumpActorError::UuidResolver(e) => e.error_code(), - DumpActorError::UpdateActor(e) => e.error_code(), + DumpActorError::IndexResolver(e) => e.error_code(), + DumpActorError::UpdateLoop(e) => e.error_code(), } } } diff --git a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs index a629ff753..5acee2f81 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/handle_impl.rs @@ -1,8 +1,9 @@ use std::path::Path; +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; -use crate::index_controller::uuid_resolver::UuidResolverSender; +use crate::index_controller::index_resolver::HardStateIndexResolver; use super::error::Result; use super::{DumpActor, DumpActorHandle, DumpInfo, DumpMsg}; @@ -32,7 +33,7 @@ impl DumpActorHandle for DumpActorHandleImpl { impl DumpActorHandleImpl { pub fn new( path: impl AsRef, - uuid_resolver: UuidResolverSender, + index_resolver: Arc, update: crate::index_controller::updates::UpdateSender, index_db_size: usize, update_db_size: usize, @@ -40,7 +41,7 @@ impl DumpActorHandleImpl { let (sender, receiver) = mpsc::channel(10); let actor = DumpActor::new( receiver, - uuid_resolver, + index_resolver, update, path, index_db_size, diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs index b489b2107..1add2709b 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v1.rs @@ -7,7 +7,7 @@ use milli::update::Setting; use serde::{Deserialize, Deserializer, Serialize}; use uuid::Uuid; -use crate::index_controller::uuid_resolver::store::HeedUuidStore; +use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; use crate::index_controller::{self, IndexMetadata}; use crate::index_controller::{asc_ranking_rule, desc_ranking_rule}; use crate::{ diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs index c50e8a722..94b7321ae 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs @@ -5,8 +5,8 @@ use log::info; use serde::{Deserialize, Serialize}; use crate::index::Index; +use crate::index_controller::index_resolver::uuid_store::HeedUuidStore; use crate::index_controller::updates::store::UpdateStore; -use crate::index_controller::{uuid_resolver::store::HeedUuidStore}; use crate::options::IndexerOpts; #[derive(Serialize, Deserialize, Debug)] diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs index 7db682e98..b7c61f568 100644 --- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs @@ -1,5 +1,6 @@ use std::fs::File; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::Context; use chrono::{DateTime, Utc}; @@ -16,11 +17,10 @@ pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; +use super::index_resolver::HardStateIndexResolver; use super::updates::UpdateSender; -use 
super::uuid_resolver::UuidResolverSender; use crate::index_controller::dump_actor::error::DumpActorError; use crate::index_controller::updates::UpdateMsg; -use crate::index_controller::uuid_resolver::UuidResolverMsg; use crate::options::IndexerOpts; use error::Result; @@ -154,7 +154,7 @@ pub fn load_dump( struct DumpTask { path: PathBuf, - uuid_resolver: UuidResolverSender, + index_resolver: Arc, update_handle: UpdateSender, uid: String, update_db_size: usize, @@ -177,9 +177,9 @@ impl DumpTask { let mut meta_file = File::create(&meta_path)?; serde_json::to_writer(&mut meta_file, &meta)?; - let uuids = UuidResolverMsg::dump(&self.uuid_resolver, temp_dump_path.clone()).await?; + let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?; - UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?; + UpdateMsg::dump(&self.update_handle, uuids.into_iter().collect(), temp_dump_path.clone()).await?; let dump_path = tokio::task::spawn_blocking(move || -> Result { let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?; diff --git a/meilisearch-lib/src/index_controller/error.rs b/meilisearch-lib/src/index_controller/error.rs index 8c60e9103..417bda01b 100644 --- a/meilisearch-lib/src/index_controller/error.rs +++ b/meilisearch-lib/src/index_controller/error.rs @@ -2,13 +2,13 @@ use std::error::Error; use meilisearch_error::Code; use meilisearch_error::ErrorCode; +use tokio::task::JoinError; use crate::index::error::IndexError; use super::dump_actor::error::DumpActorError; -use super::indexes::error::IndexActorError; -use super::updates::error::UpdateActorError; -use super::uuid_resolver::error::UuidResolverError; +use super::index_resolver::error::IndexResolverError; +use super::updates::error::UpdateLoopError; pub type Result = std::result::Result; @@ -17,11 +17,9 @@ pub enum IndexControllerError { #[error("Index creation must have an uid")] MissingUid, #[error("{0}")] - Uuid(#[from] UuidResolverError), + IndexResolver(#[from] IndexResolverError), #[error("{0}")] - IndexActor(#[from] IndexActorError), - #[error("{0}")] - UpdateActor(#[from] UpdateActorError), + UpdateLoop(#[from] UpdateLoopError), #[error("{0}")] DumpActor(#[from] DumpActorError), #[error("{0}")] @@ -30,13 +28,14 @@ pub enum IndexControllerError { Internal(Box), } +internal_error!(IndexControllerError: JoinError); + impl ErrorCode for IndexControllerError { fn error_code(&self) -> Code { match self { IndexControllerError::MissingUid => Code::BadRequest, - IndexControllerError::Uuid(e) => e.error_code(), - IndexControllerError::IndexActor(e) => e.error_code(), - IndexControllerError::UpdateActor(e) => e.error_code(), + IndexControllerError::IndexResolver(e) => e.error_code(), + IndexControllerError::UpdateLoop(e) => e.error_code(), IndexControllerError::DumpActor(e) => e.error_code(), IndexControllerError::IndexError(e) => e.error_code(), IndexControllerError::Internal(_) => Code::Internal, diff --git a/meilisearch-lib/src/index_controller/index_resolver/error.rs b/meilisearch-lib/src/index_controller/index_resolver/error.rs new file mode 100644 index 000000000..af61a99de --- /dev/null +++ b/meilisearch-lib/src/index_controller/index_resolver/error.rs @@ -0,0 +1,63 @@ +use std::fmt; + +use meilisearch_error::{Code, ErrorCode}; +use tokio::sync::mpsc::error::SendError as MpscSendError; +use tokio::sync::oneshot::error::RecvError as OneshotRecvError; + +use crate::{error::MilliError, index::error::IndexError}; + +pub type Result = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub 
enum IndexResolverError { + #[error("{0}")] + IndexError(#[from] IndexError), + #[error("Index already exists")] + IndexAlreadyExists, + #[error("Index {0} not found")] + UnexistingIndex(String), + #[error("A primary key is already present. It's impossible to update it")] + ExistingPrimaryKey, + #[error("Internal Error: {0}")] + Internal(Box), + #[error("{0}")] + Milli(#[from] milli::Error), + #[error("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_).")] + BadlyFormatted(String), +} + +impl From> for IndexResolverError +where T: Send + Sync + 'static + fmt::Debug +{ + fn from(other: tokio::sync::mpsc::error::SendError) -> Self { + Self::Internal(Box::new(other)) + } +} + +impl From for IndexResolverError { + fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { + Self::Internal(Box::new(other)) + } +} + +internal_error!( + IndexResolverError: heed::Error, + uuid::Error, + std::io::Error, + tokio::task::JoinError, + serde_json::Error +); + +impl ErrorCode for IndexResolverError { + fn error_code(&self) -> Code { + match self { + IndexResolverError::IndexError(e) => e.error_code(), + IndexResolverError::IndexAlreadyExists => Code::IndexAlreadyExists, + IndexResolverError::UnexistingIndex(_) => Code::IndexNotFound, + IndexResolverError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent, + IndexResolverError::Internal(_) => Code::Internal, + IndexResolverError::Milli(e) => MilliError(e).error_code(), + IndexResolverError::BadlyFormatted(_) => Code::InvalidIndexUid, + } + } +} diff --git a/meilisearch-lib/src/index_controller/index_resolver/index_store.rs b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs new file mode 100644 index 000000000..c038ceb20 --- /dev/null +++ b/meilisearch-lib/src/index_controller/index_resolver/index_store.rs @@ -0,0 +1,116 @@ +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use milli::update::UpdateBuilder; +use tokio::fs; +use tokio::sync::RwLock; +use tokio::task::spawn_blocking; +use uuid::Uuid; + +use super::error::{IndexResolverError, Result}; +use crate::index::Index; +use crate::index::update_handler::UpdateHandler; +use crate::index_controller::update_file_store::UpdateFileStore; +use crate::options::IndexerOpts; + +type AsyncMap = Arc>>; + +#[async_trait::async_trait] +pub trait IndexStore { + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn get(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>; +} + +pub struct MapIndexStore { + index_store: AsyncMap, + path: PathBuf, + index_size: usize, + update_file_store: Arc, + update_handler: Arc, +} + +impl MapIndexStore { + pub fn new(path: impl AsRef, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result { + let update_handler = Arc::new(UpdateHandler::new(indexer_opts)?); + let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap()); + let path = path.as_ref().join("indexes/"); + let index_store = Arc::new(RwLock::new(HashMap::new())); + Ok(Self { + index_store, + path, + index_size, + update_file_store, + update_handler, + }) + } +} + +#[async_trait::async_trait] +impl IndexStore for MapIndexStore { + async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { + // We need to keep the lock until we are sure the db file has been opened correclty, to + // ensure that another db is not created at the same time. 
+ let mut lock = self.index_store.write().await; + + if let Some(index) = lock.get(&uuid) { + return Ok(index.clone()); + } + let path = self.path.join(format!("index-{}", uuid)); + if path.exists() { + return Err(IndexResolverError::IndexAlreadyExists); + } + + let index_size = self.index_size; + let file_store = self.update_file_store.clone(); + let update_handler = self.update_handler.clone(); + let index = spawn_blocking(move || -> Result { + let index = Index::open(path, index_size, file_store, uuid, update_handler)?; + if let Some(primary_key) = primary_key { + let mut txn = index.write_txn()?; + + let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); + builder.set_primary_key(primary_key); + builder.execute(|_, _| ())?; + + txn.commit()?; + } + Ok(index) + }) + .await??; + + lock.insert(uuid, index.clone()); + + Ok(index) + } + + async fn get(&self, uuid: Uuid) -> Result> { + let guard = self.index_store.read().await; + match guard.get(&uuid) { + Some(index) => Ok(Some(index.clone())), + None => { + // drop the guard here so we can perform the write after without deadlocking; + drop(guard); + let path = self.path.join(format!("index-{}", uuid)); + if !path.exists() { + return Ok(None); + } + + let index_size = self.index_size; + let file_store = self.update_file_store.clone(); + let update_handler = self.update_handler.clone(); + let index = spawn_blocking(move || Index::open(path, index_size, file_store, uuid, update_handler)).await??; + self.index_store.write().await.insert(uuid, index.clone()); + Ok(Some(index)) + } + } + } + + async fn delete(&self, uuid: Uuid) -> Result> { + let db_path = self.path.join(format!("index-{}", uuid)); + fs::remove_dir_all(db_path).await?; + let index = self.index_store.write().await.remove(&uuid); + Ok(index) + } +} diff --git a/meilisearch-lib/src/index_controller/index_resolver/message.rs b/meilisearch-lib/src/index_controller/index_resolver/message.rs new file mode 100644 index 000000000..25a0d64a9 --- /dev/null +++ b/meilisearch-lib/src/index_controller/index_resolver/message.rs @@ -0,0 +1,37 @@ +use std::{collections::HashSet, path::PathBuf}; + +use tokio::sync::oneshot; +use uuid::Uuid; + +use crate::index::Index; +use super::error::Result; + +pub enum IndexResolverMsg { + Get { + uid: String, + ret: oneshot::Sender>, + }, + Delete { + uid: String, + ret: oneshot::Sender>, + }, + List { + ret: oneshot::Sender>>, + }, + Insert { + uuid: Uuid, + name: String, + ret: oneshot::Sender>, + }, + SnapshotRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, + GetSize { + ret: oneshot::Sender>, + }, + DumpRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, +} diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs new file mode 100644 index 000000000..eebb8ef95 --- /dev/null +++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs @@ -0,0 +1,117 @@ +pub mod uuid_store; +mod index_store; +//mod message; +pub mod error; + +use std::path::Path; + +use uuid::Uuid; +use uuid_store::{UuidStore, HeedUuidStore}; +use index_store::{IndexStore, MapIndexStore}; +use error::{Result, IndexResolverError}; + +use crate::{index::Index, options::IndexerOpts}; + +pub type HardStateIndexResolver = IndexResolver; + +pub fn create_index_resolver(path: impl AsRef, index_size: usize, indexer_opts: &IndexerOpts) -> anyhow::Result { + let uuid_store = HeedUuidStore::new(&path)?; + let index_store = MapIndexStore::new(&path, index_size, indexer_opts)?; + 
Ok(IndexResolver::new(uuid_store, index_store)) +} + +pub struct IndexResolver { + index_uuid_store: U, + index_store: I, +} + +impl IndexResolver +where U: UuidStore, + I: IndexStore, +{ + pub fn new( + index_uuid_store: U, + index_store: I, + ) -> Self { + Self { + index_uuid_store, + index_store, + } + } + + pub async fn dump(&self, _path: impl AsRef) -> Result> { + todo!() + } + + pub async fn get_size(&self) -> Result { + todo!() + } + + pub async fn perform_snapshot(&self, _path: impl AsRef) -> Result<()> { + todo!() + } + + pub async fn create_index(&self, uid: String, primary_key: Option) -> Result<(Uuid, Index)> { + let uuid = Uuid::new_v4(); + let index = self.index_store.create(uuid, primary_key).await?; + self.index_uuid_store.insert(uid, uuid).await?; + Ok((uuid, index)) + } + + pub async fn list(&self) -> Result> { + let uuids = self.index_uuid_store.list().await?; + let mut indexes = Vec::new(); + for (name, uuid) in uuids { + match self.index_store.get(uuid).await? { + Some(index) => { + indexes.push((name, index)) + }, + None => { + // we found an unexisting index, we remove it from the uuid store + let _ = self.index_uuid_store.delete(name).await; + }, + } + } + + Ok(indexes) + } + + pub async fn delete_index(&self, uid: String) -> Result<()> { + match self.index_uuid_store.delete(uid.clone()).await? { + Some(uuid) => { + let _ = self.index_store.delete(uuid).await; + Ok(()) + } + None => Err(IndexResolverError::UnexistingIndex(uid)), + } + } + + pub async fn get_index_by_uuid(&self, uuid: Uuid) -> Result { + // TODO: Handle this error better. + self.index_store.get(uuid).await?.ok_or(IndexResolverError::UnexistingIndex(String::new())) + } + + pub async fn get_index(&self, uid: String) -> Result { + match self.index_uuid_store.get_uuid(uid).await? { + (name, Some(uuid)) => { + match self.index_store.get(uuid).await? { + Some(index) => Ok(index), + None => { + // For some reason we got a uuid to an unexisting index, we return an error, + // and remove the uuid from th uuid store. + let _ = self.index_uuid_store.delete(name.clone()).await; + Err(IndexResolverError::UnexistingIndex(name)) + }, + } + } + (name, _) => Err(IndexResolverError::UnexistingIndex(name)) + } + } + + pub async fn get_uuid(&self, uid: String) -> Result { + match self.index_uuid_store.get_uuid(uid).await? { + (_, Some(uuid)) => Ok(uuid), + (name, _) => Err(IndexResolverError::UnexistingIndex(name)) + } + } +} diff --git a/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs new file mode 100644 index 000000000..7974bf4ae --- /dev/null +++ b/meilisearch-lib/src/index_controller/index_resolver/uuid_store.rs @@ -0,0 +1,226 @@ +use std::collections::HashSet; +use std::fs::{create_dir_all, File}; +use std::io::{BufRead, BufReader, Write}; +use std::path::{Path, PathBuf}; + +use heed::types::{ByteSlice, Str}; +use heed::{CompactionOption, Database, Env, EnvOpenOptions}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use super::error::{Result, IndexResolverError}; +use crate::EnvSizer; + +const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB + +#[derive(Serialize, Deserialize)] +struct DumpEntry { + uuid: Uuid, + uid: String, +} + +const UUIDS_DB_PATH: &str = "index_uuids"; + +#[async_trait::async_trait] +pub trait UuidStore: Sized { + // Create a new entry for `name`. Return an error if `err` and the entry already exists, return + // the uuid otherwise. 
+ async fn get_uuid(&self, uid: String) -> Result<(String, Option)>; + async fn delete(&self, uid: String) -> Result>; + async fn list(&self) -> Result>; + async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; + async fn snapshot(&self, path: PathBuf) -> Result>; + async fn get_size(&self) -> Result; + async fn dump(&self, path: PathBuf) -> Result>; +} + +#[derive(Clone)] +pub struct HeedUuidStore { + env: Env, + db: Database, +} + +impl HeedUuidStore { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().join(UUIDS_DB_PATH); + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(UUID_STORE_SIZE); // 1GB + let env = options.open(path)?; + let db = env.create_database(None)?; + Ok(Self { env, db }) + } + + pub fn get_uuid(&self, name: &str) -> Result> { + let env = self.env.clone(); + let db = self.db; + let txn = env.read_txn()?; + match db.get(&txn, name)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + Ok(Some(uuid)) + } + None => Ok(None), + } + } + + pub fn delete(&self, uid: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + let mut txn = env.write_txn()?; + match db.get(&txn, &uid)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + db.delete(&mut txn, &uid)?; + txn.commit()?; + Ok(Some(uuid)) + } + None => Ok(None), + } + } + + pub fn list(&self) -> Result> { + let env = self.env.clone(); + let db = self.db; + let txn = env.read_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (name, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push((name.to_owned(), uuid)) + } + Ok(entries) + } + + pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> { + let env = self.env.clone(); + let db = self.db; + let mut txn = env.write_txn()?; + + if db.get(&txn, &name)?.is_some() { + return Err(IndexResolverError::IndexAlreadyExists); + } + + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(()) + } + + pub fn snapshot(&self, mut path: PathBuf) -> Result> { + let env = self.env.clone(); + let db = self.db; + // Write transaction to acquire a lock on the database. + let txn = env.write_txn()?; + let mut entries = HashSet::new(); + for entry in db.iter(&txn)? { + let (_, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.insert(uuid); + } + + // only perform snapshot if there are indexes + if !entries.is_empty() { + path.push(UUIDS_DB_PATH); + create_dir_all(&path).unwrap(); + path.push("data.mdb"); + env.copy_to_path(path, CompactionOption::Enabled)?; + } + Ok(entries) + } + + pub fn get_size(&self) -> Result { + Ok(self.env.size()) + } + + pub fn dump(&self, path: PathBuf) -> Result> { + let dump_path = path.join(UUIDS_DB_PATH); + create_dir_all(&dump_path)?; + let dump_file_path = dump_path.join("data.jsonl"); + let mut dump_file = File::create(&dump_file_path)?; + let mut uuids = HashSet::new(); + + let txn = self.env.read_txn()?; + for entry in self.db.iter(&txn)? 
{ + let (uid, uuid) = entry?; + let uid = uid.to_string(); + let uuid = Uuid::from_slice(uuid)?; + + let entry = DumpEntry { uuid, uid }; + serde_json::to_writer(&mut dump_file, &entry)?; + dump_file.write_all(b"\n").unwrap(); + + uuids.insert(uuid); + } + + Ok(uuids) + } + + pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> Result<()> { + let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH); + std::fs::create_dir_all(&uuid_resolver_path)?; + + let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); + let indexes = File::open(&src_indexes)?; + let mut indexes = BufReader::new(indexes); + let mut line = String::new(); + + let db = Self::new(dst)?; + let mut txn = db.env.write_txn()?; + + loop { + match indexes.read_line(&mut line) { + Ok(0) => break, + Ok(_) => { + let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; + println!("importing {} {}", uid, uuid); + db.db.put(&mut txn, &uid, uuid.as_bytes())?; + } + Err(e) => return Err(e.into()), + } + + line.clear(); + } + txn.commit()?; + + db.env.prepare_for_closing().wait(); + + Ok(()) + } +} + +#[async_trait::async_trait] +impl UuidStore for HeedUuidStore { + async fn get_uuid(&self, name: String) -> Result<(String, Option)> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.get_uuid(&name).map(|res| (name, res))).await? + } + + async fn delete(&self, uid: String) -> Result> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.delete(uid)).await? + } + + async fn list(&self) -> Result> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.list()).await? + } + + async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.insert(name, uuid)).await? + } + + async fn snapshot(&self, path: PathBuf) -> Result> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.snapshot(path)).await? + } + + async fn get_size(&self) -> Result { + self.get_size() + } + + async fn dump(&self, path: PathBuf) -> Result> { + let this = self.clone(); + tokio::task::spawn_blocking(move || this.dump(path)).await? + } +} diff --git a/meilisearch-lib/src/index_controller/indexes/mod.rs b/meilisearch-lib/src/index_controller/indexes/mod.rs index bac492364..48649cf40 100644 --- a/meilisearch-lib/src/index_controller/indexes/mod.rs +++ b/meilisearch-lib/src/index_controller/indexes/mod.rs @@ -42,7 +42,7 @@ pub fn create_indexes_handler( indexer_options: &IndexerOpts, ) -> anyhow::Result { let (sender, receiver) = mpsc::channel(100); - let store = MapIndexStore::new(&db_path, index_size); + let store = MapIndexStore::new(&db_path, index_size, indexer_options); let actor = IndexActor::new(receiver, store, indexer_options)?; tokio::task::spawn(actor.run()); @@ -59,7 +59,7 @@ pub struct IndexMeta { } impl IndexMeta { - fn new(index: &Index) -> Result { + pub fn new(index: &Index) -> Result { let txn = index.read_txn()?; Self::new_txn(index, &txn) } @@ -223,7 +223,7 @@ where None => self.store.create(uuid, None).await?, }; - Ok(spawn_blocking(move || update_handler.handle_update(index, meta)).await?) + Ok(spawn_blocking(move || update_handler.handle_update(&index, meta)).await?) 
} async fn handle_settings(&self, uuid: Uuid) -> Result> { diff --git a/meilisearch-lib/src/index_controller/indexes/store.rs b/meilisearch-lib/src/index_controller/indexes/store.rs index 252271d51..336ff6e0a 100644 --- a/meilisearch-lib/src/index_controller/indexes/store.rs +++ b/meilisearch-lib/src/index_controller/indexes/store.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use super::error::{IndexActorError, Result}; use crate::index::Index; +use crate::index::update_handler::UpdateHandler; use crate::index_controller::update_file_store::UpdateFileStore; type AsyncMap = Arc>>; @@ -26,10 +27,11 @@ pub struct MapIndexStore { path: PathBuf, index_size: usize, update_file_store: Arc, + update_handler: Arc, } impl MapIndexStore { - pub fn new(path: impl AsRef, index_size: usize) -> Self { + pub fn new(path: impl AsRef, index_size: usize, update_handler: Arc) -> Self { let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap()); let path = path.as_ref().join("indexes/"); let index_store = Arc::new(RwLock::new(HashMap::new())); @@ -38,6 +40,7 @@ impl MapIndexStore { path, index_size, update_file_store, + update_handler, } } } @@ -59,8 +62,9 @@ impl IndexStore for MapIndexStore { let index_size = self.index_size; let file_store = self.update_file_store.clone(); + let update_handler = self.update_handler.clone(); let index = spawn_blocking(move || -> Result { - let index = Index::open(path, index_size, file_store)?; + let index = Index::open(path, index_size, file_store, uuid, update_handler)?; if let Some(primary_key) = primary_key { let mut txn = index.write_txn()?; diff --git a/meilisearch-lib/src/index_controller/mod.rs b/meilisearch-lib/src/index_controller/mod.rs index bd3f4c07b..da108fe68 100644 --- a/meilisearch-lib/src/index_controller/mod.rs +++ b/meilisearch-lib/src/index_controller/mod.rs @@ -9,38 +9,52 @@ use chrono::{DateTime, Utc}; use futures::Stream; use log::info; use milli::update::IndexDocumentsMethod; -use milli::FieldDistribution; use serde::{Deserialize, Serialize}; +use tokio::task::spawn_blocking; use tokio::time::sleep; use uuid::Uuid; use dump_actor::DumpActorHandle; pub use dump_actor::{DumpInfo, DumpStatus}; use snapshot::load_snapshot; -use uuid_resolver::error::UuidResolverError; -use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; +use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings}; +use crate::index_controller::index_resolver::create_index_resolver; use crate::options::IndexerOpts; use error::Result; +use crate::index::error::Result as IndexResult; use self::dump_actor::load_dump; -use self::indexes::IndexMsg; +use self::index_resolver::HardStateIndexResolver; +use self::index_resolver::error::IndexResolverError; use self::updates::status::UpdateStatus; use self::updates::UpdateMsg; -use self::uuid_resolver::UuidResolverMsg; mod dump_actor; pub mod error; -pub mod indexes; +//pub mod indexes; mod snapshot; pub mod update_file_store; pub mod updates; -mod uuid_resolver; +//mod uuid_resolver; +mod index_resolver; pub type Payload = Box< dyn Stream> + Send + Sync + 'static + Unpin, >; +macro_rules! 
time { + ($e:expr) => { + { + let now = std::time::Instant::now(); + let result = $e; + let elapsed = now.elapsed(); + println!("elapsed at line {}: {}ms ({}ns)", line!(), elapsed.as_millis(), elapsed.as_nanos()); + result + } + }; +} + #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct IndexMetadata { @@ -49,7 +63,7 @@ pub struct IndexMetadata { pub uid: String, name: String, #[serde(flatten)] - pub meta: indexes::IndexMeta, + pub meta: IndexMeta, } #[derive(Clone, Debug)] @@ -58,23 +72,9 @@ pub struct IndexSettings { pub primary_key: Option, } -#[derive(Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct IndexStats { - #[serde(skip)] - pub size: u64, - pub number_of_documents: u64, - /// Whether the current index is performing an update. It is initially `None` when the - /// index returns it, since it is the `UpdateStore` that knows what index is currently indexing. It is - /// later set to either true or false, we we retrieve the information from the `UpdateStore` - pub is_indexing: Option, - pub field_distribution: FieldDistribution, -} - #[derive(Clone)] pub struct IndexController { - uuid_resolver: uuid_resolver::UuidResolverSender, - index_handle: indexes::IndexHandlerSender, + index_resolver: Arc, update_handle: updates::UpdateSender, dump_handle: dump_actor::DumpActorHandleImpl, } @@ -149,17 +149,15 @@ impl IndexControllerBuilder { std::fs::create_dir_all(db_path.as_ref())?; - let uuid_resolver = uuid_resolver::create_uuid_resolver(&db_path)?; - let index_handle = indexes::create_indexes_handler(&db_path, index_size, &indexer_options)?; + let index_resolver = Arc::new(create_index_resolver(&db_path, index_size, &indexer_options)?); #[allow(unreachable_code)] - let update_handle = updates::create_update_handler(index_handle.clone(), &db_path, update_store_size)?; + let update_handle = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?; + let dump_path = self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?; let dump_handle = dump_actor::DumpActorHandleImpl::new( - &self - .dump_dst - .ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?, - uuid_resolver.clone(), + dump_path, + index_resolver.clone(), update_handle.clone(), index_size, update_store_size, @@ -182,8 +180,7 @@ impl IndexControllerBuilder { //} Ok(IndexController { - uuid_resolver, - index_handle, + index_resolver, update_handle, dump_handle, }) @@ -246,18 +243,15 @@ impl IndexController { } pub async fn register_update(&self, uid: &str, update: Update) -> Result { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.to_string()).await; - match uuid { + match self.index_resolver.get_uuid(uid.to_string()).await { Ok(uuid) => { let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; Ok(update_result) } - Err(UuidResolverError::UnexistingIndex(name)) => { - let uuid = Uuid::new_v4(); + Err(IndexResolverError::UnexistingIndex(name)) => { + let (uuid, _) = self.index_resolver.create_index(name, None).await?; let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?; // ignore if index creation fails now, since it may already have been created - let _ = IndexMsg::create_index(&self.index_handle, uuid, None).await?; - UuidResolverMsg::insert(&self.uuid_resolver, uuid, name).await?; Ok(update_result) } @@ -391,24 +385,24 @@ impl IndexController { //} pub async fn update_status(&self, uid: String, id: u64) -> Result { - let uuid = 
UuidResolverMsg::get(&self.uuid_resolver, uid).await?; + let uuid = self.index_resolver.get_uuid(uid).await?; let result = UpdateMsg::get_update(&self.update_handle, uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> Result> { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; + let uuid = self.index_resolver.get_uuid(uid).await?; let result = UpdateMsg::list_updates(&self.update_handle, uuid).await?; Ok(result) } pub async fn list_indexes(&self) -> Result> { - let uuids = UuidResolverMsg::list(&self.uuid_resolver).await?; + let indexes = self.index_resolver.list().await?; let mut ret = Vec::new(); - for (uid, uuid) in uuids { - let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?; + for (uid, index) in indexes { + let meta = index.meta()?; let meta = IndexMetadata { - uuid, + uuid: index.uuid, name: uid.clone(), uid, meta, @@ -420,8 +414,8 @@ impl IndexController { } pub async fn settings(&self, uid: String) -> Result> { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let settings = IndexMsg::settings(&self.index_handle, uuid).await?; + let index = self.index_resolver.get_index(uid).await?; + let settings = spawn_blocking(move || index.settings()).await??; Ok(settings) } @@ -432,15 +426,8 @@ impl IndexController { limit: usize, attributes_to_retrieve: Option>, ) -> Result> { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let documents = IndexMsg::documents( - &self.index_handle, - uuid, - offset, - limit, - attributes_to_retrieve, - ) - .await?; + let index = self.index_resolver.get_index(uid).await?; + let documents = spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)).await??; Ok(documents) } @@ -450,8 +437,8 @@ impl IndexController { doc_id: String, attributes_to_retrieve: Option>, ) -> Result { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let document = IndexMsg::document(&self.index_handle, uuid, attributes_to_retrieve, doc_id).await?; + let index = self.index_resolver.get_index(uid).await?; + let document = spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)).await??; Ok(document) } @@ -460,12 +447,12 @@ impl IndexController { uid: String, mut index_settings: IndexSettings, ) -> Result { - if index_settings.uid.is_some() { - index_settings.uid.take(); - } - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; - let meta = IndexMsg::update_index(&self.index_handle, uuid, index_settings).await?; + index_settings.uid.take(); + + let index = self.index_resolver.get_index(uid.clone()).await?; + let uuid = index.uuid; + let meta = spawn_blocking(move || index.update_primary_key(index_settings.primary_key)).await??; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -476,14 +463,15 @@ impl IndexController { } pub async fn search(&self, uid: String, query: SearchQuery) -> Result { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; - let result = IndexMsg::search(&self.index_handle, uuid, query).await?; + let index = time!(self.index_resolver.get_index(uid.clone()).await?); + let result = time!(spawn_blocking(move || time!(index.perform_search(query))).await??); Ok(result) } pub async fn get_index(&self, uid: String) -> Result { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid.clone()).await?; - let meta = IndexMsg::index_meta(&self.index_handle, uuid).await?; + let index = self.index_resolver.get_index(uid.clone()).await?; + let uuid = index.uuid; 
+ let meta = spawn_blocking(move || index.meta()).await??; let meta = IndexMetadata { uuid, name: uid.clone(), @@ -494,15 +482,16 @@ impl IndexController { } pub async fn get_uuids_size(&self) -> Result { - let size = UuidResolverMsg::get_size(&self.uuid_resolver).await?; + let size = self.index_resolver.get_size().await?; Ok(size) } pub async fn get_index_stats(&self, uid: String) -> Result { - let uuid = UuidResolverMsg::get(&self.uuid_resolver, uid).await?; let update_infos = UpdateMsg::get_info(&self.update_handle).await?; - let mut stats = IndexMsg::index_stats(&self.index_handle, uuid).await?; - // Check if the currently indexing update is from out index. + let index = self.index_resolver.get_index(uid).await?; + let uuid = index.uuid; + let mut stats = spawn_blocking(move || index.stats()).await??; + // Check if the currently indexing update is from our index. stats.is_indexing = Some(Some(uuid) == update_infos.processing); Ok(stats) } @@ -513,17 +502,24 @@ impl IndexController { let mut last_update: Option> = None; let mut indexes = BTreeMap::new(); - for index in self.list_indexes().await? { - let mut index_stats = IndexMsg::index_stats(&self.index_handle, index.uuid).await?; - database_size += index_stats.size; + for (index_uid, index) in self.index_resolver.list().await? { + let uuid = index.uuid; + let (mut stats, meta) = spawn_blocking::<_, IndexResult<_>>(move || { + let stats = index.stats()?; + let meta = index.meta()?; + Ok((stats, meta)) + }).await??; - last_update = last_update.map_or(Some(index.meta.updated_at), |last| { - Some(last.max(index.meta.updated_at)) + database_size += stats.size; + + last_update = last_update.map_or(Some(meta.updated_at), |last| { + Some(last.max(meta.updated_at)) }); - index_stats.is_indexing = Some(Some(index.uuid) == update_infos.processing); + // Check if the currently indexing update is from our index. + stats.is_indexing = Some(Some(uuid) == update_infos.processing); - indexes.insert(index.uid, index_stats); + indexes.insert(index_uid, stats); } Ok(Stats { diff --git a/meilisearch-lib/src/index_controller/updates/error.rs b/meilisearch-lib/src/index_controller/updates/error.rs index 858631f69..58635b3df 100644 --- a/meilisearch-lib/src/index_controller/updates/error.rs +++ b/meilisearch-lib/src/index_controller/updates/error.rs @@ -3,19 +3,17 @@ use std::error::Error; use meilisearch_error::{Code, ErrorCode}; -use crate::index_controller::indexes::error::IndexActorError; - -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(Debug, thiserror::Error)] #[allow(clippy::large_enum_variant)] -pub enum UpdateActorError { +pub enum UpdateLoopError { #[error("Update {0} not found.")] UnexistingUpdate(u64), #[error("Internal error: {0}")] Internal(Box), - #[error("{0}")] - IndexActor(#[from] IndexActorError), + //#[error("{0}")] + //IndexActor(#[from] IndexActorError), #[error( "update store was shut down due to a fatal error, please check your logs for more info." 
)] @@ -26,7 +24,7 @@ pub enum UpdateActorError { PayloadError(#[from] actix_web::error::PayloadError), } -impl From> for UpdateActorError +impl From> for UpdateLoopError where T: Sync + Send + 'static + fmt::Debug { fn from(other: tokio::sync::mpsc::error::SendError) -> Self { @@ -34,28 +32,28 @@ where T: Sync + Send + 'static + fmt::Debug } } -impl From for UpdateActorError { +impl From for UpdateLoopError { fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { Self::Internal(Box::new(other)) } } internal_error!( - UpdateActorError: heed::Error, + UpdateLoopError: heed::Error, std::io::Error, serde_json::Error, tokio::task::JoinError ); -impl ErrorCode for UpdateActorError { +impl ErrorCode for UpdateLoopError { fn error_code(&self) -> Code { match self { - UpdateActorError::UnexistingUpdate(_) => Code::NotFound, - UpdateActorError::Internal(_) => Code::Internal, - UpdateActorError::IndexActor(e) => e.error_code(), - UpdateActorError::FatalUpdateStoreError => Code::Internal, - UpdateActorError::InvalidPayload(_) => Code::BadRequest, - UpdateActorError::PayloadError(error) => match error { + UpdateLoopError::UnexistingUpdate(_) => Code::NotFound, + UpdateLoopError::Internal(_) => Code::Internal, + //UpdateLoopError::IndexActor(e) => e.error_code(), + UpdateLoopError::FatalUpdateStoreError => Code::Internal, + UpdateLoopError::InvalidPayload(_) => Code::BadRequest, + UpdateLoopError::PayloadError(error) => match error { actix_web::error::PayloadError::Overflow => Code::PayloadTooLarge, _ => Code::Internal, }, diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs index 750ca7c46..7cc38490f 100644 --- a/meilisearch-lib/src/index_controller/updates/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/mod.rs @@ -21,25 +21,25 @@ use serde_json::{Map, Value}; use tokio::sync::mpsc; use uuid::Uuid; -use self::error::{Result, UpdateActorError}; +use self::error::{Result, UpdateLoopError}; pub use self::message::UpdateMsg; use self::store::{UpdateStore, UpdateStoreInfo}; use crate::index_controller::update_file_store::UpdateFileStore; use status::UpdateStatus; -use super::indexes::IndexHandlerSender; +use super::index_resolver::HardStateIndexResolver; use super::{DocumentAdditionFormat, Payload, Update}; pub type UpdateSender = mpsc::Sender; pub fn create_update_handler( - index_sender: IndexHandlerSender, + index_resolver: Arc, db_path: impl AsRef, update_store_size: usize, ) -> anyhow::Result { let path = db_path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); - let actor = UpdateLoop::new(update_store_size, receiver, path, index_sender)?; + let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?; tokio::task::spawn_local(actor.run()); @@ -100,7 +100,7 @@ pub struct UpdateLoop { store: Arc, inbox: Option>, update_file_store: UpdateFileStore, - index_handle: IndexHandlerSender, + index_resolver: Arc, must_exit: Arc, } @@ -109,7 +109,7 @@ impl UpdateLoop { update_db_size: usize, inbox: mpsc::Receiver, path: impl AsRef, - index_handle: IndexHandlerSender, + index_resolver: Arc, ) -> anyhow::Result { let path = path.as_ref().to_owned(); std::fs::create_dir_all(&path)?; @@ -119,7 +119,7 @@ impl UpdateLoop { let must_exit = Arc::new(AtomicBool::new(false)); - let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?; + let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone())?; let inbox = Some(inbox); @@ -128,9 +128,9 @@ 
impl UpdateLoop { Ok(Self { store, inbox, - index_handle, must_exit, update_file_store, + index_resolver, }) } @@ -249,7 +249,7 @@ impl UpdateLoop { tokio::task::spawn_blocking(move || { let result = store .meta(uuid, id)? - .ok_or(UpdateActorError::UnexistingUpdate(id))?; + .ok_or(UpdateLoopError::UnexistingUpdate(id))?; Ok(result) }) .await? @@ -263,18 +263,19 @@ impl UpdateLoop { Ok(()) } - async fn handle_snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { - let index_handle = self.index_handle.clone(); - let update_store = self.store.clone(); + async fn handle_snapshot(&self, _uuids: HashSet,_pathh: PathBuf) -> Result<()> { + todo!() + //let index_handle = self.index_resolver.clone(); + //let update_store = self.store.clone(); - tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) - .await??; + //tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle)) + //.await??; - Ok(()) + //Ok(()) } async fn handle_dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { - let index_handle = self.index_handle.clone(); + let index_handle = self.index_resolver.clone(); let update_store = self.store.clone(); tokio::task::spawn_blocking(move || -> Result<()> { diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs index 689678cc4..cf5d7e842 100644 --- a/meilisearch-lib/src/index_controller/updates/store/dump.rs +++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs @@ -1,16 +1,11 @@ -use std::{ - collections::HashSet, - fs::{create_dir_all, File}, - io::Write, - path::{Path, PathBuf}, -}; +use std::{collections::HashSet, fs::{create_dir_all, File}, io::Write, path::{Path, PathBuf}, sync::Arc}; use heed::RoTxn; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::{Result, State, UpdateStore}; -use crate::index_controller::{indexes::{IndexHandlerSender, IndexMsg}, updates::{status::UpdateStatus}}; +use crate::index_controller::{index_resolver::HardStateIndexResolver, updates::status::UpdateStatus}; #[derive(Serialize, Deserialize)] struct UpdateEntry { @@ -23,7 +18,7 @@ impl UpdateStore { &self, uuids: &HashSet, path: PathBuf, - handle: IndexHandlerSender, + handle: Arc, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Dumping); @@ -171,13 +166,14 @@ impl UpdateStore { } async fn dump_indexes( - uuids: &HashSet, - handle: IndexHandlerSender, - path: impl AsRef, + _uuids: &HashSet, + _handle: Arc, + _path: impl AsRef, ) -> Result<()> { - for uuid in uuids { - IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?; - } + todo!() + //for uuid in uuids { + //IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?; + //} - Ok(()) + //Ok(()) } diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs index 25eb840c9..8d40d8309 100644 --- a/meilisearch-lib/src/index_controller/updates/store/mod.rs +++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs @@ -12,7 +12,6 @@ use std::{ }; use arc_swap::ArcSwap; -use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::zerocopy::U64; use heed::{CompactionOption, Database, Env, EnvOpenOptions}; @@ -30,7 +29,6 @@ use super::RegisterUpdate; use super::error::Result; use super::status::{Enqueued, Processing}; use crate::EnvSizer; -use crate::index_controller::indexes::{CONCURRENT_INDEX_MSG, IndexHandlerSender, IndexMsg}; use 
crate::index_controller::update_files_path; use crate::index_controller::updates::*; @@ -148,7 +146,7 @@ impl UpdateStore { pub fn open( options: EnvOpenOptions, path: impl AsRef, - index_handle: IndexHandlerSender, + index_resolver: Arc, must_exit: Arc, ) -> anyhow::Result> { let (update_store, mut notification_receiver) = Self::new(options, path)?; @@ -173,7 +171,7 @@ impl UpdateStore { loop { match update_store_weak.upgrade() { Some(update_store) => { - let handler = index_handle.clone(); + let handler = index_resolver.clone(); let res = tokio::task::spawn_blocking(move || { update_store.process_pending_update(handler) }) @@ -286,7 +284,7 @@ impl UpdateStore { /// Executes the user provided function on the next pending update (the one with the lowest id). /// This is asynchronous as it let the user process the update with a read-only txn and /// only writing the result meta to the processed-meta store *after* it has been processed. - fn process_pending_update(&self, index_handle: IndexHandlerSender) -> Result> { + fn process_pending_update(&self, index_resolver: Arc) -> Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; let first_meta = self.pending_queue.first(&rtxn)?; @@ -303,7 +301,7 @@ impl UpdateStore { state.swap(State::Processing(index_uuid, processing.clone())); let result = - self.perform_update(processing, index_handle, index_uuid, global_id); + self.perform_update(processing, index_resolver, index_uuid, global_id); state.swap(State::Idle); @@ -316,18 +314,18 @@ impl UpdateStore { fn perform_update( &self, processing: Processing, - index_handle: IndexHandlerSender, + index_resolver: Arc, index_uuid: Uuid, global_id: u64, ) -> Result> { // Process the pending update using the provided user function. let handle = Handle::current(); let update_id = processing.id(); - let result = - match handle.block_on(IndexMsg::update(&index_handle, index_uuid, processing.clone())) { - Ok(result) => result, - Err(e) => Err(processing.fail(e)), - }; + //IndexMsg::update(index_resolver, index_uuid, processing.clone() + let result = match handle.block_on(index_resolver.get_index_by_uuid(index_uuid)) { + Ok(index) => index.handle_update(processing), + Err(e) => Err(processing.fail(e)), + }; // Once the pending update have been successfully processed // we must remove the content from the pending and processing stores and @@ -484,9 +482,9 @@ impl UpdateStore { pub fn snapshot( &self, - uuids: &HashSet, + _uuids: &HashSet, path: impl AsRef, - handle: IndexHandlerSender, + handle: Arc, ) -> Result<()> { let state_lock = self.state.write(); state_lock.swap(State::Snapshoting); @@ -522,22 +520,23 @@ impl UpdateStore { //} } - let path = &path.as_ref().to_path_buf(); - let handle = &handle; + let _path = &path.as_ref().to_path_buf(); + let _handle = &handle; // Perform the snapshot of each index concurently. 
Only a third of the capabilities of // the index actor at a time not to put too much pressure on the index actor - let mut stream = futures::stream::iter(uuids.iter()) - .map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone())) - .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + todo!() + //let mut stream = futures::stream::iter(uuids.iter()) + //.map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone())) + //.buffer_unordered(CONCURRENT_INDEX_MSG / 3); - Handle::current().block_on(async { - while let Some(res) = stream.next().await { - res?; - } - Ok(()) as Result<()> - })?; + //Handle::current().block_on(async { + //while let Some(res) = stream.next().await { + //res?; + //} + //Ok(()) as Result<()> + //})?; - Ok(()) + //Ok(()) } pub fn get_info(&self) -> Result {
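
To make the shape of the refactor easier to follow, here is a minimal, self-contained sketch of the resolution flow the new `IndexResolver` introduces above: a `UuidStore` maps a user-facing index uid to an internal uuid, an `IndexStore` owns the index handles keyed by uuid, and the resolver composes the two. Everything below is a simplification for illustration rather than the patch's code: the real traits are async (`async_trait` + tokio), the real `UuidStore` is backed by heed/LMDB and the real `IndexStore` opens milli indexes on disk, whereas here `Uuid` is aliased to `u128` and both stores are plain in-memory `HashMap`s.

```rust
use std::collections::HashMap;

// Stand-ins for the real types: the patch uses `uuid::Uuid` and `crate::index::Index`.
type Uuid = u128;

#[derive(Clone, Debug)]
struct Index {
    uuid: Uuid,
}

#[derive(Debug)]
enum IndexResolverError {
    UnexistingIndex(String),
    IndexAlreadyExists,
}

type Result<T> = std::result::Result<T, IndexResolverError>;

// Mirrors `UuidStore`: maps a user-facing uid to an internal uuid.
trait UuidStore {
    fn get_uuid(&self, uid: &str) -> Option<Uuid>;
    fn insert(&mut self, uid: String, uuid: Uuid) -> Result<()>;
}

// Mirrors `IndexStore`: owns the actual index handles, keyed by uuid.
trait IndexStore {
    fn create(&mut self, uuid: Uuid) -> Result<Index>;
    fn get(&self, uuid: Uuid) -> Option<Index>;
}

struct IndexResolver<U, I> {
    uuid_store: U,
    index_store: I,
}

impl<U: UuidStore, I: IndexStore> IndexResolver<U, I> {
    // Same shape as `IndexResolver::get_index` in the patch: resolve the uid to a uuid,
    // then fetch the index; a dangling uuid is reported as an unexisting index.
    fn get_index(&self, uid: &str) -> Result<Index> {
        let uuid = self
            .uuid_store
            .get_uuid(uid)
            .ok_or_else(|| IndexResolverError::UnexistingIndex(uid.to_string()))?;
        self.index_store
            .get(uuid)
            .ok_or_else(|| IndexResolverError::UnexistingIndex(uid.to_string()))
    }
}

// Trivial in-memory implementations, for illustration only.
impl UuidStore for HashMap<String, Uuid> {
    fn get_uuid(&self, uid: &str) -> Option<Uuid> {
        self.get(uid).copied()
    }
    fn insert(&mut self, uid: String, uuid: Uuid) -> Result<()> {
        if self.contains_key(&uid) {
            return Err(IndexResolverError::IndexAlreadyExists);
        }
        HashMap::insert(self, uid, uuid);
        Ok(())
    }
}

impl IndexStore for HashMap<Uuid, Index> {
    fn create(&mut self, uuid: Uuid) -> Result<Index> {
        let index = Index { uuid };
        self.insert(uuid, index.clone());
        Ok(index)
    }
    fn get(&self, uuid: Uuid) -> Option<Index> {
        HashMap::get(self, &uuid).cloned()
    }
}

fn main() -> Result<()> {
    let mut uuid_store: HashMap<String, Uuid> = HashMap::new();
    let mut index_store: HashMap<Uuid, Index> = HashMap::new();

    // "Create" an index named "movies", roughly as `IndexResolver::create_index` does:
    // create the index under a fresh uuid, then register the uid -> uuid mapping.
    let uuid = 42;
    let index = index_store.create(uuid)?;
    UuidStore::insert(&mut uuid_store, "movies".to_string(), index.uuid)?;

    let resolver = IndexResolver { uuid_store, index_store };
    println!("{:?}", resolver.get_index("movies")?);
    assert!(resolver.get_index("books").is_err());
    Ok(())
}
```

The real `get_index` in the patch additionally removes a dangling uid from the uuid store when the index files turn out to be missing; this sketch omits that cleanup step.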