restore snapshots

This commit is contained in:
mpostma 2021-09-27 16:48:03 +02:00
parent b9d189bf12
commit 90018755c5
15 changed files with 397 additions and 317 deletions

View File

@ -1,4 +1,4 @@
pub mod compression;
//pub mod compression;
mod env;
pub use env::EnvSizer;

View File

@ -1,4 +1,4 @@
use std::env;
use std::{env, path::Path, time::Duration};
use actix_web::HttpServer;
use meilisearch_http::{create_app, Opt};
@ -12,6 +12,7 @@ use meilisearch_http::analytics;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
/// does all the setup before meilisearch is launched
fn setup(opt: &Opt) -> anyhow::Result<()> {
let mut log_builder = env_logger::Builder::new();
log_builder.parse_filters(&opt.log_level);
@ -22,12 +23,19 @@ fn setup(opt: &Opt) -> anyhow::Result<()> {
log_builder.init();
Ok(())
}
/// Cleans and setup the temporary file folder in the database directory. This must be done after
/// the meilisearch instance has been created, to not interfere with the snapshot and dump loading.
fn setup_temp_dir(db_path: impl AsRef<Path>) -> anyhow::Result<()> {
// Set the tempfile directory in the current db path, to avoid cross device references. Also
// remove the previous outstanding files found there
//
// TODO: if two processes open the same db, one might delete the other tmpdir. Need to make
// sure that no one is using it before deleting it.
let temp_path = opt.db_path.join("tmp");
let temp_path = db_path.as_ref().join("tmp");
// Ignore error if tempdir doesn't exist
let _ = std::fs::remove_dir_all(&temp_path);
std::fs::create_dir_all(&temp_path)?;
@ -48,15 +56,21 @@ fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
.set_ignore_missing_snapshot(opt.ignore_missing_snapshot)
.set_ignore_snapshot_if_db_exists(opt.ignore_snapshot_if_db_exists)
.set_dump_dst(opt.dumps_dir.clone())
.set_snapshot_interval(Duration::from_secs(opt.snapshot_interval_sec))
.set_snapshot_dir(opt.snapshot_dir.clone());
if let Some(ref path) = opt.import_snapshot {
meilisearch.set_import_snapshot(path.clone());
}
if let Some(ref path) = opt.import_dump {
meilisearch.set_dump_src(path.clone());
}
if opt.schedule_snapshot {
meilisearch.set_schedule_snapshot();
}
meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone())
}
@ -78,6 +92,8 @@ async fn main() -> anyhow::Result<()> {
let meilisearch = setup_meilisearch(&opt)?;
setup_temp_dir(&opt.db_path)?;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
if !opt.no_analytics {
let analytics_data = meilisearch.clone();

View File

@ -16,11 +16,11 @@ pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Resul
Ok(())
}
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let f = File::open(&src)?;
let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz);
create_dir_all(&dest)?;
ar.unpack(&dest)?;
Ok(())
}
//pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
//let f = File::open(&src)?;
//let gz = GzDecoder::new(f);
//let mut ar = Archive::new(gz);
//create_dir_all(&dest)?;
//ar.unpack(&dest)?;
//Ok(())
//}

View File

@ -71,11 +71,15 @@ impl IndexMeta {
}
}
#[derive(Clone)]
#[derive(Clone, derivative::Derivative)]
#[derivative(Debug)]
pub struct Index {
pub uuid: Uuid,
#[derivative(Debug="ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug="ignore")]
update_file_store: Arc<UpdateFileStore>,
#[derivative(Debug="ignore")]
update_handler: Arc<UpdateHandler>,
}
@ -258,4 +262,13 @@ impl Index {
displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
Ok(displayed_fields_ids)
}
pub fn snapshot(&self, path: impl AsRef<Path>) -> Result<()> {
let mut dst = path.as_ref().join(format!("indexes/{}/", self.uuid));
create_dir_all(&dst)?;
dst.push("data.mdb");
let _txn = self.write_txn()?;
self.inner.env.copy_to_path(dst, heed::CompactionOption::Enabled)?;
Ok(())
}
}

View File

@ -57,7 +57,7 @@ impl IndexStore for MapIndexStore {
if let Some(index) = lock.get(&uuid) {
return Ok(index.clone());
}
let path = self.path.join(format!("index-{}", uuid));
let path = self.path.join(format!("{}", uuid));
if path.exists() {
return Err(IndexResolverError::IndexAlreadyExists);
}
@ -92,7 +92,7 @@ impl IndexStore for MapIndexStore {
None => {
// drop the guard here so we can perform the write after without deadlocking;
drop(guard);
let path = self.path.join(format!("index-{}", uuid));
let path = self.path.join(format!("{}", uuid));
if !path.exists() {
return Ok(None);
}
@ -108,7 +108,7 @@ impl IndexStore for MapIndexStore {
}
async fn delete(&self, uuid: Uuid) -> Result<Option<Index>> {
let db_path = self.path.join(format!("index-{}", uuid));
let db_path = self.path.join(format!("{}", uuid));
fs::remove_dir_all(db_path).await?;
let index = self.index_store.write().await.remove(&uuid);
Ok(index)

View File

@ -45,10 +45,18 @@ where U: UuidStore,
pub async fn get_size(&self) -> Result<u64> {
todo!()
//Ok(self.index_store.get_size()? + self.index_uuid_store.get_size().await?)
}
pub async fn perform_snapshot(&self, _path: impl AsRef<Path>) -> Result<()> {
todo!()
pub async fn snapshot(&self, path: impl AsRef<Path>) -> Result<Vec<Index>> {
let uuids = self.index_uuid_store.snapshot(path.as_ref().to_owned()).await?;
let mut indexes = Vec::new();
for uuid in uuids {
indexes.push(self.get_index_by_uuid(uuid).await?);
}
Ok(indexes)
}
pub async fn create_index(&self, uid: String, primary_key: Option<String>) -> Result<(Uuid, Index)> {

View File

@ -46,8 +46,9 @@ impl HeedUuidStore {
create_dir_all(&path)?;
let mut options = EnvOpenOptions::new();
options.map_size(UUID_STORE_SIZE); // 1GB
options.max_dbs(1);
let env = options.open(path)?;
let db = env.create_database(None)?;
let db = env.create_database(Some("uuids"))?;
Ok(Self { env, db })
}

View File

@ -20,6 +20,7 @@ use snapshot::load_snapshot;
use crate::index::{Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked};
use crate::index_controller::index_resolver::create_index_resolver;
use crate::index_controller::snapshot::SnapshotService;
use crate::options::IndexerOpts;
use error::Result;
use crate::index::error::Result as IndexResult;
@ -75,7 +76,7 @@ pub struct IndexSettings {
#[derive(Clone)]
pub struct IndexController {
index_resolver: Arc<HardStateIndexResolver>,
update_handle: updates::UpdateSender,
update_sender: updates::UpdateSender,
dump_handle: dump_actor::DumpActorHandleImpl,
}
@ -113,8 +114,10 @@ pub struct IndexControllerBuilder {
max_update_store_size: Option<usize>,
snapshot_dir: Option<PathBuf>,
import_snapshot: Option<PathBuf>,
snapshot_interval: Option<Duration>,
ignore_snapshot_if_db_exists: bool,
ignore_missing_snapshot: bool,
schedule_snapshot: bool,
dump_src: Option<PathBuf>,
dump_dst: Option<PathBuf>,
}
@ -155,36 +158,36 @@ impl IndexControllerBuilder {
let index_resolver = Arc::new(create_index_resolver(&db_path, index_size, &indexer_options)?);
#[allow(unreachable_code)]
let update_handle = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?;
let update_sender = updates::create_update_handler(index_resolver.clone(), &db_path, update_store_size)?;
let dump_path = self.dump_dst.ok_or_else(|| anyhow::anyhow!("Missing dump directory path"))?;
let dump_handle = dump_actor::DumpActorHandleImpl::new(
dump_path,
index_resolver.clone(),
update_handle.clone(),
update_sender.clone(),
index_size,
update_store_size,
)?;
//if options.schedule_snapshot {
//let snapshot_service = SnapshotService::new(
//uuid_resolver.clone(),
//update_handle.clone(),
//Duration::from_secs(options.snapshot_interval_sec),
//options.snapshot_dir.clone(),
//options
//.db_path
//.file_name()
//.map(|n| n.to_owned().into_string().expect("invalid path"))
//.unwrap_or_else(|| String::from("data.ms")),
//);
if self.schedule_snapshot {
let snapshot_service = SnapshotService::new(
index_resolver.clone(),
update_sender.clone(),
self.snapshot_interval.ok_or_else(|| anyhow::anyhow!("Snapshot interval not provided."))?,
self.snapshot_dir.ok_or_else(|| anyhow::anyhow!("Snapshot path not provided."))?,
db_path
.as_ref()
.file_name()
.map(|n| n.to_owned().into_string().expect("invalid path"))
.unwrap_or_else(|| String::from("data.ms")),
);
//tokio::task::spawn(snapshot_service.run());
//}
tokio::task::spawn(snapshot_service.run());
}
Ok(IndexController {
index_resolver,
update_handle,
update_sender,
dump_handle,
})
}
@ -238,6 +241,18 @@ impl IndexControllerBuilder {
self.import_snapshot.replace(import_snapshot);
self
}
/// Set the index controller builder's snapshot interval sec.
pub fn set_snapshot_interval(&mut self, snapshot_interval: Duration) -> &mut Self {
self.snapshot_interval = Some(snapshot_interval);
self
}
/// Set the index controller builder's schedule snapshot.
pub fn set_schedule_snapshot(&mut self) -> &mut Self {
self.schedule_snapshot = true;
self
}
}
impl IndexController {
@ -248,12 +263,12 @@ impl IndexController {
pub async fn register_update(&self, uid: String, update: Update) -> Result<UpdateStatus> {
match self.index_resolver.get_uuid(uid).await {
Ok(uuid) => {
let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?;
let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?;
Ok(update_result)
}
Err(IndexResolverError::UnexistingIndex(name)) => {
let (uuid, _) = self.index_resolver.create_index(name, None).await?;
let update_result = UpdateMsg::update(&self.update_handle, uuid, update).await?;
let update_result = UpdateMsg::update(&self.update_sender, uuid, update).await?;
// ignore if index creation fails now, since it may already have been created
Ok(update_result)
@ -389,13 +404,13 @@ impl IndexController {
pub async fn update_status(&self, uid: String, id: u64) -> Result<UpdateStatus> {
let uuid = self.index_resolver.get_uuid(uid).await?;
let result = UpdateMsg::get_update(&self.update_handle, uuid, id).await?;
let result = UpdateMsg::get_update(&self.update_sender, uuid, id).await?;
Ok(result)
}
pub async fn all_update_status(&self, uid: String) -> Result<Vec<UpdateStatus>> {
let uuid = self.index_resolver.get_uuid(uid).await?;
let result = UpdateMsg::list_updates(&self.update_handle, uuid).await?;
let result = UpdateMsg::list_updates(&self.update_sender, uuid).await?;
Ok(result)
}
@ -490,7 +505,7 @@ impl IndexController {
}
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let update_infos = UpdateMsg::get_info(&self.update_handle).await?;
let update_infos = UpdateMsg::get_info(&self.update_sender).await?;
let index = self.index_resolver.get_index(uid).await?;
let uuid = index.uuid;
let mut stats = spawn_blocking(move || index.stats()).await??;
@ -500,7 +515,7 @@ impl IndexController {
}
pub async fn get_all_stats(&self) -> Result<Stats> {
let update_infos = UpdateMsg::get_info(&self.update_handle).await?;
let update_infos = UpdateMsg::get_info(&self.update_sender).await?;
let mut database_size = self.get_uuids_size().await? + update_infos.size;
let mut last_update: Option<DateTime<_>> = None;
let mut indexes = BTreeMap::new();

View File

@ -1,88 +1,94 @@
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use log::{error, info, trace};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tokio::fs;
//pub struct SnapshotService<U, R> {
//uuid_resolver_handle: R,
//update_handle: U,
//snapshot_period: Duration,
//snapshot_path: PathBuf,
//db_name: String,
//}
use crate::index_controller::updates::UpdateMsg;
//impl<U, R> SnapshotService<U, R>
//where
//U: UpdateActorHandle,
//R: UuidResolverHandle,
//{
//pub fn new(
//uuid_resolver_handle: R,
//update_handle: U,
//snapshot_period: Duration,
//snapshot_path: PathBuf,
//db_name: String,
//) -> Self {
//Self {
//uuid_resolver_handle,
//update_handle,
//snapshot_period,
//snapshot_path,
//db_name,
//}
//}
use super::updates::UpdateSender;
use super::index_resolver::HardStateIndexResolver;
//pub async fn run(self) {
//info!(
//"Snapshot scheduled every {}s.",
//self.snapshot_period.as_secs()
//);
//loop {
//if let Err(e) = self.perform_snapshot().await {
//error!("Error while performing snapshot: {}", e);
//}
//sleep(self.snapshot_period).await;
//}
//}
pub struct SnapshotService {
index_resolver: Arc<HardStateIndexResolver>,
update_sender: UpdateSender,
snapshot_period: Duration,
snapshot_path: PathBuf,
db_name: String,
}
//async fn perform_snapshot(&self) -> anyhow::Result<()> {
//trace!("Performing snapshot.");
impl SnapshotService {
pub fn new(
index_resolver: Arc<HardStateIndexResolver>,
update_sender: UpdateSender,
snapshot_period: Duration,
snapshot_path: PathBuf,
db_name: String,
) -> Self {
Self {
index_resolver,
update_sender,
snapshot_period,
snapshot_path,
db_name,
}
}
//let snapshot_dir = self.snapshot_path.clone();
//fs::create_dir_all(&snapshot_dir).await?;
//let temp_snapshot_dir =
//spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
//let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
pub async fn run(self) {
info!(
"Snapshot scheduled every {}s.",
self.snapshot_period.as_secs()
);
loop {
if let Err(e) = self.perform_snapshot().await {
error!("Error while performing snapshot: {}", e);
}
sleep(self.snapshot_period).await;
}
}
//let uuids = self
//.uuid_resolver_handle
//.snapshot(temp_snapshot_path.clone())
//.await?;
async fn perform_snapshot(&self) -> anyhow::Result<()> {
trace!("Performing snapshot.");
//if uuids.is_empty() {
//return Ok(());
//}
let snapshot_dir = self.snapshot_path.clone();
fs::create_dir_all(&snapshot_dir).await?;
let temp_snapshot_dir =
spawn_blocking(move || tempfile::tempdir_in(snapshot_dir)).await??;
let temp_snapshot_path = temp_snapshot_dir.path().to_owned();
//self.update_handle
//.snapshot(uuids, temp_snapshot_path.clone())
//.await?;
//let snapshot_dir = self.snapshot_path.clone();
//let snapshot_path = self
//.snapshot_path
//.join(format!("{}.snapshot", self.db_name));
//let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
//let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
//let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
//compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
//temp_snapshot_file.persist(&snapshot_path)?;
//Ok(snapshot_path)
//})
//.await??;
let indexes = self
.index_resolver
.snapshot(temp_snapshot_path.clone())
.await?;
//trace!("Created snapshot in {:?}.", snapshot_path);
if indexes.is_empty() {
return Ok(());
}
//Ok(())
//}
//}
UpdateMsg::snapshot(&self.update_sender, temp_snapshot_path.clone(), indexes).await?;
let snapshot_dir = self.snapshot_path.clone();
let snapshot_path = self
.snapshot_path
.join(format!("{}.snapshot", self.db_name));
let snapshot_path = spawn_blocking(move || -> anyhow::Result<PathBuf> {
let temp_snapshot_file = tempfile::NamedTempFile::new_in(snapshot_dir)?;
let temp_snapshot_file_path = temp_snapshot_file.path().to_owned();
crate::compression::to_tar_gz(temp_snapshot_path, temp_snapshot_file_path)?;
temp_snapshot_file.persist(&snapshot_path)?;
Ok(snapshot_path)
})
.await??;
trace!("Created snapshot in {:?}.", snapshot_path);
Ok(())
}
}
pub fn load_snapshot(
db_path: impl AsRef<Path>,
@ -120,140 +126,140 @@ pub fn load_snapshot(
}
}
#[cfg(test)]
mod test {
use std::iter::FromIterator;
use std::{collections::HashSet, sync::Arc};
//#[cfg(test)]
//mod test {
//use std::iter::FromIterator;
//use std::{collections::HashSet, sync::Arc};
use futures::future::{err, ok};
use rand::Rng;
use tokio::time::timeout;
use uuid::Uuid;
//use futures::future::{err, ok};
//use rand::Rng;
//use tokio::time::timeout;
//use uuid::Uuid;
use super::*;
use crate::index_controller::index_actor::MockIndexActorHandle;
use crate::index_controller::updates::{
error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl,
};
use crate::index_controller::uuid_resolver::{
error::UuidResolverError, MockUuidResolverHandle,
};
//use super::*;
//use crate::index_controller::index_actor::MockIndexActorHandle;
//use crate::index_controller::updates::{
//error::UpdateActorError, MockUpdateActorHandle, UpdateActorHandleImpl,
//};
//use crate::index_controller::uuid_resolver::{
//error::UuidResolverError, MockUuidResolverHandle,
//};
#[actix_rt::test]
async fn test_normal() {
let mut rng = rand::thread_rng();
let uuids_num: usize = rng.gen_range(5..10);
let uuids = (0..uuids_num)
.map(|_| Uuid::new_v4())
.collect::<HashSet<_>>();
//#[actix_rt::test]
//async fn test_normal() {
//let mut rng = rand::thread_rng();
//let uuids_num: usize = rng.gen_range(5..10);
//let uuids = (0..uuids_num)
//.map(|_| Uuid::new_v4())
//.collect::<HashSet<_>>();
let mut uuid_resolver = MockUuidResolverHandle::new();
let uuids_clone = uuids.clone();
uuid_resolver
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(uuids_clone.clone())));
//let mut uuid_resolver = MockUuidResolverHandle::new();
//let uuids_clone = uuids.clone();
//uuid_resolver
//.expect_snapshot()
//.times(1)
//.returning(move |_| Box::pin(ok(uuids_clone.clone())));
let uuids_clone = uuids.clone();
let mut index_handle = MockIndexActorHandle::new();
index_handle
.expect_snapshot()
.withf(move |uuid, _path| uuids_clone.contains(uuid))
.times(uuids_num)
.returning(move |_, _| Box::pin(ok(())));
//let uuids_clone = uuids.clone();
//let mut index_handle = MockIndexActorHandle::new();
//index_handle
//.expect_snapshot()
//.withf(move |uuid, _path| uuids_clone.contains(uuid))
//.times(uuids_num)
//.returning(move |_, _| Box::pin(ok(())));
let dir = tempfile::tempdir_in(".").unwrap();
let handle = Arc::new(index_handle);
let update_handle =
UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
//let dir = tempfile::tempdir_in(".").unwrap();
//let handle = Arc::new(index_handle);
//let update_handle =
//UpdateActorHandleImpl::<Vec<u8>>::new(handle.clone(), dir.path(), 4096 * 100).unwrap();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
snapshot_service.perform_snapshot().await.unwrap();
}
//snapshot_service.perform_snapshot().await.unwrap();
//}
#[actix_rt::test]
async fn error_performing_uuid_snapshot() {
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
.times(1)
//#[actix_rt::test]
//async fn error_performing_uuid_snapshot() {
//let mut uuid_resolver = MockUuidResolverHandle::new();
//uuid_resolver
//.expect_snapshot()
//.times(1)
//abitrary error
.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
//.returning(|_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
//let update_handle = MockUpdateActorHandle::new();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
assert!(snapshot_service.perform_snapshot().await.is_err());
//assert!(snapshot_service.perform_snapshot().await.is_err());
//Nothing was written to the file
assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
}
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
//}
#[actix_rt::test]
async fn error_performing_index_snapshot() {
let uuid = Uuid::new_v4();
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
.times(1)
.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid)))));
//#[actix_rt::test]
//async fn error_performing_index_snapshot() {
//let uuid = Uuid::new_v4();
//let mut uuid_resolver = MockUuidResolverHandle::new();
//uuid_resolver
//.expect_snapshot()
//.times(1)
//.returning(move |_| Box::pin(ok(HashSet::from_iter(Some(uuid)))));
let mut update_handle = MockUpdateActorHandle::new();
update_handle
.expect_snapshot()
//let mut update_handle = MockUpdateActorHandle::new();
//update_handle
//.expect_snapshot()
//abitrary error
.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
//.returning(|_, _| Box::pin(err(UpdateActorError::UnexistingUpdate(0))));
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
assert!(snapshot_service.perform_snapshot().await.is_err());
//assert!(snapshot_service.perform_snapshot().await.is_err());
//Nothing was written to the file
assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
}
//assert!(!snapshot_path.path().join("data.ms.snapshot").exists());
//}
#[actix_rt::test]
async fn test_loop() {
let mut uuid_resolver = MockUuidResolverHandle::new();
uuid_resolver
.expect_snapshot()
//#[actix_rt::test]
//async fn test_loop() {
//let mut uuid_resolver = MockUuidResolverHandle::new();
//uuid_resolver
//.expect_snapshot()
//we expect the funtion to be called between 2 and 3 time in the given interval.
.times(2..4)
//.times(2..4)
//abitrary error, to short-circuit the function
.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
//.returning(move |_| Box::pin(err(UuidResolverError::NameAlreadyExist)));
let update_handle = MockUpdateActorHandle::new();
//let update_handle = MockUpdateActorHandle::new();
let snapshot_path = tempfile::tempdir_in(".").unwrap();
let snapshot_service = SnapshotService::new(
uuid_resolver,
update_handle,
Duration::from_millis(100),
snapshot_path.path().to_owned(),
"data.ms".to_string(),
);
//let snapshot_path = tempfile::tempdir_in(".").unwrap();
//let snapshot_service = SnapshotService::new(
//uuid_resolver,
//update_handle,
//Duration::from_millis(100),
//snapshot_path.path().to_owned(),
//"data.ms".to_string(),
//);
let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
}
}
//let _ = timeout(Duration::from_millis(300), snapshot_service.run()).await;
//}
//}

View File

@ -2,9 +2,13 @@ use std::fs::File;
use std::path::{Path, PathBuf};
use std::ops::{Deref, DerefMut};
//use milli::documents::DocumentBatchReader;
//use serde_json::Map;
use tempfile::NamedTempFile;
use uuid::Uuid;
const UPDATE_FILES_PATH: &str = "updates/updates_files";
use super::error::Result;
pub struct UpdateFile {
@ -14,7 +18,6 @@ pub struct UpdateFile {
impl UpdateFile {
pub fn persist(self) {
println!("persisting in {}", self.path.display());
self.file.persist(&self.path).unwrap();
}
}
@ -40,11 +43,14 @@ pub struct UpdateFileStore {
impl UpdateFileStore {
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join("updates/updates_files");
let path = path.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&path).unwrap();
Ok(Self { path })
}
/// Created a new temporary update file.
///
/// A call to persist is needed to persist in the database.
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
let file = NamedTempFile::new().unwrap();
let uuid = Uuid::new_v4();
@ -54,10 +60,45 @@ impl UpdateFileStore {
Ok((uuid, update_file))
}
/// Returns a the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
let path = self.path.join(uuid.to_string());
println!("reading in {}", path.display());
let file = File::open(path).unwrap();
Ok(file)
}
/// Copies the content of the update file poited to by uuid to dst directory.
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
let src = self.path.join(uuid.to_string());
let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&dst).unwrap();
dst.push(uuid.to_string());
std::fs::copy(src, dst).unwrap();
Ok(())
}
/// Peform a dump of the given update file uuid into the provided snapshot path.
pub fn dump(&self, _uuid: Uuid, _snapshot_path: impl AsRef<Path>) -> Result<()> {
todo!()
//let update_file_path = self.path.join(uuid.to_string());
//let snapshot_file_path: snapshot_path.as_ref().join(format!("update_files/uuid", uuid));
//let update_file = File::open(update_file_path).unwrap();
//let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
//let mut document_buffer = Map::new();
//// TODO: we need to find a way to do this more efficiently. (create a custom serializer to
//// jsonl for example...)
//while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
//for (field_id, content) in document.iter() {
//let field_name = index.get_by_left(&field_id).unwrap();
//let content = serde_json::from_slice(content).unwrap();
//document_buffer.insert(field_name.to_string(), content);
//}
//}
//Ok(())
}
}

View File

@ -4,6 +4,8 @@ use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use crate::index::Index;
use super::error::Result;
use super::{Update, UpdateStatus, UpdateStoreInfo};
@ -28,7 +30,7 @@ pub enum UpdateMsg {
ret: oneshot::Sender<Result<()>>,
},
Snapshot {
uuids: HashSet<Uuid>,
indexes: Vec<Index>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
@ -43,17 +45,20 @@ pub enum UpdateMsg {
}
impl UpdateMsg {
pub async fn snapshot(sender: &mpsc::Sender<Self>, path: PathBuf, indexes: Vec<Index>) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Snapshot { path, indexes, ret };
sender.send(msg).await?;
rcv.await?
}
pub async fn dump(
sender: &mpsc::Sender<Self>,
uuids: HashSet<Uuid>,
path: PathBuf,
) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Dump {
path,
uuids,
ret,
};
let msg = Self::Dump { path, uuids, ret };
sender.send(msg).await?;
rcv.await?
}
@ -63,11 +68,7 @@ impl UpdateMsg {
update: Update,
) -> Result<UpdateStatus> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Update {
uuid,
update,
ret,
};
let msg = Self::Update { uuid, update, ret };
sender.send(msg).await?;
rcv.await?
}
@ -78,11 +79,7 @@ impl UpdateMsg {
id: u64,
) -> Result<UpdateStatus> {
let (ret, rcv) = oneshot::channel();
let msg = Self::GetUpdate {
uuid,
id,
ret,
};
let msg = Self::GetUpdate { uuid, id, ret };
sender.send(msg).await?;
rcv.await?
}
@ -92,21 +89,14 @@ impl UpdateMsg {
uuid: Uuid,
) -> Result<Vec<UpdateStatus>> {
let (ret, rcv) = oneshot::channel();
let msg = Self::ListUpdates {
uuid,
ret,
};
let msg = Self::ListUpdates { uuid, ret };
sender.send(msg).await?;
rcv.await?
}
pub async fn get_info(
sender: &mpsc::Sender<Self>,
) -> Result<UpdateStoreInfo> {
pub async fn get_info(sender: &mpsc::Sender<Self>) -> Result<UpdateStoreInfo> {
let (ret, rcv) = oneshot::channel();
let msg = Self::GetInfo {
ret,
};
let msg = Self::GetInfo { ret };
sender.send(msg).await?;
rcv.await?
}

View File

@ -24,7 +24,7 @@ use uuid::Uuid;
use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::index::{Settings, Unchecked};
use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus;
@ -123,12 +123,11 @@ impl UpdateLoop {
let must_exit = Arc::new(AtomicBool::new(false));
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone())?;
let update_file_store = UpdateFileStore::new(&path).unwrap();
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone(), update_file_store.clone())?;
let inbox = Some(inbox);
let update_file_store = UpdateFileStore::new(&path).unwrap();
Ok(Self {
store,
inbox,
@ -179,8 +178,8 @@ impl UpdateLoop {
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Snapshot { uuids, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
Snapshot { indexes, path, ret } => {
let _ = ret.send(self.handle_snapshot(indexes, path).await);
}
GetInfo { ret } => {
let _ = ret.send(self.handle_get_info().await);
@ -270,15 +269,13 @@ impl UpdateLoop {
Ok(())
}
async fn handle_snapshot(&self, _uuids: HashSet<Uuid>,_pathh: PathBuf) -> Result<()> {
todo!()
//let index_handle = self.index_resolver.clone();
//let update_store = self.store.clone();
async fn handle_snapshot(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
let update_store = self.store.clone();
//tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
//.await??;
tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path))
.await??;
//Ok(())
Ok(())
}
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {

View File

@ -45,16 +45,17 @@ impl UpdateStore {
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
) -> Result<()> {
let dump_data_path = path.as_ref().join("data.jsonl");
let mut dump_data_file = File::create(dump_data_path)?;
//let dump_data_path = path.as_ref().join("data.jsonl");
//let mut dump_data_file = File::create(dump_data_path)?;
let update_files_path = path.as_ref().join(super::UPDATE_DIR);
create_dir_all(&update_files_path)?;
//let update_files_path = path.as_ref().join(super::UPDATE_DIR);
//create_dir_all(&update_files_path)?;
self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
self.dump_completed(txn, uuids, &mut dump_data_file)?;
//self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
//self.dump_completed(txn, uuids, &mut dump_data_file)?;
Ok(())
//Ok(())
todo!()
}
fn dump_pending(

View File

@ -22,6 +22,7 @@ use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tokio::time::timeout;
use uuid::Uuid;
use rayon::prelude::*;
use codec::*;
@ -31,12 +32,11 @@ use super::status::{Enqueued, Processing};
use crate::EnvSizer;
use crate::index_controller::update_files_path;
use crate::index_controller::updates::*;
use crate::index::Index;
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;
const UPDATE_DIR: &str = "update_files";
#[derive(Debug)]
pub struct UpdateStoreInfo {
/// Size of the update store in bytes.
@ -108,6 +108,7 @@ pub struct UpdateStore {
state: Arc<StateLock>,
/// Wake up the loop when a new event occurs.
notification_sender: mpsc::Sender<()>,
update_file_store: UpdateFileStore,
path: PathBuf,
}
@ -115,6 +116,7 @@ impl UpdateStore {
fn new(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
options.max_dbs(5);
@ -138,6 +140,7 @@ impl UpdateStore {
state,
notification_sender,
path: path.as_ref().to_owned(),
update_file_store,
},
notification_receiver,
))
@ -148,8 +151,9 @@ impl UpdateStore {
path: impl AsRef<Path>,
index_resolver: Arc<HardStateIndexResolver>,
must_exit: Arc<AtomicBool>,
update_file_store: UpdateFileStore,
) -> anyhow::Result<Arc<Self>> {
let (update_store, mut notification_receiver) = Self::new(options, path)?;
let (update_store, mut notification_receiver) = Self::new(options, path, update_file_store)?;
let update_store = Arc::new(update_store);
// Send a first notification to trigger the process.
@ -482,13 +486,13 @@ impl UpdateStore {
pub fn snapshot(
&self,
_uuids: &HashSet<Uuid>,
indexes: Vec<Index>,
path: impl AsRef<Path>,
handle: Arc<HardStateIndexResolver>,
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Snapshoting);
let txn = self.env.write_txn()?;
let update_path = path.as_ref().join("updates");
@ -501,42 +505,28 @@ impl UpdateStore {
// create db snapshot
self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
let update_files_path = update_path.join(UPDATE_DIR);
create_dir_all(&update_files_path)?;
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
let uuids: HashSet<_> = indexes.iter().map(|i| i.uuid).collect();
for entry in pendings {
let ((_, _uuid, _), _pending) = entry?;
//if uuids.contains(&uuid) {
//if let Enqueued {
//content: Some(uuid),
//..
//} = pending.decode()?
//{
//let path = update_uuid_to_file_path(&self.path, uuid);
//copy(path, &update_files_path)?;
//}
//}
let ((_, uuid, _), pending) = entry?;
if uuids.contains(&uuid) {
if let Enqueued {
meta: RegisterUpdate::DocumentAddition {
content_uuid, ..
},
..
} = pending.decode()?
{
self.update_file_store.snapshot(content_uuid, &path).unwrap();
}
}
}
let _path = &path.as_ref().to_path_buf();
let _handle = &handle;
// Perform the snapshot of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
todo!()
//let mut stream = futures::stream::iter(uuids.iter())
//.map(move |uuid| IndexMsg::snapshot(handle,*uuid, path.clone()))
//.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
let path = path.as_ref().to_owned();
indexes.par_iter().try_for_each(|index| index.snapshot(&path)).unwrap();
//Handle::current().block_on(async {
//while let Some(res) = stream.next().await {
//res?;
//}
//Ok(()) as Result<()>
//})?;
//Ok(())
Ok(())
}
pub fn get_info(&self) -> Result<UpdateStoreInfo> {

View File

@ -7,6 +7,8 @@ pub mod index_controller;
pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate};
mod compression;
use walkdir::WalkDir;
pub trait EnvSizer {