diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs index 9c8f58a62..bfb7127f3 100644 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ b/meilisearch-http/src/index_controller/index_actor.rs @@ -104,7 +104,7 @@ enum IndexMsg { ret: oneshot::Sender>, }, Snapshot { - uuids: Vec, + uuid: Uuid, path: PathBuf, ret: oneshot::Sender>, } @@ -256,8 +256,8 @@ impl IndexActor { } => { let _ = ret.send(self.handle_update_index(uuid, index_settings).await); } - Snapshot { uuids, path, ret } => { - let _ = ret.send(self.handle_snapshot(uuids, path).await); + Snapshot { uuid, path, ret } => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); } } } @@ -412,7 +412,7 @@ impl IndexActor { .map_err(|e| IndexError::Error(e.into()))? } - async fn handle_snapshot(&self, uuids: Vec, mut path: PathBuf) -> Result<()> { + async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> { use tokio::fs::create_dir_all; path.push("indexes"); @@ -421,25 +421,14 @@ impl IndexActor { .await .map_err(|e| IndexError::Error(e.into()))?; - let mut handles = Vec::new(); - for uuid in uuids { - if let Some(index) = self.store.get(uuid).await? { - let index_path = path.join(format!("index-{}", uuid)); - let handle = spawn_blocking(move || -> anyhow::Result<()> { - // Get write txn to wait for ongoing write transaction before snapshot. - let _txn = index.write_txn()?; - index.env.copy_to_path(index_path, CompactionOption::Enabled)?; - Ok(()) - }); - handles.push(handle); - } - } - - for handle in handles { - handle - .await - .map_err(|e| IndexError::Error(e.into()))? - .map_err(|e| IndexError::Error(e.into()))?; + if let Some(index) = self.store.get(uuid).await? { + let index_path = path.join(format!("index-{}", uuid)); + spawn_blocking(move || -> anyhow::Result<()> { + // Get write txn to wait for ongoing write transaction before snapshot. + let _txn = index.write_txn()?; + index.env.copy_to_path(index_path, CompactionOption::Enabled)?; + Ok(()) + }); } Ok(()) @@ -567,10 +556,10 @@ impl IndexActorHandle { Ok(receiver.await.expect("IndexActor has been killed")?) } - pub async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = IndexMsg::Snapshot { - uuids, + uuid, path, ret, }; diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 85b39f506..75f6c1f82 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -50,10 +50,9 @@ impl SnapshotService { .join(format!("tmp-{}", Uuid::new_v4())); create_dir_all(&temp_snapshot_path)?; let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?; - let index_snapshot = self.index_handle.snapshot(uuids.clone(), temp_snapshot_path.clone()); - let updates_snapshot = self.update_handle.snapshot(uuids.clone(), temp_snapshot_path.clone()); - let (first, second) = tokio::join!(updates_snapshot, index_snapshot); - println!("results: {:?}, {:?}", first, second); + for uuid in uuids { + self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?; + } Ok(()) } } diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index 6caba133b..6e017dcf5 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -1,15 +1,16 @@ use std::collections::{hash_map::Entry, HashMap}; -use std::io::SeekFrom; use std::fs::{create_dir_all, remove_dir_all}; +use std::io::SeekFrom; use std::path::{Path, PathBuf}; use std::sync::Arc; +use super::index_actor::IndexActorHandle; +use heed::CompactionOption; use log::info; use oxidized_json_checker::JsonChecker; -use super::index_actor::IndexActorHandle; use thiserror::Error; use tokio::fs::OpenOptions; -use tokio::io::{AsyncWriteExt, AsyncSeekExt}; +use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::{mpsc, oneshot, RwLock}; use uuid::Uuid; @@ -56,16 +57,17 @@ enum UpdateMsg { ret: oneshot::Sender>, }, Snapshot { - uuids: Vec, + uuid: Uuid, path: PathBuf, ret: oneshot::Sender>, - } + }, } struct UpdateActor { path: PathBuf, store: S, inbox: mpsc::Receiver>, + index_handle: IndexActorHandle, } #[async_trait::async_trait] @@ -84,11 +86,17 @@ where store: S, inbox: mpsc::Receiver>, path: impl AsRef, + index_handle: IndexActorHandle, ) -> anyhow::Result { - let path = path.as_ref().to_owned().join("update_files"); - create_dir_all(&path)?; + let path = path.as_ref().to_owned(); + create_dir_all(path.join("update_files"))?; assert!(path.exists()); - Ok(Self { store, inbox, path }) + Ok(Self { + store, + inbox, + path, + index_handle, + }) } async fn run(mut self) { @@ -118,8 +126,8 @@ where Some(Create { uuid, ret }) => { let _ = ret.send(self.handle_create(uuid).await); } - Some(Snapshot { uuids, path, ret }) => { - let _ = ret.send(self.handle_snapshot(uuids, path).await); + Some(Snapshot { uuid, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuid, path).await); } None => break, } @@ -134,7 +142,9 @@ where ) -> Result { let update_store = self.store.get_or_create(uuid).await?; let update_file_id = uuid::Uuid::new_v4(); - let path = self.path.join(format!("update_{}", update_file_id)); + let path = self + .path + .join(format!("update_files/update_{}", update_file_id)); let mut file = OpenOptions::new() .read(true) .write(true) @@ -167,10 +177,15 @@ where let mut file = file.into_std().await; tokio::task::spawn_blocking(move || { - use std::io::{BufReader, sink, copy, Seek}; + use std::io::{copy, sink, BufReader, Seek}; // If the payload is empty, ignore the check. - if file.metadata().map_err(|e| UpdateError::Error(Box::new(e)))?.len() > 0 { + if file + .metadata() + .map_err(|e| UpdateError::Error(Box::new(e)))? + .len() + > 0 + { // Check that the json payload is valid: let reader = BufReader::new(&mut file); let mut checker = JsonChecker::new(reader); @@ -241,13 +256,32 @@ where Ok(()) } - async fn handle_snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { - use tokio::time; - use std::time::Duration; + async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { + use tokio::fs; + + let update_path = path.join("updates"); + fs::create_dir_all(&update_path) + .await + .map_err(|e| UpdateError::Error(e.into()))?; + + let index_handle = self.index_handle.clone(); + if let Some(update_store) = self.store.get(uuid).await? { + let snapshot_path = update_path.join(format!("update-{}", uuid)); + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + let _txn = update_store.env.write_txn()?; + update_store + .env + .copy_to_path(&snapshot_path, CompactionOption::Enabled)?; + futures::executor::block_on( + async move { index_handle.snapshot(uuid, path).await }, + )?; + Ok(()) + }) + .await + .map_err(|e| UpdateError::Error(e.into()))? + .map_err(|e| UpdateError::Error(e.into()))?; + } - println!("performing update snapshot"); - time::sleep(Duration::from_secs(2)).await; - println!("Update snapshot done"); Ok(()) } } @@ -268,8 +302,8 @@ where ) -> anyhow::Result { let path = path.as_ref().to_owned().join("updates"); let (sender, receiver) = mpsc::channel(100); - let store = MapUpdateStoreStore::new(index_handle, &path, update_store_size); - let actor = UpdateActor::new(store, receiver, path)?; + let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size); + let actor = UpdateActor::new(store, receiver, path, index_handle)?; tokio::task::spawn(actor.run()); @@ -323,9 +357,9 @@ impl UpdateActorHandle { receiver.await.expect("update actor killed.") } - pub async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); - let msg = UpdateMsg::Snapshot { uuids, path, ret }; + let msg = UpdateMsg::Snapshot { uuid, path, ret }; let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } diff --git a/meilisearch-http/src/index_controller/update_store.rs b/meilisearch-http/src/index_controller/update_store.rs index 6de30ab7f..5280ed94e 100644 --- a/meilisearch-http/src/index_controller/update_store.rs +++ b/meilisearch-http/src/index_controller/update_store.rs @@ -16,7 +16,7 @@ type BEU64 = heed::zerocopy::U64; #[derive(Clone)] pub struct UpdateStore { - env: Env, + pub env: Env, pending_meta: Database, SerdeJson>>, pending: Database, SerdeJson>, processed_meta: Database, SerdeJson>>,