diff --git a/meilisearch-http/src/data/mod.rs b/meilisearch-http/src/data/mod.rs index 3aee18217..ab50e83cc 100644 --- a/meilisearch-http/src/data/mod.rs +++ b/meilisearch-http/src/data/mod.rs @@ -127,7 +127,7 @@ impl Data { stats.database_size += index_stats.size; stats.database_size += self .index_controller - .get_updates_size(index.uid.clone()) + .get_updates_size() .await?; stats.last_update = Some(match stats.last_update { diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index 57a939aaa..0bb20d3ed 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -6,9 +6,9 @@ use anyhow::{bail, Context}; use milli::obkv_to_json; use serde_json::{Map, Value}; +use crate::helpers::EnvSizer; pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT}; pub use updates::{Facets, Settings, UpdateResult}; -use crate::helpers::EnvSizer; mod search; mod updates; diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs index af755f65f..06aa90562 100644 --- a/meilisearch-http/src/index_controller/index_actor/actor.rs +++ b/meilisearch-http/src/index_controller/index_actor/actor.rs @@ -19,6 +19,8 @@ use crate::option::IndexerOpts; use super::{IndexError, IndexMeta, IndexMsg, IndexSettings, IndexStore, Result, UpdateResult}; +pub const CONCURRENT_INDEX_MSG: usize = 10; + pub struct IndexActor { receiver: Option>, update_handler: Arc, @@ -27,10 +29,7 @@ pub struct IndexActor { } impl IndexActor { - pub fn new( - receiver: mpsc::Receiver, - store: S, - ) -> Result { + pub fn new(receiver: mpsc::Receiver, store: S) -> Result { let options = IndexerOpts::default(); let update_handler = UpdateHandler::new(&options).map_err(IndexError::Error)?; let update_handler = Arc::new(update_handler); @@ -40,7 +39,6 @@ impl IndexActor { store, update_handler, processing: RwLock::new(None), - store, }) } @@ -62,7 +60,9 @@ impl IndexActor { } }; - stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg)).await; + stream + .for_each_concurrent(Some(CONCURRENT_INDEX_MSG), |msg| self.handle_message(msg)) + .await; } async fn handle_message(&self, msg: IndexMsg) { @@ -75,7 +75,12 @@ impl IndexActor { } => { let _ = ret.send(self.handle_create_index(uuid, primary_key).await); } - Update { ret, meta, data, uuid } => { + Update { + ret, + meta, + data, + uuid, + } => { let _ = ret.send(self.handle_update(uuid, meta, data).await); } Search { ret, query, uuid } => { diff --git a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs index 629fd0db7..d23357d38 100644 --- a/meilisearch-http/src/index_controller/index_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/index_actor/handle_impl.rs @@ -36,7 +36,12 @@ impl IndexActorHandle for IndexActorHandleImpl { data: std::fs::File, ) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); - let msg = IndexMsg::Update { ret, meta, data, uuid }; + let msg = IndexMsg::Update { + ret, + meta, + data, + uuid, + }; let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } @@ -126,7 +131,7 @@ impl IndexActorHandle for IndexActorHandleImpl { async fn get_index_stats(&self, uuid: Uuid) -> Result { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::GetStats { uuid, ret }; - let _ = self.read_sender.send(msg).await; + let _ = self.sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } } @@ -138,8 +143,6 @@ impl IndexActorHandleImpl { let store = MapIndexStore::new(path, index_size); let actor = IndexActor::new(receiver, store)?; tokio::task::spawn(actor.run()); - Ok(Self { - sender, - }) + Ok(Self { sender }) } } diff --git a/meilisearch-http/src/index_controller/index_actor/mod.rs b/meilisearch-http/src/index_controller/index_actor/mod.rs index 70a69822d..e44891b5a 100644 --- a/meilisearch-http/src/index_controller/index_actor/mod.rs +++ b/meilisearch-http/src/index_controller/index_actor/mod.rs @@ -8,6 +8,7 @@ use thiserror::Error; use uuid::Uuid; use actor::IndexActor; +pub use actor::CONCURRENT_INDEX_MSG; pub use handle_impl::IndexActorHandleImpl; use message::IndexMsg; use store::{IndexStore, MapIndexStore}; diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index cf8d036c7..9d9ed02c1 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -356,10 +356,8 @@ impl IndexController { Ok(self.index_handle.get_index_stats(uuid).await?) } - pub async fn get_updates_size(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.get(uid.clone()).await?; - - Ok(self.update_handle.get_size(uuid).await?) + pub async fn get_updates_size(&self) -> anyhow::Result { + Ok(self.update_handle.get_size().await?) } pub async fn get_uuids_size(&self) -> anyhow::Result { diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 8557fe04e..eba5a5f3b 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -71,15 +71,10 @@ where return Ok(()); } - let tasks = uuids - .iter() - .map(|&uuid| { - self.update_handle - .snapshot(uuid, temp_snapshot_path.clone()) - }) - .collect::>(); - futures::future::try_join_all(tasks).await?; + self.update_handle + .snapshot(uuids, temp_snapshot_path.clone()) + .await?; let snapshot_dir = self.snapshot_path.clone(); let snapshot_path = self diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 8cf86d126..2a752d0b3 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -8,12 +8,12 @@ use tokio::fs; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::mpsc; use uuid::Uuid; +use futures::StreamExt; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore}; -use crate::index_controller::index_actor::IndexActorHandle; +use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; use crate::index_controller::{UpdateMeta, UpdateStatus}; - pub struct UpdateActor { path: PathBuf, store: Arc, @@ -32,10 +32,7 @@ where path: impl AsRef, index_handle: I, ) -> anyhow::Result { - let path = path - .as_ref() - .to_owned() - .join("updates"); + let path = path.as_ref().to_owned().join("updates"); std::fs::create_dir_all(&path)?; @@ -81,11 +78,11 @@ where Some(Delete { uuid, ret }) => { let _ = ret.send(self.handle_delete(uuid).await); } - Some(Snapshot { uuid, path, ret }) => { - let _ = ret.send(self.handle_snapshot(uuid, path).await); + Some(Snapshot { uuids, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuids, path).await); } - Some(GetSize { uuid, ret }) => { - let _ = ret.send(self.handle_get_size(uuid).await); + Some(GetSize { ret }) => { + let _ = ret.send(self.handle_get_size().await); } None => break, } @@ -200,7 +197,7 @@ where Ok(()) } - async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + async fn handle_snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { let index_handle = self.index_handle.clone(); let update_store = self.store.clone(); tokio::task::spawn_blocking(move || -> anyhow::Result<()> { @@ -210,32 +207,41 @@ where let mut txn = update_store.env.write_txn()?; // create db snapshot - update_store.snapshot(&mut txn, &path, uuid)?; + update_store.snapshot(&mut txn, &path)?; - futures::executor::block_on( - async move { index_handle.snapshot(uuid, path).await }, - )?; - Ok(()) + // Perform the snapshot of each index concurently. Only a third of the capabilities of + // the index actor at a time not to put too much pressure on the index actor + let path = &path; + let handle = &index_handle; + + let mut stream = futures::stream::iter(uuids.iter()) + .map(|&uuid| handle.snapshot(uuid, path.clone())) + .buffer_unordered(CONCURRENT_INDEX_MSG / 3); + + futures::executor::block_on(async { + while let Some(res) = stream.next().await { + res?; + } + Ok(()) + }) }) .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?; + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; Ok(()) } - async fn handle_get_size(&self, uuid: Uuid) -> Result { - let size = match self.store.get(uuid).await? { - Some(update_store) => tokio::task::spawn_blocking(move || -> anyhow::Result { - let txn = update_store.env.read_txn()?; + async fn handle_get_size(&self) -> Result { + let update_store = self.store.clone(); + let size = tokio::task::spawn_blocking(move || -> anyhow::Result { + let txn = update_store.env.read_txn()?; - update_store.get_size(&txn) - }) - .await - .map_err(|e| UpdateError::Error(e.into()))? - .map_err(|e| UpdateError::Error(e.into()))?, - None => 0, - }; + update_store.get_size(&txn) + }) + .await + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; Ok(size) } diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index bbb52d17c..5c5a3e051 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -6,8 +6,7 @@ use uuid::Uuid; use crate::index_controller::IndexActorHandle; use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, - UpdateMsg, UpdateStatus, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStatus, }; #[derive(Clone)] @@ -27,7 +26,7 @@ where where I: IndexActorHandle + Clone + Send + Sync + 'static, { - let path = path.as_ref().to_owned().join("updates"); + let path = path.as_ref().to_owned(); let (sender, receiver) = mpsc::channel(100); let actor = UpdateActor::new(update_store_size, receiver, path, index_handle)?; @@ -64,16 +63,16 @@ where receiver.await.expect("update actor killed.") } - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Snapshot { uuid, path, ret }; + let msg = UpdateMsg::Snapshot { uuids, path, ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } - async fn get_size(&self, uuid: Uuid) -> Result { + async fn get_size(&self) -> Result { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::GetSize { uuid, ret }; + let msg = UpdateMsg::GetSize { ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } diff --git a/meilisearch-http/src/index_controller/update_actor/message.rs b/meilisearch-http/src/index_controller/update_actor/message.rs index c863c803e..0f0005862 100644 --- a/meilisearch-http/src/index_controller/update_actor/message.rs +++ b/meilisearch-http/src/index_controller/update_actor/message.rs @@ -26,12 +26,11 @@ pub enum UpdateMsg { ret: oneshot::Sender>, }, Snapshot { - uuid: Uuid, + uuids: Vec, path: PathBuf, ret: oneshot::Sender>, }, GetSize { - uuid: Uuid, ret: oneshot::Sender>, }, } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index 1c9461f6a..0f815dbf3 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -40,8 +40,8 @@ pub trait UpdateActorHandle { async fn get_all_updates_status(&self, uuid: Uuid) -> Result>; async fn update_status(&self, uuid: Uuid, id: u64) -> Result; async fn delete(&self, uuid: Uuid) -> Result<()>; - async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()>; - async fn get_size(&self, uuid: Uuid) -> Result; + async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()>; + async fn get_size(&self) -> Result; async fn update( &self, meta: UpdateMeta, diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index a8f5fe800..79693ba1f 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -5,6 +5,7 @@ use std::mem::size_of; use std::path::{Path, PathBuf}; use std::sync::Arc; +use anyhow::Context; use bytemuck::{Pod, Zeroable}; use heed::types::{ByteSlice, DecodeIgnore, SerdeJson}; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; @@ -106,7 +107,7 @@ where mut options: EnvOpenOptions, path: P, update_handler: U, - ) -> heed::Result> + ) -> anyhow::Result> where P: AsRef, U: HandleUpdate + Sync + Clone + Send + 'static, @@ -127,6 +128,11 @@ where let update_lock = Arc::new(Mutex::new(())); + // Init update loop to perform any pending updates at launch. + // Since we just launched the update store, and we still own the receiving end of the + // channel, this call is guarenteed to succeed. + notification_sender.try_send(()).expect("Failed to init update store"); + let update_store = Arc::new(UpdateStore { env, pending, @@ -277,8 +283,11 @@ where // to the update handler. Processing store is non persistent to be able recover // from a failure let processing = pending.processing(); - self.processing.write().replace((index_uuid, processing.clone())); - let file = File::open(&content_path)?; + self.processing + .write() + .replace((index_uuid, processing.clone())); + let file = File::open(&content_path) + .with_context(|| format!("file at path: {:?}", &content_path))?; // Process the pending update using the provided user function. let result = handler.handle_update(index_uuid, processing, file)?; drop(rtxn); @@ -521,9 +530,10 @@ where fn delete_all( txn: &mut heed::RwTxn, uuid: Uuid, - db: Database + db: Database, ) -> anyhow::Result<()> - where A: for<'a> heed::BytesDecode<'a> + where + A: for<'a> heed::BytesDecode<'a>, { let mut iter = db.prefix_iter_mut(txn, uuid.as_bytes())?; while let Some(_) = iter.next() { @@ -553,19 +563,17 @@ where &self, txn: &mut heed::RwTxn, path: impl AsRef, - uuid: Uuid, ) -> anyhow::Result<()> { let update_path = path.as_ref().join("updates"); create_dir_all(&update_path)?; - let mut snapshot_path = update_path.join(format!("update-{}", uuid)); // acquire write lock to prevent further writes during snapshot - create_dir_all(&snapshot_path)?; - snapshot_path.push("data.mdb"); + create_dir_all(&update_path)?; + let db_path = update_path.join("data.mdb"); // create db snapshot self.env - .copy_to_path(&snapshot_path, CompactionOption::Enabled)?; + .copy_to_path(&db_path, CompactionOption::Enabled)?; let update_files_path = update_path.join("update_files"); create_dir_all(&update_files_path)?; diff --git a/meilisearch-http/src/routes/document.rs b/meilisearch-http/src/routes/document.rs index ed5d88230..357a2b16a 100644 --- a/meilisearch-http/src/routes/document.rs +++ b/meilisearch-http/src/routes/document.rs @@ -84,9 +84,9 @@ async fn delete_document( .delete_documents(path.index_uid.clone(), vec![path.document_id.clone()]) .await { - Ok(update_status) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + Ok(update_status) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } @@ -163,9 +163,9 @@ async fn add_documents( .await; match addition_result { - Ok(update_status) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + Ok(update_status) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } @@ -242,9 +242,9 @@ async fn delete_documents( .collect(); match data.delete_documents(path.index_uid.clone(), ids).await { - Ok(update_status) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + Ok(update_status) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } @@ -258,9 +258,9 @@ async fn clear_all_documents( path: web::Path, ) -> Result { match data.clear_documents(path.index_uid.clone()).await { - Ok(update_status) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() }))) - } + Ok(update_status) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_status.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } diff --git a/meilisearch-http/src/routes/settings/mod.rs b/meilisearch-http/src/routes/settings/mod.rs index 54cc53d7e..8ca51dbf5 100644 --- a/meilisearch-http/src/routes/settings/mod.rs +++ b/meilisearch-http/src/routes/settings/mod.rs @@ -143,9 +143,9 @@ async fn update_all( .update_settings(index_uid.into_inner(), body.into_inner(), true) .await { - Ok(update_result) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))) - } + Ok(update_result) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } @@ -175,9 +175,9 @@ async fn delete_all( .update_settings(index_uid.into_inner(), settings, false) .await { - Ok(update_result) => { - Ok(HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() }))) - } + Ok(update_result) => Ok( + HttpResponse::Accepted().json(serde_json::json!({ "updateId": update_result.id() })) + ), Err(e) => { Ok(HttpResponse::BadRequest().json(serde_json::json!({ "error": e.to_string() }))) } diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index 3412f45af..4230e19f8 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -23,13 +23,7 @@ async fn get_settings() { assert_eq!(settings["distinctAttribute"], json!(null)); assert_eq!( settings["rankingRules"], - json!([ - "words", - "typo", - "proximity", - "attribute", - "exactness" - ]) + json!(["words", "typo", "proximity", "attribute", "exactness"]) ); assert_eq!(settings["stopWords"], json!([])); }