diff --git a/meilisearch-lib/src/index_controller/indexes/error.rs b/meilisearch-lib/src/index_controller/indexes/error.rs deleted file mode 100644 index 51fe273f7..000000000 --- a/meilisearch-lib/src/index_controller/indexes/error.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::fmt; - -use meilisearch_error::{Code, ErrorCode}; - -use crate::{error::MilliError, index::error::IndexError}; - -pub type Result = std::result::Result; - -#[derive(thiserror::Error, Debug)] -pub enum IndexActorError { - #[error("{0}")] - IndexError(#[from] IndexError), - #[error("Index already exists")] - IndexAlreadyExists, - #[error("Index not found")] - UnexistingIndex, - #[error("A primary key is already present. It's impossible to update it")] - ExistingPrimaryKey, - #[error("Internal Error: {0}")] - Internal(Box), - #[error("{0}")] - Milli(#[from] milli::Error), -} - -impl From> for IndexActorError -where T: Send + Sync + 'static + fmt::Debug -{ - fn from(other: tokio::sync::mpsc::error::SendError) -> Self { - Self::Internal(Box::new(other)) - } -} - -impl From for IndexActorError { - fn from(other: tokio::sync::oneshot::error::RecvError) -> Self { - Self::Internal(Box::new(other)) - } -} - -macro_rules! internal_error { - ($($other:path), *) => { - $( - impl From<$other> for IndexActorError { - fn from(other: $other) -> Self { - Self::Internal(Box::new(other)) - } - } - )* - } -} - -internal_error!(heed::Error, tokio::task::JoinError, std::io::Error); - -impl ErrorCode for IndexActorError { - fn error_code(&self) -> Code { - match self { - IndexActorError::IndexError(e) => e.error_code(), - IndexActorError::IndexAlreadyExists => Code::IndexAlreadyExists, - IndexActorError::UnexistingIndex => Code::IndexNotFound, - IndexActorError::ExistingPrimaryKey => Code::PrimaryKeyAlreadyPresent, - IndexActorError::Internal(_) => Code::Internal, - IndexActorError::Milli(e) => MilliError(e).error_code(), - } - } -} diff --git a/meilisearch-lib/src/index_controller/indexes/message.rs b/meilisearch-lib/src/index_controller/indexes/message.rs deleted file mode 100644 index e9c67d0ab..000000000 --- a/meilisearch-lib/src/index_controller/indexes/message.rs +++ /dev/null @@ -1,212 +0,0 @@ -use std::path::PathBuf; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -use super::error::Result; -use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings}; -use crate::index_controller::updates::status::{Failed, Processed, Processing}; -use crate::index_controller::{IndexSettings, IndexStats}; - -use super::IndexMeta; - -#[allow(clippy::large_enum_variant)] -#[derive(Debug)] -pub enum IndexMsg { - CreateIndex { - uuid: Uuid, - primary_key: Option, - ret: oneshot::Sender>, - }, - Update { - uuid: Uuid, - meta: Processing, - ret: oneshot::Sender>>, - }, - Search { - uuid: Uuid, - query: SearchQuery, - ret: oneshot::Sender>, - }, - Settings { - uuid: Uuid, - ret: oneshot::Sender>>, - }, - Documents { - uuid: Uuid, - attributes_to_retrieve: Option>, - offset: usize, - limit: usize, - ret: oneshot::Sender>>, - }, - Document { - uuid: Uuid, - attributes_to_retrieve: Option>, - doc_id: String, - ret: oneshot::Sender>, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - GetMeta { - uuid: Uuid, - ret: oneshot::Sender>, - }, - UpdateIndex { - uuid: Uuid, - index_settings: IndexSettings, - ret: oneshot::Sender>, - }, - Snapshot { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, - Dump { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, - GetStats { - uuid: Uuid, - ret: oneshot::Sender>, - }, -} - -impl IndexMsg { - pub async fn search( - sender: &mpsc::Sender, - uuid: Uuid, - query: SearchQuery, - ) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Search { - ret, - uuid, - query, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn update_index( - sender: &mpsc::Sender, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::UpdateIndex { - ret, - uuid, - index_settings, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn create_index( - sender: &mpsc::Sender, - uuid: Uuid, - primary_key: Option, - ) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::CreateIndex { - ret, - uuid, - primary_key, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn index_meta(sender: &mpsc::Sender, uuid: Uuid) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::GetMeta { ret, uuid }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn index_stats(sender: &mpsc::Sender, uuid: Uuid) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::GetStats { ret, uuid }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn settings(sender: &mpsc::Sender, uuid: Uuid) -> Result> { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Settings { ret, uuid }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn documents( - sender: &mpsc::Sender, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Documents { - ret, - uuid, - attributes_to_retrieve, - offset, - limit, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn document( - sender: &mpsc::Sender, - uuid: Uuid, - attributes_to_retrieve: Option>, - doc_id: String, - ) -> Result { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Document { - ret, - uuid, - attributes_to_retrieve, - doc_id, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn update(sender: &mpsc::Sender, uuid: Uuid, meta: Processing) -> Result> { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Update { - ret, - uuid, - meta, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn snapshot(sender: &mpsc::Sender, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Snapshot { - uuid, - path, - ret, - }; - sender.send(msg).await?; - rcv.await? - } - - pub async fn dump(sender: &mpsc::Sender, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, rcv) = oneshot::channel(); - let msg = Self::Dump { - uuid, - ret, - path, - }; - sender.send(msg).await?; - rcv.await? - } -} diff --git a/meilisearch-lib/src/index_controller/indexes/mod.rs b/meilisearch-lib/src/index_controller/indexes/mod.rs deleted file mode 100644 index 48649cf40..000000000 --- a/meilisearch-lib/src/index_controller/indexes/mod.rs +++ /dev/null @@ -1,483 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use async_stream::stream; -use futures::stream::StreamExt; -use heed::CompactionOption; -use log::debug; -use milli::update::UpdateBuilder; -use tokio::task::spawn_blocking; -use tokio::{fs, sync::mpsc}; - -use crate::index::update_handler::UpdateHandler; -use crate::index_controller::updates::status::{Failed, Processed, Processing}; -use crate::index_controller::{get_arc_ownership_blocking, IndexStats}; -use crate::options::IndexerOpts; - -pub const CONCURRENT_INDEX_MSG: usize = 10; - -use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -pub use message::IndexMsg; - -use crate::index::{Checked, Document, Index, SearchQuery, SearchResult, Settings}; -use error::Result; - -use self::error::IndexActorError; -use self::store::{IndexStore, MapIndexStore}; - -use super::IndexSettings; - -pub mod error; -mod message; -mod store; - -pub type IndexHandlerSender = mpsc::Sender; - -pub fn create_indexes_handler( - db_path: impl AsRef, - index_size: usize, - indexer_options: &IndexerOpts, -) -> anyhow::Result { - let (sender, receiver) = mpsc::channel(100); - let store = MapIndexStore::new(&db_path, index_size, indexer_options); - let actor = IndexActor::new(receiver, store, indexer_options)?; - - tokio::task::spawn(actor.run()); - - Ok(sender) -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - created_at: DateTime, - pub updated_at: DateTime, - pub primary_key: Option, -} - -impl IndexMeta { - pub fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { - let created_at = index.created_at(txn)?; - let updated_at = index.updated_at(txn)?; - let primary_key = index.primary_key(txn)?.map(String::from); - Ok(Self { - created_at, - updated_at, - primary_key, - }) - } -} - -pub struct IndexActor { - receiver: Option>, - update_handler: Arc, - store: S, -} - -impl IndexActor -where - S: IndexStore + Sync + Send, -{ - pub fn new( - receiver: mpsc::Receiver, - store: S, - options: &IndexerOpts, - ) -> anyhow::Result { - let update_handler = Arc::new(UpdateHandler::new(options)?); - let receiver = Some(receiver); - - Ok(Self { - receiver, - update_handler, - store, - }) - } - - /// `run` poll the write_receiver and read_receiver concurrently, but while messages send - /// through the read channel are processed concurrently, the messages sent through the write - /// channel are processed one at a time. - pub async fn run(mut self) { - let mut receiver = self - .receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let stream = stream! { - loop { - match receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - stream - .for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg)) - .await; - } - - async fn handle_message(&self, msg: IndexMsg) { - use IndexMsg::*; - match msg { - CreateIndex { - uuid, - primary_key, - ret, - } => { - let _ = ret.send(self.handle_create_index(uuid, primary_key).await); - } - Update { ret, meta, uuid } => { - let _ = ret.send(self.handle_update(uuid, meta).await); - } - Search { ret, query, uuid } => { - let _ = ret.send(self.handle_search(uuid, query).await); - } - Settings { ret, uuid } => { - let _ = ret.send(self.handle_settings(uuid).await); - } - Documents { - ret, - uuid, - attributes_to_retrieve, - offset, - limit, - } => { - let _ = ret.send( - self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) - .await, - ); - } - Document { - uuid, - attributes_to_retrieve, - doc_id, - ret, - } => { - let _ = ret.send( - self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) - .await, - ); - } - Delete { uuid, ret } => { - let _ = ret.send(self.handle_delete(uuid).await); - } - GetMeta { uuid, ret } => { - let _ = ret.send(self.handle_get_meta(uuid).await); - } - UpdateIndex { - uuid, - index_settings, - ret, - } => { - let _ = ret.send(self.handle_update_index(uuid, index_settings).await); - } - Snapshot { uuid, path, ret } => { - let _ = ret.send(self.handle_snapshot(uuid, path).await); - } - Dump { uuid, path, ret } => { - let _ = ret.send(self.handle_dump(uuid, path).await); - } - GetStats { uuid, ret } => { - let _ = ret.send(self.handle_get_stats(uuid).await); - } - } - } - - async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - let result = spawn_blocking(move || index.perform_search(query)).await??; - Ok(result) - } - - async fn handle_create_index( - &self, - uuid: Uuid, - primary_key: Option, - ) -> Result { - let index = self.store.create(uuid, primary_key).await?; - let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; - Ok(meta) - } - - async fn handle_update( - &self, - uuid: Uuid, - meta: Processing, - ) -> Result> { - debug!("Processing update {}", meta.id()); - let update_handler = self.update_handler.clone(); - let index = match self.store.get(uuid).await? { - Some(index) => index, - None => self.store.create(uuid, None).await?, - }; - - Ok(spawn_blocking(move || update_handler.handle_update(&index, meta)).await?) - } - - async fn handle_settings(&self, uuid: Uuid) -> Result> { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - let result = spawn_blocking(move || index.settings()).await??; - Ok(result) - } - - async fn handle_fetch_documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - let result = - spawn_blocking(move || index.retrieve_documents(offset, limit, attributes_to_retrieve)) - .await??; - - Ok(result) - } - - async fn handle_fetch_document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - - let result = - spawn_blocking(move || index.retrieve_document(doc_id, attributes_to_retrieve)) - .await??; - - Ok(result) - } - - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let index = self.store.delete(uuid).await?; - - if let Some(index) = index { - tokio::task::spawn(async move { - let index = index.inner; - let store = get_arc_ownership_blocking(index).await; - spawn_blocking(move || { - store.prepare_for_closing().wait(); - debug!("Index closed"); - }); - }); - } - - Ok(()) - } - - async fn handle_get_meta(&self, uuid: Uuid) -> Result { - match self.store.get(uuid).await? { - Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)).await??; - Ok(meta) - } - None => Err(IndexActorError::UnexistingIndex), - } - } - - async fn handle_update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - - let result = spawn_blocking(move || match index_settings.primary_key { - Some(primary_key) => { - let mut txn = index.write_txn()?; - if index.primary_key(&txn)?.is_some() { - return Err(IndexActorError::ExistingPrimaryKey); - } - let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); - builder.set_primary_key(primary_key); - builder.execute(|_, _| ())?; - let meta = IndexMeta::new_txn(&index, &txn)?; - txn.commit()?; - Ok(meta) - } - None => { - let meta = IndexMeta::new(&index)?; - Ok(meta) - } - }) - .await??; - - Ok(result) - } - - async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { - use tokio::fs::create_dir_all; - - path.push("indexes"); - create_dir_all(&path).await?; - - if let Some(index) = self.store.get(uuid).await? { - let mut index_path = path.join(format!("index-{}", uuid)); - - create_dir_all(&index_path).await?; - - index_path.push("data.mdb"); - spawn_blocking(move || -> Result<()> { - // Get write txn to wait for ongoing write transaction before snapshot. - let _txn = index.write_txn()?; - index - .env - .copy_to_path(index_path, CompactionOption::Enabled)?; - Ok(()) - }) - .await??; - } - - Ok(()) - } - - /// Create a `documents.jsonl` and a `settings.json` in `path/uid/` with a dump of all the - /// documents and all the settings. - async fn handle_dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - - let path = path.join(format!("indexes/index-{}/", uuid)); - fs::create_dir_all(&path).await?; - - tokio::task::spawn_blocking(move || index.dump(path)).await??; - - Ok(()) - } - - async fn handle_get_stats(&self, uuid: Uuid) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexActorError::UnexistingIndex)?; - - spawn_blocking(move || { - let rtxn = index.read_txn()?; - - Ok(IndexStats { - size: index.size(), - number_of_documents: index.number_of_documents(&rtxn)?, - is_indexing: None, - field_distribution: index.field_distribution(&rtxn)?, - }) - }) - .await? - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use super::*; - - #[async_trait::async_trait] - /// Useful for passing around an `Arc` in tests. - impl IndexActorHandle for Arc { - async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - self.as_ref().create_index(uuid, primary_key).await - } - - async fn update( - &self, - uuid: Uuid, - meta: Processing, - data: Option, - ) -> Result> { - self.as_ref().update(uuid, meta, data).await - } - - async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - self.as_ref().search(uuid, query).await - } - - async fn settings(&self, uuid: Uuid) -> Result> { - self.as_ref().settings(uuid).await - } - - async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - self.as_ref() - .documents(uuid, offset, limit, attributes_to_retrieve) - .await - } - - async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - self.as_ref() - .document(uuid, doc_id, attributes_to_retrieve) - .await - } - - async fn delete(&self, uuid: Uuid) -> Result<()> { - self.as_ref().delete(uuid).await - } - - async fn get_index_meta(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_meta(uuid).await - } - - async fn update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - self.as_ref().update_index(uuid, index_settings).await - } - - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - self.as_ref().snapshot(uuid, path).await - } - - async fn dump(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - self.as_ref().dump(uuid, path).await - } - - async fn get_index_stats(&self, uuid: Uuid) -> Result { - self.as_ref().get_index_stats(uuid).await - } - } -} diff --git a/meilisearch-lib/src/index_controller/indexes/store.rs b/meilisearch-lib/src/index_controller/indexes/store.rs deleted file mode 100644 index 336ff6e0a..000000000 --- a/meilisearch-lib/src/index_controller/indexes/store.rs +++ /dev/null @@ -1,113 +0,0 @@ -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use milli::update::UpdateBuilder; -use tokio::fs; -use tokio::sync::RwLock; -use tokio::task::spawn_blocking; -use uuid::Uuid; - -use super::error::{IndexActorError, Result}; -use crate::index::Index; -use crate::index::update_handler::UpdateHandler; -use crate::index_controller::update_file_store::UpdateFileStore; - -type AsyncMap = Arc>>; - -#[async_trait::async_trait] -pub trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>; -} - -pub struct MapIndexStore { - index_store: AsyncMap, - path: PathBuf, - index_size: usize, - update_file_store: Arc, - update_handler: Arc, -} - -impl MapIndexStore { - pub fn new(path: impl AsRef, index_size: usize, update_handler: Arc) -> Self { - let update_file_store = Arc::new(UpdateFileStore::new(path.as_ref()).unwrap()); - let path = path.as_ref().join("indexes/"); - let index_store = Arc::new(RwLock::new(HashMap::new())); - Self { - index_store, - path, - index_size, - update_file_store, - update_handler, - } - } -} - -#[async_trait::async_trait] -impl IndexStore for MapIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { - // We need to keep the lock until we are sure the db file has been opened correclty, to - // ensure that another db is not created at the same time. - let mut lock = self.index_store.write().await; - - if let Some(index) = lock.get(&uuid) { - return Ok(index.clone()); - } - let path = self.path.join(format!("index-{}", uuid)); - if path.exists() { - return Err(IndexActorError::IndexAlreadyExists); - } - - let index_size = self.index_size; - let file_store = self.update_file_store.clone(); - let update_handler = self.update_handler.clone(); - let index = spawn_blocking(move || -> Result { - let index = Index::open(path, index_size, file_store, uuid, update_handler)?; - if let Some(primary_key) = primary_key { - let mut txn = index.write_txn()?; - - let mut builder = UpdateBuilder::new(0).settings(&mut txn, &index); - builder.set_primary_key(primary_key); - builder.execute(|_, _| ())?; - - txn.commit()?; - } - Ok(index) - }) - .await??; - - lock.insert(uuid, index.clone()); - - Ok(index) - } - - async fn get(&self, uuid: Uuid) -> Result> { - let guard = self.index_store.read().await; - match guard.get(&uuid) { - Some(index) => Ok(Some(index.clone())), - None => { - // drop the guard here so we can perform the write after without deadlocking; - drop(guard); - let path = self.path.join(format!("index-{}", uuid)); - if !path.exists() { - return Ok(None); - } - - let index_size = self.index_size; - let file_store = self.update_file_store.clone(); - let index = spawn_blocking(move || Index::open(path, index_size, file_store)).await??; - self.index_store.write().await.insert(uuid, index.clone()); - Ok(Some(index)) - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result> { - let db_path = self.path.join(format!("index-{}", uuid)); - fs::remove_dir_all(db_path).await?; - let index = self.index_store.write().await.remove(&uuid); - Ok(index) - } -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/error.rs b/meilisearch-lib/src/index_controller/uuid_resolver/error.rs deleted file mode 100644 index 8f32fa35d..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/error.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::fmt; - -use meilisearch_error::{Code, ErrorCode}; -use tokio::sync::mpsc::error::SendError as MpscSendError; -use tokio::sync::oneshot::error::RecvError as OneshotRecvError; - -pub type Result = std::result::Result; - -#[derive(Debug, thiserror::Error)] -pub enum UuidResolverError { - #[error("Index already exists.")] - NameAlreadyExist, - #[error("Index \"{0}\" not found.")] - UnexistingIndex(String), - #[error("Index must have a valid uid; Index uid can be of type integer or string only composed of alphanumeric characters, hyphens (-) and underscores (_).")] - BadlyFormatted(String), - #[error("Internal error: {0}")] - Internal(Box), -} - -internal_error!( - UuidResolverError: heed::Error, - uuid::Error, - std::io::Error, - tokio::task::JoinError, - serde_json::Error -); - -impl From> for UuidResolverError { - fn from(other: MpscSendError) -> Self { - Self::Internal(Box::new(other)) - } -} - -impl From for UuidResolverError { - fn from(other: OneshotRecvError) -> Self { - Self::Internal(Box::new(other)) - } -} - -impl ErrorCode for UuidResolverError { - fn error_code(&self) -> Code { - match self { - UuidResolverError::NameAlreadyExist => Code::IndexAlreadyExists, - UuidResolverError::UnexistingIndex(_) => Code::IndexNotFound, - UuidResolverError::BadlyFormatted(_) => Code::InvalidIndexUid, - UuidResolverError::Internal(_) => Code::Internal, - } - } -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/message.rs b/meilisearch-lib/src/index_controller/uuid_resolver/message.rs deleted file mode 100644 index e9da56d5e..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/message.rs +++ /dev/null @@ -1,89 +0,0 @@ -use std::collections::HashSet; -use std::path::PathBuf; - -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -use super::error::Result; - -#[derive(Debug)] -pub enum UuidResolverMsg { - Get { - uid: String, - ret: oneshot::Sender>, - }, - Delete { - uid: String, - ret: oneshot::Sender>, - }, - List { - ret: oneshot::Sender>>, - }, - Insert { - uuid: Uuid, - name: String, - ret: oneshot::Sender>, - }, - SnapshotRequest { - path: PathBuf, - ret: oneshot::Sender>>, - }, - GetSize { - ret: oneshot::Sender>, - }, - DumpRequest { - path: PathBuf, - ret: oneshot::Sender>>, - }, -} - -impl UuidResolverMsg { - pub async fn get(channel: &mpsc::Sender, uid: String) -> Result { - let (ret, recv) = oneshot::channel(); - let msg = Self::Get { uid, ret }; - channel.send(msg).await?; - recv.await? - } - - pub async fn insert(channel: &mpsc::Sender, uuid: Uuid, name: String) -> Result<()> { - let (ret, recv) = oneshot::channel(); - let msg = Self::Insert { name, uuid, ret }; - channel.send(msg).await?; - recv.await? - } - - pub async fn list(channel: &mpsc::Sender) -> Result> { - let (ret, recv) = oneshot::channel(); - let msg = Self::List { ret }; - channel.send(msg).await?; - recv.await? - } - - pub async fn get_size(channel: &mpsc::Sender) -> Result { - let (ret, recv) = oneshot::channel(); - let msg = Self::GetSize { ret }; - channel.send(msg).await?; - recv.await? - } - - pub async fn dump(channel: &mpsc::Sender, path: PathBuf) -> Result> { - let (ret, recv) = oneshot::channel(); - let msg = Self::DumpRequest { ret, path }; - channel.send(msg).await?; - recv.await? - } - - pub async fn snapshot(channel: &mpsc::Sender, path: PathBuf) -> Result> { - let (ret, recv) = oneshot::channel(); - let msg = Self::SnapshotRequest { ret, path }; - channel.send(msg).await?; - recv.await? - } - - pub async fn delete(channel: &mpsc::Sender, uid: String) -> Result { - let (ret, recv) = oneshot::channel(); - let msg = Self::Delete { ret, uid }; - channel.send(msg).await?; - recv.await? - } -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs b/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs deleted file mode 100644 index 7157c1b41..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/mod.rs +++ /dev/null @@ -1,118 +0,0 @@ -pub mod error; -mod message; -pub mod store; - -use std::path::Path; -use std::{collections::HashSet, path::PathBuf}; - -use log::{trace, warn}; -use tokio::sync::mpsc; -use uuid::Uuid; - -pub use self::error::UuidResolverError; -pub use self::message::UuidResolverMsg; -pub use self::store::{HeedUuidStore, UuidStore}; -use self::error::Result; - -pub type UuidResolverSender = mpsc::Sender; - -const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB - -pub fn create_uuid_resolver(path: impl AsRef) -> Result> { - let (sender, reveiver) = mpsc::channel(100); - let store = HeedUuidStore::new(path)?; - let actor = UuidResolver::new(reveiver, store); - tokio::spawn(actor.run()); - Ok(sender) -} - -pub struct UuidResolver { - inbox: mpsc::Receiver, - store: S, -} - -impl UuidResolver { - pub fn new(inbox: mpsc::Receiver, store: S) -> Self { - Self { inbox, store } - } - - pub async fn run(mut self) { - use UuidResolverMsg::*; - - trace!("uuid resolver started"); - - loop { - match self.inbox.recv().await { - Some(Get { uid: name, ret }) => { - let _ = ret.send(self.handle_get(name).await); - } - Some(Delete { uid: name, ret }) => { - let _ = ret.send(self.handle_delete(name).await); - } - Some(List { ret }) => { - let _ = ret.send(self.handle_list().await); - } - Some(Insert { ret, uuid, name }) => { - let _ = ret.send(self.handle_insert(name, uuid).await); - } - Some(SnapshotRequest { path, ret }) => { - let _ = ret.send(self.handle_snapshot(path).await); - } - Some(GetSize { ret }) => { - let _ = ret.send(self.handle_get_size().await); - } - Some(DumpRequest { path, ret }) => { - let _ = ret.send(self.handle_dump(path).await); - } - // all senders have been dropped, need to quit. - None => break, - } - } - - warn!("exiting uuid resolver loop"); - } - - async fn handle_get(&self, uid: String) -> Result { - self.store - .get_uuid(uid.clone()) - .await? - .ok_or(UuidResolverError::UnexistingIndex(uid)) - } - - async fn handle_delete(&self, uid: String) -> Result { - self.store - .delete(uid.clone()) - .await? - .ok_or(UuidResolverError::UnexistingIndex(uid)) - } - - async fn handle_list(&self) -> Result> { - let result = self.store.list().await?; - Ok(result) - } - - async fn handle_snapshot(&self, path: PathBuf) -> Result> { - self.store.snapshot(path).await - } - - async fn handle_dump(&self, path: PathBuf) -> Result> { - self.store.dump(path).await - } - - async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { - if !is_index_uid_valid(&uid) { - return Err(UuidResolverError::BadlyFormatted(uid)); - } - self.store.insert(uid, uuid).await?; - Ok(()) - } - - async fn handle_get_size(&self) -> Result { - self.store.get_size().await - } -} - -fn is_index_uid_valid(uid: &str) -> bool { - uid.chars() - .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') -} diff --git a/meilisearch-lib/src/index_controller/uuid_resolver/store.rs b/meilisearch-lib/src/index_controller/uuid_resolver/store.rs deleted file mode 100644 index 34ba8ced5..000000000 --- a/meilisearch-lib/src/index_controller/uuid_resolver/store.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::collections::HashSet; -use std::fs::{create_dir_all, File}; -use std::io::{BufRead, BufReader, Write}; -use std::path::{Path, PathBuf}; - -use heed::types::{ByteSlice, Str}; -use heed::{CompactionOption, Database, Env, EnvOpenOptions}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use super::UUID_STORE_SIZE; -use super::error::{UuidResolverError, Result}; -use crate::EnvSizer; - -#[derive(Serialize, Deserialize)] -struct DumpEntry { - uuid: Uuid, - uid: String, -} - -const UUIDS_DB_PATH: &str = "index_uuids"; - -#[async_trait::async_trait] -pub trait UuidStore: Sized { - // Create a new entry for `name`. Return an error if `err` and the entry already exists, return - // the uuid otherwise. - async fn get_uuid(&self, uid: String) -> Result>; - async fn delete(&self, uid: String) -> Result>; - async fn list(&self) -> Result>; - async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, path: PathBuf) -> Result>; - async fn get_size(&self) -> Result; - async fn dump(&self, path: PathBuf) -> Result>; -} - -#[derive(Clone)] -pub struct HeedUuidStore { - env: Env, - db: Database, -} - -impl HeedUuidStore { - pub fn new(path: impl AsRef) -> Result { - let path = path.as_ref().join(UUIDS_DB_PATH); - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(UUID_STORE_SIZE); // 1GB - let env = options.open(path)?; - let db = env.create_database(None)?; - Ok(Self { env, db }) - } - - pub fn get_uuid(&self, name: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - let txn = env.read_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - Ok(Some(uuid)) - } - None => Ok(None), - } - } - - pub fn delete(&self, uid: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - let mut txn = env.write_txn()?; - match db.get(&txn, &uid)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - db.delete(&mut txn, &uid)?; - txn.commit()?; - Ok(Some(uuid)) - } - None => Ok(None), - } - } - - pub fn list(&self) -> Result> { - let env = self.env.clone(); - let db = self.db; - let txn = env.read_txn()?; - let mut entries = Vec::new(); - for entry in db.iter(&txn)? { - let (name, uuid) = entry?; - let uuid = Uuid::from_slice(uuid)?; - entries.push((name.to_owned(), uuid)) - } - Ok(entries) - } - - pub fn insert(&self, name: String, uuid: Uuid) -> Result<()> { - let env = self.env.clone(); - let db = self.db; - let mut txn = env.write_txn()?; - - if db.get(&txn, &name)?.is_some() { - return Err(UuidResolverError::NameAlreadyExist); - } - - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(()) - } - - pub fn snapshot(&self, mut path: PathBuf) -> Result> { - let env = self.env.clone(); - let db = self.db; - // Write transaction to acquire a lock on the database. - let txn = env.write_txn()?; - let mut entries = HashSet::new(); - for entry in db.iter(&txn)? { - let (_, uuid) = entry?; - let uuid = Uuid::from_slice(uuid)?; - entries.insert(uuid); - } - - // only perform snapshot if there are indexes - if !entries.is_empty() { - path.push(UUIDS_DB_PATH); - create_dir_all(&path).unwrap(); - path.push("data.mdb"); - env.copy_to_path(path, CompactionOption::Enabled)?; - } - Ok(entries) - } - - pub fn get_size(&self) -> Result { - Ok(self.env.size()) - } - - pub fn dump(&self, path: PathBuf) -> Result> { - let dump_path = path.join(UUIDS_DB_PATH); - create_dir_all(&dump_path)?; - let dump_file_path = dump_path.join("data.jsonl"); - let mut dump_file = File::create(&dump_file_path)?; - let mut uuids = HashSet::new(); - - let txn = self.env.read_txn()?; - for entry in self.db.iter(&txn)? { - let (uid, uuid) = entry?; - let uid = uid.to_string(); - let uuid = Uuid::from_slice(uuid)?; - - let entry = DumpEntry { uuid, uid }; - serde_json::to_writer(&mut dump_file, &entry)?; - dump_file.write_all(b"\n").unwrap(); - - uuids.insert(uuid); - } - - Ok(uuids) - } - - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> Result<()> { - let uuid_resolver_path = dst.as_ref().join(UUIDS_DB_PATH); - std::fs::create_dir_all(&uuid_resolver_path)?; - - let src_indexes = src.as_ref().join(UUIDS_DB_PATH).join("data.jsonl"); - let indexes = File::open(&src_indexes)?; - let mut indexes = BufReader::new(indexes); - let mut line = String::new(); - - let db = Self::new(dst)?; - let mut txn = db.env.write_txn()?; - - loop { - match indexes.read_line(&mut line) { - Ok(0) => break, - Ok(_) => { - let DumpEntry { uuid, uid } = serde_json::from_str(&line)?; - println!("importing {} {}", uid, uuid); - db.db.put(&mut txn, &uid, uuid.as_bytes())?; - } - Err(e) => return Err(e.into()), - } - - line.clear(); - } - txn.commit()?; - - db.env.prepare_for_closing().wait(); - - Ok(()) - } -} - -#[async_trait::async_trait] -impl UuidStore for HeedUuidStore { - async fn get_uuid(&self, name: String) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.get_uuid(name)).await? - } - - async fn delete(&self, uid: String) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.delete(uid)).await? - } - - async fn list(&self) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.list()).await? - } - - async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.insert(name, uuid)).await? - } - - async fn snapshot(&self, path: PathBuf) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.snapshot(path)).await? - } - - async fn get_size(&self) -> Result { - self.get_size() - } - - async fn dump(&self, path: PathBuf) -> Result> { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.dump(path)).await? - } -}