diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs deleted file mode 100644 index 9c4feef8a..000000000 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ /dev/null @@ -1,657 +0,0 @@ -use std::collections::HashMap; -use std::fs::{create_dir_all, File}; -use std::future::Future; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use async_stream::stream; -use chrono::{DateTime, Utc}; -use futures::pin_mut; -use futures::stream::StreamExt; -use heed::{CompactionOption, EnvOpenOptions}; -use log::debug; -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tokio::fs::remove_dir_all; -use tokio::sync::{mpsc, oneshot, RwLock}; -use tokio::task::spawn_blocking; -use uuid::Uuid; - -use super::update_handler::UpdateHandler; -use super::{get_arc_ownership_blocking, IndexSettings}; -use crate::index::UpdateResult as UResult; -use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; -use crate::index_controller::{ - updates::{Failed, Processed, Processing}, - UpdateMeta, -}; -use crate::option::IndexerOpts; - -pub type Result = std::result::Result; -type AsyncMap = Arc>>; -type UpdateResult = std::result::Result, Failed>; - -#[derive(Debug, Serialize, Deserialize, Clone)] -#[serde(rename_all = "camelCase")] -pub struct IndexMeta { - created_at: DateTime, - updated_at: DateTime, - primary_key: Option, -} - -impl IndexMeta { - fn new(index: &Index) -> Result { - let txn = index.read_txn()?; - Self::new_txn(index, &txn) - } - - fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result { - let created_at = index.created_at(&txn)?; - let updated_at = index.updated_at(&txn)?; - let primary_key = index.primary_key(&txn)?.map(String::from); - Ok(Self { - primary_key, - updated_at, - created_at, - }) - } -} - -enum IndexMsg { - CreateIndex { - uuid: Uuid, - primary_key: Option, - ret: oneshot::Sender>, - }, - Update { - meta: Processing, - data: std::fs::File, - ret: oneshot::Sender>, - }, - Search { - uuid: Uuid, - query: SearchQuery, - ret: oneshot::Sender>, - }, - Settings { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Documents { - uuid: Uuid, - attributes_to_retrieve: Option>, - offset: usize, - limit: usize, - ret: oneshot::Sender>>, - }, - Document { - uuid: Uuid, - attributes_to_retrieve: Option>, - doc_id: String, - ret: oneshot::Sender>, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - GetMeta { - uuid: Uuid, - ret: oneshot::Sender>, - }, - UpdateIndex { - uuid: Uuid, - index_settings: IndexSettings, - ret: oneshot::Sender>, - }, - Snapshot { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, -} - -struct IndexActor { - read_receiver: Option>, - write_receiver: Option>, - update_handler: Arc, - store: S, -} - -#[derive(Error, Debug)] -pub enum IndexError { - #[error("error with index: {0}")] - Error(#[from] anyhow::Error), - #[error("index already exists")] - IndexAlreadyExists, - #[error("Index doesn't exists")] - UnexistingIndex, - #[error("Heed error: {0}")] - HeedError(#[from] heed::Error), - #[error("Existing primary key")] - ExistingPrimaryKey, -} - -#[async_trait::async_trait] -trait IndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result; - async fn get(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>; -} - -impl IndexActor { - fn new( - read_receiver: mpsc::Receiver, - write_receiver: mpsc::Receiver, - store: S, - ) -> Result { - let options = IndexerOpts::default(); - let update_handler = 
UpdateHandler::new(&options).map_err(IndexError::Error)?; - let update_handler = Arc::new(update_handler); - let read_receiver = Some(read_receiver); - let write_receiver = Some(write_receiver); - Ok(Self { - read_receiver, - write_receiver, - store, - update_handler, - }) - } - - /// `run` poll the write_receiver and read_receiver concurrently, but while messages send - /// through the read channel are processed concurrently, the messages sent through the write - /// channel are processed one at a time. - async fn run(mut self) { - let mut read_receiver = self - .read_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let read_stream = stream! { - loop { - match read_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - let mut write_receiver = self - .write_receiver - .take() - .expect("Index Actor must have a inbox at this point."); - - let write_stream = stream! { - loop { - match write_receiver.recv().await { - Some(msg) => yield msg, - None => break, - } - } - }; - - pin_mut!(write_stream); - pin_mut!(read_stream); - - let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); - let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); - - let fut1: Box + Unpin + Send> = Box::new(fut1); - let fut2: Box + Unpin + Send> = Box::new(fut2); - - tokio::join!(fut1, fut2); - } - - async fn handle_message(&self, msg: IndexMsg) { - use IndexMsg::*; - match msg { - CreateIndex { - uuid, - primary_key, - ret, - } => { - let _ = ret.send(self.handle_create_index(uuid, primary_key).await); - } - Update { ret, meta, data } => { - let _ = ret.send(self.handle_update(meta, data).await); - } - Search { ret, query, uuid } => { - let _ = ret.send(self.handle_search(uuid, query).await); - } - Settings { ret, uuid } => { - let _ = ret.send(self.handle_settings(uuid).await); - } - Documents { - ret, - uuid, - attributes_to_retrieve, - offset, - limit, - } => { - let _ = ret.send( - self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) - .await, - ); - } - Document { - uuid, - attributes_to_retrieve, - doc_id, - ret, - } => { - let _ = ret.send( - self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) - .await, - ); - } - Delete { uuid, ret } => { - let _ = ret.send(self.handle_delete(uuid).await); - } - GetMeta { uuid, ret } => { - let _ = ret.send(self.handle_get_meta(uuid).await); - } - UpdateIndex { - uuid, - index_settings, - ret, - } => { - let _ = ret.send(self.handle_update_index(uuid, index_settings).await); - } - Snapshot { uuid, path, ret } => { - let _ = ret.send(self.handle_snapshot(uuid, path).await); - } - } - } - - async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.perform_search(query)).await? - } - - async fn handle_create_index( - &self, - uuid: Uuid, - primary_key: Option, - ) -> Result { - let index = self.store.create(uuid, primary_key).await?; - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - Ok(meta) - } - - async fn handle_update( - &self, - meta: Processing, - data: File, - ) -> Result { - log::info!("Processing update {}", meta.id()); - let uuid = meta.index_uuid(); - let update_handler = self.update_handler.clone(); - let index = match self.store.get(*uuid).await? 
{ - Some(index) => index, - None => self.store.create(*uuid, None).await?, - }; - spawn_blocking(move || update_handler.handle_update(meta, data, index)) - .await - .map_err(|e| IndexError::Error(e.into())) - } - - async fn handle_settings(&self, uuid: Uuid) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || index.settings().map_err(IndexError::Error)) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_fetch_documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_documents(offset, limit, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_fetch_document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - spawn_blocking(move || { - index - .retrieve_document(doc_id, attributes_to_retrieve) - .map_err(IndexError::Error) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let index = self.store.delete(uuid).await?; - - if let Some(index) = index { - tokio::task::spawn(async move { - let index = index.0; - let store = get_arc_ownership_blocking(index).await; - spawn_blocking(move || { - store.prepare_for_closing().wait(); - debug!("Index closed"); - }); - }); - } - - Ok(()) - } - - async fn handle_get_meta(&self, uuid: Uuid) -> Result { - match self.store.get(uuid).await? { - Some(index) => { - let meta = spawn_blocking(move || IndexMeta::new(&index)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - Ok(meta) - } - None => Err(IndexError::UnexistingIndex), - } - } - - async fn handle_update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let index = self - .store - .get(uuid) - .await? - .ok_or(IndexError::UnexistingIndex)?; - - spawn_blocking(move || match index_settings.primary_key { - Some(ref primary_key) => { - let mut txn = index.write_txn()?; - if index.primary_key(&txn)?.is_some() { - return Err(IndexError::ExistingPrimaryKey); - } - index.put_primary_key(&mut txn, primary_key)?; - let meta = IndexMeta::new_txn(&index, &txn)?; - txn.commit()?; - Ok(meta) - } - None => { - let meta = IndexMeta::new(&index)?; - Ok(meta) - } - }) - .await - .map_err(|e| IndexError::Error(e.into()))? - } - - async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { - use tokio::fs::create_dir_all; - - path.push("indexes"); - create_dir_all(&path) - .await - .map_err(|e| IndexError::Error(e.into()))?; - - if let Some(index) = self.store.get(uuid).await? { - let mut index_path = path.join(format!("index-{}", uuid)); - create_dir_all(&index_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; - index_path.push("data.mdb"); - spawn_blocking(move || -> anyhow::Result<()> { - // Get write txn to wait for ongoing write transaction before snapshot. - let _txn = index.write_txn()?; - index - .env - .copy_to_path(index_path, CompactionOption::Enabled)?; - Ok(()) - }) - .await - .map_err(|e| IndexError::Error(e.into()))? 
- .map_err(|e| IndexError::Error(e.into()))?; - } - - Ok(()) - } -} - -#[derive(Clone)] -pub struct IndexActorHandle { - read_sender: mpsc::Sender, - write_sender: mpsc::Sender, -} - -impl IndexActorHandle { - pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { - let (read_sender, read_receiver) = mpsc::channel(100); - let (write_sender, write_receiver) = mpsc::channel(100); - - let store = HeedIndexStore::new(path, index_size); - let actor = IndexActor::new(read_receiver, write_receiver, store)?; - tokio::task::spawn(actor.run()); - Ok(Self { - read_sender, - write_sender, - }) - } - - pub async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::CreateIndex { - ret, - uuid, - primary_key, - }; - let _ = self.read_sender.send(msg).await; - receiver.await.expect("IndexActor has been killed") - } - - pub async fn update( - &self, - meta: Processing, - data: std::fs::File, - ) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { ret, meta, data }; - let _ = self.write_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Search { uuid, query, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn settings(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Settings { uuid, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn documents( - &self, - uuid: Uuid, - offset: usize, - limit: usize, - attributes_to_retrieve: Option>, - ) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Documents { - uuid, - ret, - offset, - attributes_to_retrieve, - limit, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn document( - &self, - uuid: Uuid, - doc_id: String, - attributes_to_retrieve: Option>, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Document { - uuid, - ret, - doc_id, - attributes_to_retrieve, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Delete { uuid, ret }; - let _ = self.write_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn get_index_meta(&self, uuid: Uuid) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::GetMeta { uuid, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } - - pub async fn update_index( - &self, - uuid: Uuid, - index_settings: IndexSettings, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::UpdateIndex { - uuid, - index_settings, - ret, - }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) 
- } - - pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Snapshot { uuid, path, ret }; - let _ = self.read_sender.send(msg).await; - Ok(receiver.await.expect("IndexActor has been killed")?) - } -} - -struct HeedIndexStore { - index_store: AsyncMap, - path: PathBuf, - index_size: usize, -} - -impl HeedIndexStore { - fn new(path: impl AsRef, index_size: usize) -> Self { - let path = path.as_ref().join("indexes/"); - let index_store = Arc::new(RwLock::new(HashMap::new())); - Self { - index_store, - path, - index_size, - } - } -} - -#[async_trait::async_trait] -impl IndexStore for HeedIndexStore { - async fn create(&self, uuid: Uuid, primary_key: Option) -> Result { - let path = self.path.join(format!("index-{}", uuid)); - if path.exists() { - return Err(IndexError::IndexAlreadyExists); - } - - let index_size = self.index_size; - let index = spawn_blocking(move || -> Result { - let index = open_index(&path, index_size)?; - if let Some(primary_key) = primary_key { - let mut txn = index.write_txn()?; - index.put_primary_key(&mut txn, &primary_key)?; - txn.commit()?; - } - Ok(index) - }) - .await - .map_err(|e| IndexError::Error(e.into()))??; - - self.index_store.write().await.insert(uuid, index.clone()); - - Ok(index) - } - - async fn get(&self, uuid: Uuid) -> Result> { - let guard = self.index_store.read().await; - match guard.get(&uuid) { - Some(index) => Ok(Some(index.clone())), - None => { - // drop the guard here so we can perform the write after without deadlocking; - drop(guard); - let path = self.path.join(format!("index-{}", uuid)); - if !path.exists() { - return Ok(None); - } - - let index_size = self.index_size; - let index = spawn_blocking(move || open_index(path, index_size)) - .await - .map_err(|e| IndexError::Error(e.into()))??; - self.index_store.write().await.insert(uuid, index.clone()); - Ok(Some(index)) - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result> { - let db_path = self.path.join(format!("index-{}", uuid)); - remove_dir_all(db_path) - .await - .map_err(|e| IndexError::Error(e.into()))?; - let index = self.index_store.write().await.remove(&uuid); - Ok(index) - } -} - -fn open_index(path: impl AsRef, size: usize) -> Result { - create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; - let mut options = EnvOpenOptions::new(); - options.map_size(size); - let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; - Ok(Index(Arc::new(index))) -} diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs new file mode 100644 index 000000000..89831b53a --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -0,0 +1,331 @@ +use std::fs::File; +use std::future::Future; +use std::path::PathBuf; +use std::sync::Arc; + +use async_stream::stream; +use futures::pin_mut; +use futures::stream::StreamExt; +use heed::CompactionOption; +use log::debug; +use tokio::sync::mpsc; +use tokio::task::spawn_blocking; +use uuid::Uuid; + +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::update_handler::UpdateHandler; +use crate::index_controller::{updates::Processing, UpdateMeta, get_arc_ownership_blocking}; +use crate::option::IndexerOpts; +use super::{IndexSettings, Result, IndexMsg, IndexStore, IndexError, UpdateResult, IndexMeta}; + +pub struct IndexActor { + read_receiver: Option>, + write_receiver: Option>, + update_handler: 
Arc, + store: S, +} + +impl IndexActor { + pub fn new( + read_receiver: mpsc::Receiver, + write_receiver: mpsc::Receiver, + store: S, + ) -> Result { + let options = IndexerOpts::default(); + let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; + let update_handler = Arc::new(update_handler); + let read_receiver = Some(read_receiver); + let write_receiver = Some(write_receiver); + Ok(Self { + read_receiver, + write_receiver, + store, + update_handler, + }) + } + + /// `run` poll the write_receiver and read_receiver concurrently, but while messages send + /// through the read channel are processed concurrently, the messages sent through the write + /// channel are processed one at a time. + pub async fn run(mut self) { + let mut read_receiver = self + .read_receiver + .take() + .expect("Index Actor must have a inbox at this point."); + + let read_stream = stream! { + loop { + match read_receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + let mut write_receiver = self + .write_receiver + .take() + .expect("Index Actor must have a inbox at this point."); + + let write_stream = stream! { + loop { + match write_receiver.recv().await { + Some(msg) => yield msg, + None => break, + } + } + }; + + pin_mut!(write_stream); + pin_mut!(read_stream); + + let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)); + let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg)); + + let fut1: Box + Unpin + Send> = Box::new(fut1); + let fut2: Box + Unpin + Send> = Box::new(fut2); + + tokio::join!(fut1, fut2); + } + + async fn handle_message(&self, msg: IndexMsg) { + use IndexMsg::*; + match msg { + CreateIndex { + uuid, + primary_key, + ret, + } => { + let _ = ret.send(self.handle_create_index(uuid, primary_key).await); + } + Update { ret, meta, data } => { + let _ = ret.send(self.handle_update(meta, data).await); + } + Search { ret, query, uuid } => { + let _ = ret.send(self.handle_search(uuid, query).await); + } + Settings { ret, uuid } => { + let _ = ret.send(self.handle_settings(uuid).await); + } + Documents { + ret, + uuid, + attributes_to_retrieve, + offset, + limit, + } => { + let _ = ret.send( + self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve) + .await, + ); + } + Document { + uuid, + attributes_to_retrieve, + doc_id, + ret, + } => { + let _ = ret.send( + self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve) + .await, + ); + } + Delete { uuid, ret } => { + let _ = ret.send(self.handle_delete(uuid).await); + } + GetMeta { uuid, ret } => { + let _ = ret.send(self.handle_get_meta(uuid).await); + } + UpdateIndex { + uuid, + index_settings, + ret, + } => { + let _ = ret.send(self.handle_update_index(uuid, index_settings).await); + } + Snapshot { uuid, path, ret } => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); + } + } + } + + async fn handle_search(&self, uuid: Uuid, query: SearchQuery) -> anyhow::Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || index.perform_search(query)).await? 
+ } + + async fn handle_create_index( + &self, + uuid: Uuid, + primary_key: Option, + ) -> Result { + let index = self.store.create(uuid, primary_key).await?; + let meta = spawn_blocking(move || IndexMeta::new(&index)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + Ok(meta) + } + + async fn handle_update( + &self, + meta: Processing, + data: File, + ) -> Result { + debug!("Processing update {}", meta.id()); + let uuid = meta.index_uuid(); + let update_handler = self.update_handler.clone(); + let index = match self.store.get(*uuid).await? { + Some(index) => index, + None => self.store.create(*uuid, None).await?, + }; + spawn_blocking(move || update_handler.handle_update(meta, data, index)) + .await + .map_err(|e| IndexError::Error(e.into())) + } + + async fn handle_settings(&self, uuid: Uuid) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || index.settings().map_err(IndexError::Error)) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_fetch_documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || { + index + .retrieve_documents(offset, limit, attributes_to_retrieve) + .map_err(IndexError::Error) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_fetch_document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + spawn_blocking(move || { + index + .retrieve_document(doc_id, attributes_to_retrieve) + .map_err(IndexError::Error) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let index = self.store.delete(uuid).await?; + + if let Some(index) = index { + tokio::task::spawn(async move { + let index = index.0; + let store = get_arc_ownership_blocking(index).await; + spawn_blocking(move || { + store.prepare_for_closing().wait(); + debug!("Index closed"); + }); + }); + } + + Ok(()) + } + + async fn handle_get_meta(&self, uuid: Uuid) -> Result { + match self.store.get(uuid).await? { + Some(index) => { + let meta = spawn_blocking(move || IndexMeta::new(&index)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + Ok(meta) + } + None => Err(IndexError::UnexistingIndex), + } + } + + async fn handle_update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> Result { + let index = self + .store + .get(uuid) + .await? + .ok_or(IndexError::UnexistingIndex)?; + + spawn_blocking(move || match index_settings.primary_key { + Some(ref primary_key) => { + let mut txn = index.write_txn()?; + if index.primary_key(&txn)?.is_some() { + return Err(IndexError::ExistingPrimaryKey); + } + index.put_primary_key(&mut txn, primary_key)?; + let meta = IndexMeta::new_txn(&index, &txn)?; + txn.commit()?; + Ok(meta) + } + None => { + let meta = IndexMeta::new(&index)?; + Ok(meta) + } + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + } + + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { + use tokio::fs::create_dir_all; + + path.push("indexes"); + create_dir_all(&path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + + if let Some(index) = self.store.get(uuid).await? 
{ + let mut index_path = path.join(format!("index-{}", uuid)); + create_dir_all(&index_path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + index_path.push("data.mdb"); + spawn_blocking(move || -> anyhow::Result<()> { + // Get write txn to wait for ongoing write transaction before snapshot. + let _txn = index.write_txn()?; + index + .env + .copy_to_path(index_path, CompactionOption::Enabled)?; + Ok(()) + }) + .await + .map_err(|e| IndexError::Error(e.into()))? + .map_err(|e| IndexError::Error(e.into()))?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs new file mode 100644 index 000000000..9c43bd6e7 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -0,0 +1,141 @@ +use std::path::{PathBuf, Path}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::IndexSettings; +use crate::index_controller::{updates::Processing, UpdateMeta}; +use super::{IndexActorHandle, IndexMsg, IndexMeta, UpdateResult, Result, IndexActor, MapIndexStore}; + +#[derive(Clone)] +pub struct IndexActorHandleImpl { + read_sender: mpsc::Sender, + write_sender: mpsc::Sender, +} + +#[async_trait::async_trait] +impl IndexActorHandle for IndexActorHandleImpl { + async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::CreateIndex { + ret, + uuid, + primary_key, + }; + let _ = self.read_sender.send(msg).await; + receiver.await.expect("IndexActor has been killed") + } + + async fn update( + &self, + meta: Processing, + data: std::fs::File, + ) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Update { ret, meta, data }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Search { uuid, query, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn settings(&self, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Settings { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn documents( + &self, + uuid: Uuid, + offset: usize, + limit: usize, + attributes_to_retrieve: Option>, + ) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Documents { + uuid, + ret, + offset, + attributes_to_retrieve, + limit, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn document( + &self, + uuid: Uuid, + doc_id: String, + attributes_to_retrieve: Option>, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Document { + uuid, + ret, + doc_id, + attributes_to_retrieve, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Delete { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) 
+ } + + async fn get_index_meta(&self, uuid: Uuid) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::GetMeta { uuid, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn update_index( + &self, + uuid: Uuid, + index_settings: IndexSettings, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::UpdateIndex { + uuid, + index_settings, + ret, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Snapshot { uuid, path, ret }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } +} + +impl IndexActorHandleImpl { + pub fn new(path: impl AsRef, index_size: usize) -> anyhow::Result { + let (read_sender, read_receiver) = mpsc::channel(100); + let (write_sender, write_receiver) = mpsc::channel(100); + + let store = MapIndexStore::new(path, index_size); + let actor = IndexActor::new(read_receiver, write_receiver, store)?; + tokio::task::spawn(actor.run()); + Ok(Self { + read_sender, + write_sender, + }) + } +} diff --git a/meilisearch-http/src/index_controller/index_actor/message.rs b/meilisearch-http/src/index_controller/index_actor/message.rs new file mode 100644 index 000000000..66edb5b77 --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/message.rs @@ -0,0 +1,64 @@ +use std::path::PathBuf; + +use tokio::sync::oneshot; +use uuid::Uuid; + +use crate::index::{Document, SearchQuery, SearchResult, Settings}; +use crate::index_controller::{ + updates::Processing, + UpdateMeta, +}; +use super::{IndexSettings, IndexMeta, UpdateResult, Result}; + +pub enum IndexMsg { + CreateIndex { + uuid: Uuid, + primary_key: Option, + ret: oneshot::Sender>, + }, + Update { + meta: Processing, + data: std::fs::File, + ret: oneshot::Sender>, + }, + Search { + uuid: Uuid, + query: SearchQuery, + ret: oneshot::Sender>, + }, + Settings { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Documents { + uuid: Uuid, + attributes_to_retrieve: Option>, + offset: usize, + limit: usize, + ret: oneshot::Sender>>, + }, + Document { + uuid: Uuid, + attributes_to_retrieve: Option>, + doc_id: String, + ret: oneshot::Sender>, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + GetMeta { + uuid: Uuid, + ret: oneshot::Sender>, + }, + UpdateIndex { + uuid: Uuid, + index_settings: IndexSettings, + ret: oneshot::Sender>, + }, + Snapshot { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, +} diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs new file mode 100644 index 000000000..dac7ef06c --- /dev/null +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -0,0 +1,103 @@ +mod actor; +mod handle_impl; +mod message; +mod store; + +use std::path::PathBuf; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use uuid::Uuid; + +use super::IndexSettings; +use crate::index::UpdateResult as UResult; +use crate::index::{Document, Index, SearchQuery, SearchResult, Settings}; +use crate::index_controller::{ + updates::{Failed, Processed, Processing}, + UpdateMeta, +}; +use message::IndexMsg; +use store::{IndexStore, MapIndexStore}; +use actor::IndexActor; + +pub use handle_impl::IndexActorHandleImpl; + +pub type Result 
<T> = std::result::Result<T, IndexError>;
+type UpdateResult = std::result::Result<Processed<UpdateMeta>, Failed<UpdateMeta>>;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexMeta {
+    created_at: DateTime<Utc>,
+    updated_at: DateTime<Utc>,
+    primary_key: Option<String>,
+}
+
+impl IndexMeta {
+    fn new(index: &Index) -> Result<Self> {
+        let txn = index.read_txn()?;
+        Self::new_txn(index, &txn)
+    }
+
+    fn new_txn(index: &Index, txn: &heed::RoTxn) -> Result<Self> {
+        let created_at = index.created_at(&txn)?;
+        let updated_at = index.updated_at(&txn)?;
+        let primary_key = index.primary_key(&txn)?.map(String::from);
+        Ok(Self {
+            primary_key,
+            updated_at,
+            created_at,
+        })
+    }
+}
+
+#[derive(Error, Debug)]
+pub enum IndexError {
+    #[error("error with index: {0}")]
+    Error(#[from] anyhow::Error),
+    #[error("index already exists")]
+    IndexAlreadyExists,
+    #[error("Index doesn't exists")]
+    UnexistingIndex,
+    #[error("Heed error: {0}")]
+    HeedError(#[from] heed::Error),
+    #[error("Existing primary key")]
+    ExistingPrimaryKey,
+}
+
+
+#[async_trait::async_trait]
+pub trait IndexActorHandle: Sync + Send + Clone {
+    async fn create_index(&self, uuid: Uuid, primary_key: Option<String>) -> Result<IndexMeta>;
+    async fn update(
+        &self,
+        meta: Processing<UpdateMeta>,
+        data: std::fs::File,
+    ) -> anyhow::Result<UpdateResult>;
+    async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult>;
+    async fn settings(&self, uuid: Uuid) -> Result<Settings>;
+
+    async fn documents(
+        &self,
+        uuid: Uuid,
+        offset: usize,
+        limit: usize,
+        attributes_to_retrieve: Option<Vec<String>>,
+    ) -> Result<Vec<Document>>;
+    async fn document(
+        &self,
+        uuid: Uuid,
+        doc_id: String,
+        attributes_to_retrieve: Option<Vec<String>>,
+    ) -> Result<Document>;
+    async fn delete(&self, uuid: Uuid) -> Result<()>;
+    async fn get_index_meta(&self, uuid: Uuid) -> Result<IndexMeta>;
+    async fn update_index(
+        &self,
+        uuid: Uuid,
+        index_settings: IndexSettings,
+    ) -> Result<IndexMeta>;
+    async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>;
+}
+
diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs
new file mode 100644
index 000000000..a9f3cd479
--- /dev/null
+++ b/meilisearch-http/src/index_controller/index_actor/store.rs
@@ -0,0 +1,105 @@
+use std::path::{PathBuf, Path};
+use std::sync::Arc;
+use std::collections::HashMap;
+
+use uuid::Uuid;
+use tokio::sync::RwLock;
+use tokio::task::spawn_blocking;
+use tokio::fs;
+use heed::EnvOpenOptions;
+
+use super::{IndexError, Result};
+use crate::index::Index;
+
+type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
+
+#[async_trait::async_trait]
+pub trait IndexStore {
+    async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index>;
+    async fn get(&self, uuid: Uuid) -> Result<Option<Index>>;
+    async fn delete(&self, uuid: Uuid) -> Result<Option<Index>>;
+}
+
+pub struct MapIndexStore {
+    index_store: AsyncMap<Uuid, Index>,
+    path: PathBuf,
+    index_size: usize,
+}
+
+impl MapIndexStore {
+    pub fn new(path: impl AsRef<Path>, index_size: usize) -> Self {
+        let path = path.as_ref().join("indexes/");
+        let index_store = Arc::new(RwLock::new(HashMap::new()));
+        Self {
+            index_store,
+            path,
+            index_size,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+impl IndexStore for MapIndexStore {
+    async fn create(&self, uuid: Uuid, primary_key: Option<String>) -> Result<Index> {
+        let path = self.path.join(format!("index-{}", uuid));
+        if path.exists() {
+            return Err(IndexError::IndexAlreadyExists);
+        }
+
+        let index_size = self.index_size;
+        let index = spawn_blocking(move || -> Result<Index> {
+            let index = open_index(&path, index_size)?;
+            if let Some(primary_key) = primary_key {
+                let mut txn = index.write_txn()?;
+                index.put_primary_key(&mut txn,
&primary_key)?; + txn.commit()?; + } + Ok(index) + }) + .await + .map_err(|e| IndexError::Error(e.into()))??; + + self.index_store.write().await.insert(uuid, index.clone()); + + Ok(index) + } + + async fn get(&self, uuid: Uuid) -> Result> { + let guard = self.index_store.read().await; + match guard.get(&uuid) { + Some(index) => Ok(Some(index.clone())), + None => { + // drop the guard here so we can perform the write after without deadlocking; + drop(guard); + let path = self.path.join(format!("index-{}", uuid)); + if !path.exists() { + return Ok(None); + } + + let index_size = self.index_size; + let index = spawn_blocking(move || open_index(path, index_size)) + .await + .map_err(|e| IndexError::Error(e.into()))??; + self.index_store.write().await.insert(uuid, index.clone()); + Ok(Some(index)) + } + } + } + + async fn delete(&self, uuid: Uuid) -> Result> { + let db_path = self.path.join(format!("index-{}", uuid)); + fs::remove_dir_all(db_path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + let index = self.index_store.write().await.remove(&uuid); + Ok(index) + } +} + +fn open_index(path: impl AsRef, size: usize) -> Result { + std::fs::create_dir_all(&path).map_err(|e| IndexError::Error(e.into()))?; + let mut options = EnvOpenOptions::new(); + options.map_size(size); + let index = milli::Index::new(options, &path).map_err(IndexError::Error)?; + Ok(Index(Arc::new(index))) +} diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 9fab9088a..bcd0cd8fe 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -2,7 +2,6 @@ mod index_actor; mod snapshot; mod update_actor; mod update_handler; -mod update_store; mod updates; mod uuid_resolver; @@ -22,6 +21,9 @@ use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Facets, Settings, UpdateResult}; use crate::option::Opt; use crate::helpers::compression; +use index_actor::IndexActorHandle; +use update_actor::UpdateActorHandle; +use uuid_resolver::UuidResolverHandle; use snapshot::SnapshotService; pub use updates::{Failed, Processed, Processing}; @@ -58,9 +60,9 @@ pub struct IndexSettings { } pub struct IndexController { - uuid_resolver: uuid_resolver::UuidResolverHandle, - index_handle: index_actor::IndexActorHandle, - update_handle: update_actor::UpdateActorHandle, + uuid_resolver: uuid_resolver::UuidResolverHandleImpl, + index_handle: index_actor::IndexActorHandleImpl, + update_handle: update_actor::UpdateActorHandleImpl, } impl IndexController { @@ -72,9 +74,9 @@ impl IndexController { compression::from_tar_gz(path, &options.db_path)?; } - let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; - let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?; - let update_handle = update_actor::UpdateActorHandle::new( + let uuid_resolver = uuid_resolver::UuidResolverHandleImpl::new(&path)?; + let index_handle = index_actor::IndexActorHandleImpl::new(&path, index_size)?; + let update_handle = update_actor::UpdateActorHandleImpl::new( index_handle.clone(), &path, update_store_size, @@ -82,7 +84,6 @@ impl IndexController { if options.schedule_snapshot { let snapshot_service = SnapshotService::new( - index_handle.clone(), uuid_resolver.clone(), update_handle.clone(), Duration::from_secs(options.snapshot_interval_sec), diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index b3a96baa7..afdcdaf23 100644 --- 
a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -8,29 +8,29 @@ use tokio::task::spawn_blocking; use tokio::time::sleep; use crate::helpers::compression; -use super::index_actor::IndexActorHandle; use super::update_actor::UpdateActorHandle; use super::uuid_resolver::UuidResolverHandle; #[allow(dead_code)] -pub struct SnapshotService { - index_handle: IndexActorHandle, - uuid_resolver_handle: UuidResolverHandle, - update_handle: UpdateActorHandle, +pub struct SnapshotService { + uuid_resolver_handle: R, + update_handle: U, snapshot_period: Duration, snapshot_path: PathBuf, } -impl SnapshotService { +impl SnapshotService +where + U: UpdateActorHandle, + R: UuidResolverHandle +{ pub fn new( - index_handle: IndexActorHandle, - uuid_resolver_handle: UuidResolverHandle, - update_handle: UpdateActorHandle, + uuid_resolver_handle: R, + update_handle: U, snapshot_period: Duration, snapshot_path: PathBuf, ) -> Self { Self { - index_handle, uuid_resolver_handle, update_handle, snapshot_period, diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs deleted file mode 100644 index 905bc805f..000000000 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ /dev/null @@ -1,455 +0,0 @@ -use std::collections::{hash_map::Entry, HashMap}; -use std::fs; -use std::io::SeekFrom; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use super::index_actor::IndexActorHandle; -use log::info; -use oxidized_json_checker::JsonChecker; -use thiserror::Error; -use tokio::fs::OpenOptions; -use tokio::io::{AsyncSeekExt, AsyncWriteExt}; -use tokio::sync::{mpsc, oneshot, RwLock}; -use uuid::Uuid; - -use super::get_arc_ownership_blocking; -use crate::index::UpdateResult; -use crate::index_controller::{UpdateMeta, UpdateStatus}; - -pub type Result = std::result::Result; -type UpdateStore = super::update_store::UpdateStore; -type PayloadData = std::result::Result>; - -#[derive(Debug, Error)] -pub enum UpdateError { - #[error("error with update: {0}")] - Error(Box), - #[error("Index {0} doesn't exist.")] - UnexistingIndex(Uuid), - #[error("Update {0} doesn't exist.")] - UnexistingUpdate(u64), -} - -enum UpdateMsg { - Update { - uuid: Uuid, - meta: UpdateMeta, - data: mpsc::Receiver>, - ret: oneshot::Sender>, - }, - ListUpdates { - uuid: Uuid, - ret: oneshot::Sender>>, - }, - GetUpdate { - uuid: Uuid, - ret: oneshot::Sender>, - id: u64, - }, - Delete { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Create { - uuid: Uuid, - ret: oneshot::Sender>, - }, - Snapshot { - uuid: Uuid, - path: PathBuf, - ret: oneshot::Sender>, - }, -} - -struct UpdateActor { - path: PathBuf, - store: S, - inbox: mpsc::Receiver>, - index_handle: IndexActorHandle, -} - -#[async_trait::async_trait] -trait UpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result>; - async fn delete(&self, uuid: Uuid) -> Result>>; - async fn get(&self, uuid: Uuid) -> Result>>; -} - -impl UpdateActor -where - D: AsRef<[u8]> + Sized + 'static, - S: UpdateStoreStore, -{ - fn new( - store: S, - inbox: mpsc::Receiver>, - path: impl AsRef, - index_handle: IndexActorHandle, - ) -> anyhow::Result { - let path = path.as_ref().to_owned(); - fs::create_dir_all(path.join("update_files"))?; - assert!(path.exists()); - Ok(Self { - store, - inbox, - path, - index_handle, - }) - } - - async fn run(mut self) { - use UpdateMsg::*; - - info!("Started update actor."); - - loop { - match self.inbox.recv().await { - Some(Update { - uuid, - 
meta, - data, - ret, - }) => { - let _ = ret.send(self.handle_update(uuid, meta, data).await); - } - Some(ListUpdates { uuid, ret }) => { - let _ = ret.send(self.handle_list_updates(uuid).await); - } - Some(GetUpdate { uuid, ret, id }) => { - let _ = ret.send(self.handle_get_update(uuid, id).await); - } - Some(Delete { uuid, ret }) => { - let _ = ret.send(self.handle_delete(uuid).await); - } - Some(Create { uuid, ret }) => { - let _ = ret.send(self.handle_create(uuid).await); - } - Some(Snapshot { uuid, path, ret }) => { - let _ = ret.send(self.handle_snapshot(uuid, path).await); - } - None => break, - } - } - } - - async fn handle_update( - &self, - uuid: Uuid, - meta: UpdateMeta, - mut payload: mpsc::Receiver>, - ) -> Result { - let update_store = self.store.get_or_create(uuid).await?; - let update_file_id = uuid::Uuid::new_v4(); - let path = self - .path - .join(format!("update_files/update_{}", update_file_id)); - let mut file = OpenOptions::new() - .read(true) - .write(true) - .create(true) - .open(&path) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - while let Some(bytes) = payload.recv().await { - match bytes { - Ok(bytes) => { - file.write_all(bytes.as_ref()) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - Err(e) => { - return Err(UpdateError::Error(e)); - } - } - } - - file.flush() - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - file.seek(SeekFrom::Start(0)) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))?; - - let mut file = file.into_std().await; - - tokio::task::spawn_blocking(move || { - use std::io::{copy, sink, BufReader, Seek}; - - // If the payload is empty, ignore the check. - if file - .metadata() - .map_err(|e| UpdateError::Error(Box::new(e)))? - .len() - > 0 - { - // Check that the json payload is valid: - let reader = BufReader::new(&mut file); - let mut checker = JsonChecker::new(reader); - - if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { - // The json file is invalid, we use Serde to get a nice error message: - file.seek(SeekFrom::Start(0)) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - let _: serde_json::Value = serde_json::from_reader(file) - .map_err(|e| UpdateError::Error(Box::new(e)))?; - } - } - - // The payload is valid, we can register it to the update store. - update_store - .register_update(meta, path, uuid) - .map(UpdateStatus::Pending) - .map_err(|e| UpdateError::Error(Box::new(e))) - }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? - } - - async fn handle_list_updates(&self, uuid: Uuid) -> Result> { - let update_store = self.store.get(uuid).await?; - tokio::task::spawn_blocking(move || { - let result = update_store - .ok_or(UpdateError::UnexistingIndex(uuid))? - .list() - .map_err(|e| UpdateError::Error(e.into()))?; - Ok(result) - }) - .await - .map_err(|e| UpdateError::Error(Box::new(e)))? - } - - async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { - let store = self - .store - .get(uuid) - .await? - .ok_or(UpdateError::UnexistingIndex(uuid))?; - let result = store - .meta(id) - .map_err(|e| UpdateError::Error(Box::new(e)))? 
- .ok_or(UpdateError::UnexistingUpdate(id))?; - Ok(result) - } - - async fn handle_delete(&self, uuid: Uuid) -> Result<()> { - let store = self.store.delete(uuid).await?; - - if let Some(store) = store { - tokio::task::spawn(async move { - let store = get_arc_ownership_blocking(store).await; - tokio::task::spawn_blocking(move || { - store.prepare_for_closing().wait(); - info!("Update store {} was closed.", uuid); - }); - }); - } - - Ok(()) - } - - async fn handle_create(&self, uuid: Uuid) -> Result<()> { - let _ = self.store.get_or_create(uuid).await?; - Ok(()) - } - - async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let index_handle = self.index_handle.clone(); - if let Some(update_store) = self.store.get(uuid).await? { - tokio::task::spawn_blocking(move || -> anyhow::Result<()> { - // acquire write lock to prevent further writes during snapshot - // the update lock must be acquired BEFORE the write lock to prevent dead lock - let _lock = update_store.update_lock.lock(); - let mut txn = update_store.env.write_txn()?; - - // create db snapshot - update_store.snapshot(&mut txn, &path, uuid)?; - - futures::executor::block_on( - async move { index_handle.snapshot(uuid, path).await }, - )?; - Ok(()) - }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; - } - - Ok(()) - } -} - -#[derive(Clone)] -pub struct UpdateActorHandle { - sender: mpsc::Sender>, -} - -impl UpdateActorHandle -where - D: AsRef<[u8]> + Sized + 'static + Sync + Send, -{ - pub fn new( - index_handle: IndexActorHandle, - path: impl AsRef, - update_store_size: usize, - ) -> anyhow::Result { - let path = path.as_ref().to_owned().join("updates"); - let (sender, receiver) = mpsc::channel(100); - let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); - let actor = UpdateActor::new(store, receiver, path, index_handle)?; - - tokio::task::spawn(actor.run()); - - Ok(Self { sender }) - } - - pub async fn update( - &self, - meta: UpdateMeta, - data: mpsc::Receiver>, - uuid: Uuid, - ) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Update { - uuid, - data, - meta, - ret, - }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } -} - -impl UpdateActorHandle { - pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::ListUpdates { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn update_status(&self, uuid: Uuid, id: u64) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn delete(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Delete { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn create(&self, uuid: Uuid) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Create { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") - } - - pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { - let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Snapshot { uuid, path, ret }; - let _ = self.sender.send(msg).await; - 
receiver.await.expect("update actor killed.") - } -} - -struct MapUpdateStoreStore { - db: Arc>>>, - index_handle: IndexActorHandle, - path: PathBuf, - update_store_size: usize, -} - -impl MapUpdateStoreStore { - fn new( - index_handle: IndexActorHandle, - path: impl AsRef, - update_store_size: usize, - ) -> Self { - let db = Arc::new(RwLock::new(HashMap::new())); - let path = path.as_ref().to_owned(); - Self { - db, - index_handle, - path, - update_store_size, - } - } -} - -#[async_trait::async_trait] -impl UpdateStoreStore for MapUpdateStoreStore { - async fn get_or_create(&self, uuid: Uuid) -> Result> { - match self.db.write().await.entry(uuid) { - Entry::Vacant(e) => { - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let path = self.path.clone().join(format!("updates-{}", e.key())); - fs::create_dir_all(&path).unwrap(); - let index_handle = self.index_handle.clone(); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = e.insert(store); - Ok(store.clone()) - } - Entry::Occupied(e) => Ok(e.get().clone()), - } - } - - async fn get(&self, uuid: Uuid) -> Result>> { - let guard = self.db.read().await; - match guard.get(&uuid) { - Some(uuid) => Ok(Some(uuid.clone())), - None => { - // The index is not found in the found in the loaded indexes, so we attempt to load - // it from disk. We need to acquire a write lock **before** attempting to open the - // index, because someone could be trying to open it at the same time as us. - drop(guard); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if path.exists() { - let mut guard = self.db.write().await; - match guard.entry(uuid) { - Entry::Vacant(entry) => { - // We can safely load the index - let index_handle = self.index_handle.clone(); - let mut options = heed::EnvOpenOptions::new(); - let update_store_size = self.update_store_size; - options.map_size(update_store_size); - let store = UpdateStore::open(options, &path, move |meta, file| { - futures::executor::block_on(index_handle.update(meta, file)) - }) - .map_err(|e| UpdateError::Error(e.into()))?; - let store = entry.insert(store); - Ok(Some(store.clone())) - } - Entry::Occupied(entry) => { - // The index was loaded while we attempted to to iter - Ok(Some(entry.get().clone())) - } - } - } else { - Ok(None) - } - } - } - } - - async fn delete(&self, uuid: Uuid) -> Result>> { - let store = self.db.write().await.remove(&uuid); - let path = self.path.clone().join(format!("updates-{}", uuid)); - if store.is_some() || path.exists() { - fs::remove_dir_all(path).unwrap(); - } - Ok(store) - } -} diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs new file mode 100644 index 000000000..629a87ba4 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -0,0 +1,226 @@ +use std::io::SeekFrom; +use std::path::{Path, PathBuf}; + +use log::info; +use tokio::sync::mpsc; +use uuid::Uuid; +use oxidized_json_checker::JsonChecker; +use tokio::fs; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; + +use super::{PayloadData, UpdateError, UpdateMsg, UpdateStoreStore, Result}; +use crate::index_controller::index_actor::IndexActorHandle; +use crate::index_controller::{UpdateMeta, UpdateStatus, get_arc_ownership_blocking}; + +pub struct UpdateActor { + path: PathBuf, 
+ store: S, + inbox: mpsc::Receiver>, + index_handle: I, +} + +impl UpdateActor +where + D: AsRef<[u8]> + Sized + 'static, + S: UpdateStoreStore, + I: IndexActorHandle + 'static, +{ + pub fn new( + store: S, + inbox: mpsc::Receiver>, + path: impl AsRef, + index_handle: I, + ) -> anyhow::Result { + let path = path.as_ref().to_owned(); + std::fs::create_dir_all(path.join("update_files"))?; + assert!(path.exists()); + Ok(Self { + store, + inbox, + path, + index_handle, + }) + } + + pub async fn run(mut self) { + use UpdateMsg::*; + + info!("Started update actor."); + + loop { + match self.inbox.recv().await { + Some(Update { + uuid, + meta, + data, + ret, + }) => { + let _ = ret.send(self.handle_update(uuid, meta, data).await); + } + Some(ListUpdates { uuid, ret }) => { + let _ = ret.send(self.handle_list_updates(uuid).await); + } + Some(GetUpdate { uuid, ret, id }) => { + let _ = ret.send(self.handle_get_update(uuid, id).await); + } + Some(Delete { uuid, ret }) => { + let _ = ret.send(self.handle_delete(uuid).await); + } + Some(Create { uuid, ret }) => { + let _ = ret.send(self.handle_create(uuid).await); + } + Some(Snapshot { uuid, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); + } + None => break, + } + } + } + + async fn handle_update( + &self, + uuid: Uuid, + meta: UpdateMeta, + mut payload: mpsc::Receiver>, + ) -> Result { + let update_store = self.store.get_or_create(uuid).await?; + let update_file_id = uuid::Uuid::new_v4(); + let path = self + .path + .join(format!("update_files/update_{}", update_file_id)); + let mut file = fs::OpenOptions::new() + .read(true) + .write(true) + .create(true) + .open(&path) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + while let Some(bytes) = payload.recv().await { + match bytes { + Ok(bytes) => { + file.write_all(bytes.as_ref()) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + Err(e) => { + return Err(UpdateError::Error(e)); + } + } + } + + file.flush() + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + file.seek(SeekFrom::Start(0)) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))?; + + let mut file = file.into_std().await; + + tokio::task::spawn_blocking(move || { + use std::io::{copy, sink, BufReader, Seek}; + + // If the payload is empty, ignore the check. + if file + .metadata() + .map_err(|e| UpdateError::Error(Box::new(e)))? + .len() + > 0 + { + // Check that the json payload is valid: + let reader = BufReader::new(&mut file); + let mut checker = JsonChecker::new(reader); + + if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() { + // The json file is invalid, we use Serde to get a nice error message: + file.seek(SeekFrom::Start(0)) + .map_err(|e| UpdateError::Error(Box::new(e)))?; + let _: serde_json::Value = serde_json::from_reader(file) + .map_err(|e| UpdateError::Error(Box::new(e)))?; + } + } + + // The payload is valid, we can register it to the update store. + update_store + .register_update(meta, path, uuid) + .map(UpdateStatus::Pending) + .map_err(|e| UpdateError::Error(Box::new(e))) + }) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))? + } + + async fn handle_list_updates(&self, uuid: Uuid) -> Result> { + let update_store = self.store.get(uuid).await?; + tokio::task::spawn_blocking(move || { + let result = update_store + .ok_or(UpdateError::UnexistingIndex(uuid))? + .list() + .map_err(|e| UpdateError::Error(e.into()))?; + Ok(result) + }) + .await + .map_err(|e| UpdateError::Error(Box::new(e)))? 
+ } + + async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { + let store = self + .store + .get(uuid) + .await? + .ok_or(UpdateError::UnexistingIndex(uuid))?; + let result = store + .meta(id) + .map_err(|e| UpdateError::Error(Box::new(e)))? + .ok_or(UpdateError::UnexistingUpdate(id))?; + Ok(result) + } + + async fn handle_delete(&self, uuid: Uuid) -> Result<()> { + let store = self.store.delete(uuid).await?; + + if let Some(store) = store { + tokio::task::spawn(async move { + let store = get_arc_ownership_blocking(store).await; + tokio::task::spawn_blocking(move || { + store.prepare_for_closing().wait(); + info!("Update store {} was closed.", uuid); + }); + }); + } + + Ok(()) + } + + async fn handle_create(&self, uuid: Uuid) -> Result<()> { + let _ = self.store.get_or_create(uuid).await?; + Ok(()) + } + + async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let index_handle = self.index_handle.clone(); + if let Some(update_store) = self.store.get(uuid).await? { + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + // acquire write lock to prevent further writes during snapshot + // the update lock must be acquired BEFORE the write lock to prevent dead lock + let _lock = update_store.update_lock.lock(); + let mut txn = update_store.env.write_txn()?; + + // create db snapshot + update_store.snapshot(&mut txn, &path, uuid)?; + + futures::executor::block_on( + async move { index_handle.snapshot(uuid, path).await }, + )?; + Ok(()) + }) + .await + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; + } + + Ok(()) + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs new file mode 100644 index 000000000..43b2ff8c2 --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -0,0 +1,96 @@ +use std::path::{Path, PathBuf}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{ + MapUpdateStoreStore, PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, + UpdateMsg, UpdateStatus, +}; +use crate::index_controller::IndexActorHandle; + +#[derive(Clone)] +pub struct UpdateActorHandleImpl { + sender: mpsc::Sender>, +} + +impl UpdateActorHandleImpl +where + D: AsRef<[u8]> + Sized + 'static + Sync + Send, +{ + pub fn new( + index_handle: I, + path: impl AsRef, + update_store_size: usize, + ) -> anyhow::Result + where + I: IndexActorHandle + 'static, + { + let path = path.as_ref().to_owned().join("updates"); + let (sender, receiver) = mpsc::channel(100); + let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); + let actor = UpdateActor::new(store, receiver, path, index_handle)?; + + tokio::task::spawn(actor.run()); + + Ok(Self { sender }) + } +} +#[async_trait::async_trait] +impl UpdateActorHandle for UpdateActorHandleImpl +where + D: AsRef<[u8]> + Sized + 'static + Sync + Send, +{ + type Data = D; + + async fn update( + &self, + meta: UpdateMeta, + data: mpsc::Receiver>, + uuid: Uuid, + ) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Update { + uuid, + data, + meta, + ret, + }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::ListUpdates { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor 
killed.") + } + + async fn update_status(&self, uuid: Uuid, id: u64) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::GetUpdate { uuid, id, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn delete(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Delete { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn create(&self, uuid: Uuid) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Create { uuid, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } + + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Snapshot { uuid, path, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } +} diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs new file mode 100644 index 000000000..628a1ebcb --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -0,0 +1,37 @@ +use std::path::PathBuf; + +use uuid::Uuid; +use tokio::sync::{oneshot, mpsc}; + +use super::{Result, PayloadData, UpdateStatus, UpdateMeta}; + +pub enum UpdateMsg { + Update { + uuid: Uuid, + meta: UpdateMeta, + data: mpsc::Receiver>, + ret: oneshot::Sender>, + }, + ListUpdates { + uuid: Uuid, + ret: oneshot::Sender>>, + }, + GetUpdate { + uuid: Uuid, + ret: oneshot::Sender>, + id: u64, + }, + Delete { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Create { + uuid: Uuid, + ret: oneshot::Sender>, + }, + Snapshot { + uuid: Uuid, + path: PathBuf, + ret: oneshot::Sender>, + }, +} diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs new file mode 100644 index 000000000..740323cdc --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -0,0 +1,51 @@ +mod actor; +mod store; +mod message; +mod handle_impl; +mod update_store; + +use std::path::PathBuf; + +use thiserror::Error; +use tokio::sync::mpsc; +use uuid::Uuid; + +use crate::index::UpdateResult; +use crate::index_controller::{UpdateMeta, UpdateStatus}; + +use actor::UpdateActor; +use message::UpdateMsg; +use store::{UpdateStoreStore, MapUpdateStoreStore}; + +pub use handle_impl::UpdateActorHandleImpl; + +pub type Result = std::result::Result; +type UpdateStore = update_store::UpdateStore; +type PayloadData = std::result::Result>; + +#[derive(Debug, Error)] +pub enum UpdateError { + #[error("error with update: {0}")] + Error(Box), + #[error("Index {0} doesn't exist.")] + UnexistingIndex(Uuid), + #[error("Update {0} doesn't exist.")] + UnexistingUpdate(u64), +} + +#[async_trait::async_trait] +pub trait UpdateActorHandle { + type Data: AsRef<[u8]> + Sized + 'static + Sync + Send; + + async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; + async fn update_status(&self, uuid: Uuid, id: u64) -> Result; + async fn delete(&self, uuid: Uuid) -> Result<()>; + async fn create(&self, uuid: Uuid) -> Result<()>; + async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; + async fn update( + &self, + meta: UpdateMeta, + data: mpsc::Receiver>, + uuid: Uuid, + ) -> Result ; +} diff --git a/meilisearch-http/src/index_controller/update_actor/store.rs 
b/meilisearch-http/src/index_controller/update_actor/store.rs new file mode 100644 index 000000000..9320c488f --- /dev/null +++ b/meilisearch-http/src/index_controller/update_actor/store.rs @@ -0,0 +1,112 @@ +use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use uuid::Uuid; +use tokio::sync::RwLock; +use tokio::fs; + +use crate::index_controller::IndexActorHandle; +use super::{UpdateStore, UpdateError, Result}; + +#[async_trait::async_trait] +pub trait UpdateStoreStore { + async fn get_or_create(&self, uuid: Uuid) -> Result>; + async fn delete(&self, uuid: Uuid) -> Result>>; + async fn get(&self, uuid: Uuid) -> Result>>; +} + +pub struct MapUpdateStoreStore { + db: Arc>>>, + index_handle: I, + path: PathBuf, + update_store_size: usize, +} + +impl MapUpdateStoreStore { + pub fn new( + index_handle: I, + path: impl AsRef, + update_store_size: usize, + ) -> Self { + let db = Arc::new(RwLock::new(HashMap::new())); + let path = path.as_ref().to_owned(); + Self { + db, + index_handle, + path, + update_store_size, + } + } +} + +#[async_trait::async_trait] +impl UpdateStoreStore for MapUpdateStoreStore { + async fn get_or_create(&self, uuid: Uuid) -> Result> { + match self.db.write().await.entry(uuid) { + Entry::Vacant(e) => { + let mut options = heed::EnvOpenOptions::new(); + let update_store_size = self.update_store_size; + options.map_size(update_store_size); + let path = self.path.clone().join(format!("updates-{}", e.key())); + fs::create_dir_all(&path).await.unwrap(); + let index_handle = self.index_handle.clone(); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; + let store = e.insert(store); + Ok(store.clone()) + } + Entry::Occupied(e) => Ok(e.get().clone()), + } + } + + async fn get(&self, uuid: Uuid) -> Result>> { + let guard = self.db.read().await; + match guard.get(&uuid) { + Some(uuid) => Ok(Some(uuid.clone())), + None => { + // The index is not found in the loaded indexes, so we attempt to load + // it from disk. We need to acquire a write lock **before** attempting to open the + // index, because someone could be trying to open it at the same time as us.
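+ // This is a double-checked lookup: release the read guard first (taking the write lock below + // while still holding it would deadlock), then re-check the map under the write lock via `entry`, + // because another task may have opened the store in the meantime.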
+ drop(guard); + let path = self.path.clone().join(format!("updates-{}", uuid)); + if path.exists() { + let mut guard = self.db.write().await; + match guard.entry(uuid) { + Entry::Vacant(entry) => { + // We can safely load the index + let index_handle = self.index_handle.clone(); + let mut options = heed::EnvOpenOptions::new(); + let update_store_size = self.update_store_size; + options.map_size(update_store_size); + let store = UpdateStore::open(options, &path, move |meta, file| { + futures::executor::block_on(index_handle.update(meta, file)) + }) + .map_err(|e| UpdateError::Error(e.into()))?; + let store = entry.insert(store); + Ok(Some(store.clone())) + } + Entry::Occupied(entry) => { + // The index was loaded while we were waiting for the write lock + Ok(Some(entry.get().clone())) + } + } + } else { + Ok(None) + } + } + } + } + + async fn delete(&self, uuid: Uuid) -> Result>> { + let store = self.db.write().await.remove(&uuid); + let path = self.path.clone().join(format!("updates-{}", uuid)); + if store.is_some() || path.exists() { + fs::remove_dir_all(path).await.unwrap(); + } + Ok(store) + } +} diff --git a/meilisearch-http/src/index_controller/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs similarity index 100% rename from meilisearch-http/src/index_controller/update_store.rs rename to meilisearch-http/src/index_controller/update_actor/update_store.rs diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs deleted file mode 100644 index 88b62fe1f..000000000 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ /dev/null @@ -1,343 +0,0 @@ -use std::fs::create_dir_all; -use std::path::{Path, PathBuf}; - -use heed::{ - types::{ByteSlice, Str}, - Database, Env, EnvOpenOptions,CompactionOption -}; -use log::{info, warn}; -use thiserror::Error; -use tokio::sync::{mpsc, oneshot}; -use uuid::Uuid; - -const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB - -pub type Result = std::result::Result; - -#[derive(Debug)] -enum UuidResolveMsg { - Resolve { - uid: String, - ret: oneshot::Sender>, - }, - GetOrCreate { - uid: String, - ret: oneshot::Sender>, - }, - Create { - uid: String, - ret: oneshot::Sender>, - }, - Delete { - uid: String, - ret: oneshot::Sender>, - }, - List { - ret: oneshot::Sender>>, - }, - SnapshotRequest { - path: PathBuf, - ret: oneshot::Sender>>, - }, -} - -struct UuidResolverActor { - inbox: mpsc::Receiver, - store: S, -} - -impl UuidResolverActor { - fn new(inbox: mpsc::Receiver, store: S) -> Self { - Self { inbox, store } - } - - async fn run(mut self) { - use UuidResolveMsg::*; - - info!("uuid resolver started"); - - loop { - match self.inbox.recv().await { - Some(Create { uid: name, ret }) => { - let _ = ret.send(self.handle_create(name).await); - } - Some(GetOrCreate { uid: name, ret }) => { - let _ = ret.send(self.handle_get_or_create(name).await); - } - Some(Resolve { uid: name, ret }) => { - let _ = ret.send(self.handle_resolve(name).await); - } - Some(Delete { uid: name, ret }) => { - let _ = ret.send(self.handle_delete(name).await); - } - Some(List { ret }) => { - let _ = ret.send(self.handle_list().await); - } - Some(SnapshotRequest { path, ret }) => { - let _ = ret.send(self.handle_snapshot(path).await); - } - // all senders have been dropped, need to quit.
- None => break, - } - } - - warn!("exiting uuid resolver loop"); - } - - async fn handle_create(&self, uid: String) -> Result { - if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); - } - self.store.create_uuid(uid, true).await - } - - async fn handle_get_or_create(&self, uid: String) -> Result { - if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); - } - self.store.create_uuid(uid, false).await - } - - async fn handle_resolve(&self, uid: String) -> Result { - self.store - .get_uuid(uid.clone()) - .await? - .ok_or(UuidError::UnexistingIndex(uid)) - } - - async fn handle_delete(&self, uid: String) -> Result { - self.store - .delete(uid.clone()) - .await? - .ok_or(UuidError::UnexistingIndex(uid)) - } - - async fn handle_list(&self) -> Result> { - let result = self.store.list().await?; - Ok(result) - } - - async fn handle_snapshot(&self, path: PathBuf) -> Result> { - self.store.snapshot(path).await - } -} - -fn is_index_uid_valid(uid: &str) -> bool { - uid.chars() - .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') -} - -#[derive(Clone)] -pub struct UuidResolverHandle { - sender: mpsc::Sender, -} - -impl UuidResolverHandle { - pub fn new(path: impl AsRef) -> anyhow::Result { - let (sender, reveiver) = mpsc::channel(100); - let store = HeedUuidStore::new(path)?; - let actor = UuidResolverActor::new(reveiver, store); - tokio::spawn(actor.run()); - Ok(Self { sender }) - } - - pub async fn resolve(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Resolve { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn get_or_create(&self, name: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::GetOrCreate { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn create(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Create { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn delete(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Delete { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn list(&self) -> anyhow::Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::List { ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn snapshot(&self, path: PathBuf) -> Result> { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::SnapshotRequest { path, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) 
- } -} - -#[derive(Debug, Error)] -pub enum UuidError { - #[error("Name already exist.")] - NameAlreadyExist, - #[error("Index \"{0}\" doesn't exist.")] - UnexistingIndex(String), - #[error("Error performing task: {0}")] - TokioTask(#[from] tokio::task::JoinError), - #[error("Database error: {0}")] - Heed(#[from] heed::Error), - #[error("Uuid error: {0}")] - Uuid(#[from] uuid::Error), - #[error("Badly formatted index uid: {0}")] - BadlyFormatted(String), -} - -#[async_trait::async_trait] -trait UuidStore { - // Create a new entry for `name`. Return an error if `err` and the entry already exists, return - // the uuid otherwise. - async fn create_uuid(&self, uid: String, err: bool) -> Result; - async fn get_uuid(&self, uid: String) -> Result>; - async fn delete(&self, uid: String) -> Result>; - async fn list(&self) -> Result>; - async fn snapshot(&self, path: PathBuf) -> Result>; -} - -struct HeedUuidStore { - env: Env, - db: Database, -} - -impl HeedUuidStore { - fn new(path: impl AsRef) -> anyhow::Result { - let path = path.as_ref().join("index_uuids"); - create_dir_all(&path)?; - let mut options = EnvOpenOptions::new(); - options.map_size(UUID_STORE_SIZE); // 1GB - let env = options.open(path)?; - let db = env.create_database(None)?; - Ok(Self { env, db }) - } -} - -#[async_trait::async_trait] -impl UuidStore for HeedUuidStore { - async fn create_uuid(&self, name: String, err: bool) -> Result { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - if err { - Err(UuidError::NameAlreadyExist) - } else { - let uuid = Uuid::from_slice(uuid)?; - Ok(uuid) - } - } - None => { - let uuid = Uuid::new_v4(); - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(uuid) - } - } - }) - .await? - } - async fn get_uuid(&self, name: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let txn = env.read_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - Ok(Some(uuid)) - } - None => Ok(None), - } - }) - .await? - } - - async fn delete(&self, uid: String) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let mut txn = env.write_txn()?; - match db.get(&txn, &uid)? { - Some(uuid) => { - let uuid = Uuid::from_slice(uuid)?; - db.delete(&mut txn, &uid)?; - txn.commit()?; - Ok(Some(uuid)) - } - None => Ok(None), - } - }) - .await? - } - - async fn list(&self) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - let txn = env.read_txn()?; - let mut entries = Vec::new(); - for entry in db.iter(&txn)? { - let (name, uuid) = entry?; - let uuid = Uuid::from_slice(uuid)?; - entries.push((name.to_owned(), uuid)) - } - Ok(entries) - }) - .await? - } - - async fn snapshot(&self, mut path: PathBuf) -> Result> { - let env = self.env.clone(); - let db = self.db; - tokio::task::spawn_blocking(move || { - // Write transaction to acquire a lock on the database. - let txn = env.write_txn()?; - let mut entries = Vec::new(); - for entry in db.iter(&txn)? { - let (_, uuid) = entry?; - let uuid = Uuid::from_slice(uuid)?; - entries.push(uuid) - } - - // only perform snapshot if there are indexes - if !entries.is_empty() { - path.push("index_uuids"); - create_dir_all(&path).unwrap(); - path.push("data.mdb"); - env.copy_to_path(path, CompactionOption::Enabled)?; - } - Ok(entries) - }) - .await? 
- } -} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs new file mode 100644 index 000000000..4c0a13ad7 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -0,0 +1,94 @@ +use std::path::PathBuf; + +use log::{info, warn}; +use tokio::sync::mpsc; +use uuid::Uuid; + +use super::{UuidResolveMsg, UuidStore, Result, UuidError}; + +pub struct UuidResolverActor { + inbox: mpsc::Receiver, + store: S, +} + +impl UuidResolverActor { + pub fn new(inbox: mpsc::Receiver, store: S) -> Self { + Self { inbox, store } + } + + pub async fn run(mut self) { + use UuidResolveMsg::*; + + info!("uuid resolver started"); + + loop { + match self.inbox.recv().await { + Some(Create { uid: name, ret }) => { + let _ = ret.send(self.handle_create(name).await); + } + Some(GetOrCreate { uid: name, ret }) => { + let _ = ret.send(self.handle_get_or_create(name).await); + } + Some(Resolve { uid: name, ret }) => { + let _ = ret.send(self.handle_resolve(name).await); + } + Some(Delete { uid: name, ret }) => { + let _ = ret.send(self.handle_delete(name).await); + } + Some(List { ret }) => { + let _ = ret.send(self.handle_list().await); + } + Some(SnapshotRequest { path, ret }) => { + let _ = ret.send(self.handle_snapshot(path).await); + } + // all senders have been dropped, need to quit. + None => break, + } + } + + warn!("exiting uuid resolver loop"); + } + + async fn handle_create(&self, uid: String) -> Result { + if !is_index_uid_valid(&uid) { + return Err(UuidError::BadlyFormatted(uid)); + } + self.store.create_uuid(uid, true).await + } + + async fn handle_get_or_create(&self, uid: String) -> Result { + if !is_index_uid_valid(&uid) { + return Err(UuidError::BadlyFormatted(uid)); + } + self.store.create_uuid(uid, false).await + } + + async fn handle_resolve(&self, uid: String) -> Result { + self.store + .get_uuid(uid.clone()) + .await? + .ok_or(UuidError::UnexistingIndex(uid)) + } + + async fn handle_delete(&self, uid: String) -> Result { + self.store + .delete(uid.clone()) + .await? 
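+ // `delete` returns `Ok(None)` when no uuid is registered for this uid; report that as + // `UnexistingIndex` rather than as a silent success.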
+ .ok_or(UuidError::UnexistingIndex(uid)) + } + + async fn handle_list(&self) -> Result> { + let result = self.store.list().await?; + Ok(result) + } + + async fn handle_snapshot(&self, path: PathBuf) -> Result> { + self.store.snapshot(path).await + } +} + +fn is_index_uid_valid(uid: &str) -> bool { + uid.chars() + .all(|x| x.is_ascii_alphanumeric() || x == '-' || x == '_') +} + diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs new file mode 100644 index 000000000..265ea8422 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -0,0 +1,78 @@ +use std::path::{Path, PathBuf}; + +use tokio::sync::{mpsc, oneshot}; +use uuid::Uuid; + +use super::{HeedUuidStore, UuidResolverActor, UuidResolveMsg, UuidResolverHandle, Result}; + +#[derive(Clone)] +pub struct UuidResolverHandleImpl { + sender: mpsc::Sender, +} + +impl UuidResolverHandleImpl { + pub fn new(path: impl AsRef) -> anyhow::Result { + let (sender, reveiver) = mpsc::channel(100); + let store = HeedUuidStore::new(path)?; + let actor = UuidResolverActor::new(reveiver, store); + tokio::spawn(actor.run()); + Ok(Self { sender }) + } +} + +#[async_trait::async_trait] +impl UuidResolverHandle for UuidResolverHandleImpl { + async fn resolve(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Resolve { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn get_or_create(&self, name: String) -> Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::GetOrCreate { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn create(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Create { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn delete(&self, name: String) -> anyhow::Result { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Delete { uid: name, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn list(&self) -> anyhow::Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::List { ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } + + async fn snapshot(&self, path: PathBuf) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::SnapshotRequest { path, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) 
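+ // Every handle method follows the same shape: push a message carrying a oneshot sender into the + // actor's mpsc inbox, then await the oneshot for the reply. A dropped actor surfaces as a cancelled + // oneshot, which the `expect` above turns into a panic.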
+ } +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs new file mode 100644 index 000000000..0e8323a82 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs @@ -0,0 +1,33 @@ +use std::path::PathBuf; + +use tokio::sync::oneshot; +use uuid::Uuid; + +use super::Result; + +#[derive(Debug)] +pub enum UuidResolveMsg { + Resolve { + uid: String, + ret: oneshot::Sender>, + }, + GetOrCreate { + uid: String, + ret: oneshot::Sender>, + }, + Create { + uid: String, + ret: oneshot::Sender>, + }, + Delete { + uid: String, + ret: oneshot::Sender>, + }, + List { + ret: oneshot::Sender>>, + }, + SnapshotRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs new file mode 100644 index 000000000..08cbe70e0 --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -0,0 +1,45 @@ +mod actor; +mod handle_impl; +mod message; +mod store; + +use std::path::PathBuf; + +use thiserror::Error; +use uuid::Uuid; + +use actor::UuidResolverActor; +use message::UuidResolveMsg; +use store::{HeedUuidStore, UuidStore}; + +pub use handle_impl::UuidResolverHandleImpl; + +const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB + +pub type Result = std::result::Result; + +#[async_trait::async_trait] +pub trait UuidResolverHandle { + async fn resolve(&self, name: String) -> anyhow::Result; + async fn get_or_create(&self, name: String) -> Result; + async fn create(&self, name: String) -> anyhow::Result; + async fn delete(&self, name: String) -> anyhow::Result; + async fn list(&self) -> anyhow::Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; +} + +#[derive(Debug, Error)] +pub enum UuidError { + #[error("Name already exist.")] + NameAlreadyExist, + #[error("Index \"{0}\" doesn't exist.")] + UnexistingIndex(String), + #[error("Error performing task: {0}")] + TokioTask(#[from] tokio::task::JoinError), + #[error("Database error: {0}")] + Heed(#[from] heed::Error), + #[error("Uuid error: {0}")] + Uuid(#[from] uuid::Error), + #[error("Badly formatted index uid: {0}")] + BadlyFormatted(String), +} diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs new file mode 100644 index 000000000..e821f2b2b --- /dev/null +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -0,0 +1,140 @@ +use std::fs::create_dir_all; +use std::path::{Path, PathBuf}; + +use heed::{ + types::{ByteSlice, Str}, + Database, Env, EnvOpenOptions,CompactionOption +}; +use uuid::Uuid; + +use super::{UUID_STORE_SIZE, UuidError, Result}; + +#[async_trait::async_trait] +pub trait UuidStore { + // Create a new entry for `name`. Return an error if `err` and the entry already exists, return + // the uuid otherwise. 
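+ // (`err == true` corresponds to the actor's `handle_create` path, `err == false` to `handle_get_or_create`.)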
+ async fn create_uuid(&self, uid: String, err: bool) -> Result; + async fn get_uuid(&self, uid: String) -> Result>; + async fn delete(&self, uid: String) -> Result>; + async fn list(&self) -> Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; +} + +pub struct HeedUuidStore { + env: Env, + db: Database, +} + +impl HeedUuidStore { + pub fn new(path: impl AsRef) -> anyhow::Result { + let path = path.as_ref().join("index_uuids"); + create_dir_all(&path)?; + let mut options = EnvOpenOptions::new(); + options.map_size(UUID_STORE_SIZE); // 1GB + let env = options.open(path)?; + let db = env.create_database(None)?; + Ok(Self { env, db }) + } +} + +#[async_trait::async_trait] +impl UuidStore for HeedUuidStore { + async fn create_uuid(&self, name: String, err: bool) -> Result { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + if err { + Err(UuidError::NameAlreadyExist) + } else { + let uuid = Uuid::from_slice(uuid)?; + Ok(uuid) + } + } + None => { + let uuid = Uuid::new_v4(); + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(uuid) + } + } + }) + .await? + } + async fn get_uuid(&self, name: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let txn = env.read_txn()?; + match db.get(&txn, &name)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + Ok(Some(uuid)) + } + None => Ok(None), + } + }) + .await? + } + + async fn delete(&self, uid: String) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + match db.get(&txn, &uid)? { + Some(uuid) => { + let uuid = Uuid::from_slice(uuid)?; + db.delete(&mut txn, &uid)?; + txn.commit()?; + Ok(Some(uuid)) + } + None => Ok(None), + } + }) + .await? + } + + async fn list(&self) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let txn = env.read_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (name, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push((name.to_owned(), uuid)) + } + Ok(entries) + }) + .await? + } + + async fn snapshot(&self, mut path: PathBuf) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + // Write transaction to acquire a lock on the database. + let txn = env.write_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (_, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push(uuid) + } + + // only perform snapshot if there are indexes + if !entries.is_empty() { + path.push("index_uuids"); + create_dir_all(&path).unwrap(); + path.push("data.mdb"); + env.copy_to_path(path, CompactionOption::Enabled)?; + } + Ok(entries) + }) + .await? + } +} diff --git a/meilisearch-http/tests/common/server.rs b/meilisearch-http/tests/common/server.rs index 43caf1dc6..fb5147811 100644 --- a/meilisearch-http/tests/common/server.rs +++ b/meilisearch-http/tests/common/server.rs @@ -43,7 +43,7 @@ impl Server { ignore_snapshot_if_db_exists: false, snapshot_dir: ".".into(), schedule_snapshot: false, - snapshot_interval_sec: None, + snapshot_interval_sec: 0, import_dump: None, indexer_options: IndexerOpts::default(), #[cfg(all(not(debug_assertions), feature = "sentry"))]