diff --git a/meilisearch-http/src/index_controller/index_actor.rs b/meilisearch-http/src/index_controller/index_actor.rs index cc6d67528..9c8f58a62 100644 --- a/meilisearch-http/src/index_controller/index_actor.rs +++ b/meilisearch-http/src/index_controller/index_actor.rs @@ -8,7 +8,7 @@ use async_stream::stream; use chrono::{DateTime, Utc}; use futures::pin_mut; use futures::stream::StreamExt; -use heed::EnvOpenOptions; +use heed::{CompactionOption, EnvOpenOptions}; use log::debug; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -103,6 +103,11 @@ enum IndexMsg { index_settings: IndexSettings, ret: oneshot::Sender>, }, + Snapshot { + uuids: Vec, + path: PathBuf, + ret: oneshot::Sender>, + } } struct IndexActor { @@ -251,6 +256,9 @@ impl IndexActor { } => { let _ = ret.send(self.handle_update_index(uuid, index_settings).await); } + Snapshot { uuids, path, ret } => { + let _ = ret.send(self.handle_snapshot(uuids, path).await); + } } } @@ -403,6 +411,39 @@ impl IndexActor { .await .map_err(|e| IndexError::Error(e.into()))? } + + async fn handle_snapshot(&self, uuids: Vec, mut path: PathBuf) -> Result<()> { + use tokio::fs::create_dir_all; + + path.push("indexes"); + println!("performing index snapshot in {:?}", path); + create_dir_all(&path) + .await + .map_err(|e| IndexError::Error(e.into()))?; + + let mut handles = Vec::new(); + for uuid in uuids { + if let Some(index) = self.store.get(uuid).await? { + let index_path = path.join(format!("index-{}", uuid)); + let handle = spawn_blocking(move || -> anyhow::Result<()> { + // Get write txn to wait for ongoing write transaction before snapshot. + let _txn = index.write_txn()?; + index.env.copy_to_path(index_path, CompactionOption::Enabled)?; + Ok(()) + }); + handles.push(handle); + } + } + + for handle in handles { + handle + .await + .map_err(|e| IndexError::Error(e.into()))? + .map_err(|e| IndexError::Error(e.into()))?; + } + + Ok(()) + } } #[derive(Clone)] @@ -525,6 +566,17 @@ impl IndexActorHandle { let _ = self.read_sender.send(msg).await; Ok(receiver.await.expect("IndexActor has been killed")?) } + + pub async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = IndexMsg::Snapshot { + uuids, + path, + ret, + }; + let _ = self.read_sender.send(msg).await; + Ok(receiver.await.expect("IndexActor has been killed")?) + } } struct HeedIndexStore { diff --git a/meilisearch-http/src/index_controller/snapshot.rs b/meilisearch-http/src/index_controller/snapshot.rs index 6d77941bb..85b39f506 100644 --- a/meilisearch-http/src/index_controller/snapshot.rs +++ b/meilisearch-http/src/index_controller/snapshot.rs @@ -1,7 +1,9 @@ use std::path::PathBuf; use std::time::Duration; +use std::fs::create_dir_all; use tokio::time::interval; +use uuid::Uuid; use super::index_actor::IndexActorHandle; use super::update_actor::UpdateActorHandle; @@ -38,11 +40,20 @@ impl SnapshotService { loop { interval.tick().await; - self.perform_snapshot().await; + self.perform_snapshot().await.unwrap(); } } - async fn perform_snapshot(&self) { - println!("performing snapshot in {:?}", self.snapshot_path); + async fn perform_snapshot(&self) -> anyhow::Result<()> { + let temp_snapshot_path = self + .snapshot_path + .join(format!("tmp-{}", Uuid::new_v4())); + create_dir_all(&temp_snapshot_path)?; + let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?; + let index_snapshot = self.index_handle.snapshot(uuids.clone(), temp_snapshot_path.clone()); + let updates_snapshot = self.update_handle.snapshot(uuids.clone(), temp_snapshot_path.clone()); + let (first, second) = tokio::join!(updates_snapshot, index_snapshot); + println!("results: {:?}, {:?}", first, second); + Ok(()) } } diff --git a/meilisearch-http/src/index_controller/update_actor.rs b/meilisearch-http/src/index_controller/update_actor.rs index abf2ab8bc..6caba133b 100644 --- a/meilisearch-http/src/index_controller/update_actor.rs +++ b/meilisearch-http/src/index_controller/update_actor.rs @@ -55,6 +55,11 @@ enum UpdateMsg { uuid: Uuid, ret: oneshot::Sender>, }, + Snapshot { + uuids: Vec, + path: PathBuf, + ret: oneshot::Sender>, + } } struct UpdateActor { @@ -113,6 +118,9 @@ where Some(Create { uuid, ret }) => { let _ = ret.send(self.handle_create(uuid).await); } + Some(Snapshot { uuids, path, ret }) => { + let _ = ret.send(self.handle_snapshot(uuids, path).await); + } None => break, } } @@ -232,6 +240,16 @@ where let _ = self.store.get_or_create(uuid).await?; Ok(()) } + + async fn handle_snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + use tokio::time; + use std::time::Duration; + + println!("performing update snapshot"); + time::sleep(Duration::from_secs(2)).await; + println!("Update snapshot done"); + Ok(()) + } } #[derive(Clone)] @@ -274,7 +292,9 @@ where let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } +} +impl UpdateActorHandle { pub async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::ListUpdates { uuid, ret }; @@ -302,6 +322,13 @@ where let _ = self.sender.send(msg).await; receiver.await.expect("update actor killed.") } + + pub async fn snapshot(&self, uuids: Vec, path: PathBuf) -> Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UpdateMsg::Snapshot { uuids, path, ret }; + let _ = self.sender.send(msg).await; + receiver.await.expect("update actor killed.") + } } struct MapUpdateStoreStore { diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs index 2ee9c6b17..c31d776b3 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver.rs @@ -1,4 +1,5 @@ -use std::{fs::create_dir_all, path::Path}; +use std::fs::create_dir_all; +use std::path::{Path, PathBuf}; use heed::{ types::{ByteSlice, Str}, @@ -8,6 +9,7 @@ use log::{info, warn}; use thiserror::Error; use tokio::sync::{mpsc, oneshot}; use uuid::Uuid; +use heed::CompactionOption; pub type Result = std::result::Result; @@ -32,6 +34,10 @@ enum UuidResolveMsg { List { ret: oneshot::Sender>>, }, + SnapshotRequest { + path: PathBuf, + ret: oneshot::Sender>>, + }, } struct UuidResolverActor { @@ -66,6 +72,9 @@ impl UuidResolverActor { Some(List { ret }) => { let _ = ret.send(self.handle_list().await); } + Some(SnapshotRequest { path, ret }) => { + let _ = ret.send(self.handle_snapshot(path).await); + } // all senders have been dropped, need to quit. None => break, } @@ -106,6 +115,10 @@ impl UuidResolverActor { let result = self.store.list().await?; Ok(result) } + + async fn handle_snapshot(&self, path: PathBuf) -> Result> { + self.store.snapshot(path).await + } } fn is_index_uid_valid(uid: &str) -> bool { @@ -171,6 +184,15 @@ impl UuidResolverHandle { .await .expect("Uuid resolver actor has been killed")?) } + + pub async fn snapshot(&self, path: PathBuf) -> Result> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::SnapshotRequest { path, ret }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } } #[derive(Debug, Error)] @@ -197,6 +219,7 @@ trait UuidStore { async fn get_uuid(&self, uid: String) -> Result>; async fn delete(&self, uid: String) -> Result>; async fn list(&self) -> Result>; + async fn snapshot(&self, path: PathBuf) -> Result>; } struct HeedUuidStore { @@ -242,7 +265,6 @@ impl UuidStore for HeedUuidStore { }) .await? } - async fn get_uuid(&self, name: String) -> Result> { let env = self.env.clone(); let db = self.db; @@ -292,4 +314,23 @@ impl UuidStore for HeedUuidStore { }) .await? } + + async fn snapshot(&self, mut path: PathBuf) -> Result> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + // Write transaction to acquire a lock on the database. + let txn = env.write_txn()?; + let mut entries = Vec::new(); + for entry in db.iter(&txn)? { + let (_, uuid) = entry?; + let uuid = Uuid::from_slice(uuid)?; + entries.push(uuid) + } + path.push("uuids"); + env.copy_to_path(path, CompactionOption::Enabled)?; + Ok(entries) + }) + .await? + } }