fix snapshot bugs

This commit is contained in:
mpostma 2021-03-22 19:19:37 +01:00
parent d73fbdef2e
commit e9da191b7d
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
6 changed files with 36 additions and 21 deletions

View File

@ -1,29 +1,28 @@
use flate2::read::GzDecoder;
use flate2::write::GzEncoder;
use flate2::Compression;
use std::fs::{create_dir_all, File}; use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::Path; use std::path::Path;
use flate2::{Compression, write::GzEncoder, read::GzDecoder};
use tar::{Archive, Builder}; use tar::{Archive, Builder};
use crate::error::Error; use crate::error::Error;
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> Result<(), Error> { pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> Result<(), Error> {
let f = File::create(dest)?; let mut f = File::create(dest)?;
let gz_encoder = GzEncoder::new(f, Compression::default()); let gz_encoder = GzEncoder::new(&mut f, Compression::default());
let mut tar_encoder = Builder::new(gz_encoder); let mut tar_encoder = Builder::new(gz_encoder);
tar_encoder.append_dir_all(".", src)?; tar_encoder.append_dir_all(".", src)?;
let gz_encoder = tar_encoder.into_inner()?; let gz_encoder = tar_encoder.into_inner()?;
gz_encoder.finish()?; gz_encoder.finish()?;
f.flush()?;
Ok(()) Ok(())
} }
pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> Result<(), Error> { pub fn from_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> Result<(), Error> {
println!("inflating from {:?} to {:?}", src.as_ref(), dest.as_ref());
let f = File::open(&src)?; let f = File::open(&src)?;
let gz = GzDecoder::new(f); let gz = GzDecoder::new(f);
let mut ar = Archive::new(gz); let mut ar = Archive::new(gz);
create_dir_all(&dest)?; create_dir_all(&dest)?;
ar.unpack(&dest)?; ar.unpack(&dest)?;
println!("here");
Ok(()) Ok(())
} }

View File

@ -26,7 +26,6 @@ use crate::index_controller::{
UpdateMeta, UpdateMeta,
}; };
use crate::option::IndexerOpts; use crate::option::IndexerOpts;
use crate::helpers::compression;
pub type Result<T> = std::result::Result<T, IndexError>; pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
@ -417,7 +416,6 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
use tokio::fs::create_dir_all; use tokio::fs::create_dir_all;
path.push("indexes"); path.push("indexes");
println!("performing index snapshot in {:?}", path);
create_dir_all(&path) create_dir_all(&path)
.await .await
.map_err(|e| IndexError::Error(e.into()))?; .map_err(|e| IndexError::Error(e.into()))?;
@ -435,7 +433,10 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
.env .env
.copy_to_path(index_path, CompactionOption::Enabled)?; .copy_to_path(index_path, CompactionOption::Enabled)?;
Ok(()) Ok(())
}); })
.await
.map_err(|e| IndexError::Error(e.into()))?
.map_err(|e| IndexError::Error(e.into()))?;
} }
Ok(()) Ok(())

View File

@ -21,6 +21,7 @@ use tokio::time::sleep;
use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Document, SearchQuery, SearchResult};
use crate::index::{Facets, Settings, UpdateResult}; use crate::index::{Facets, Settings, UpdateResult};
use crate::option::Opt; use crate::option::Opt;
use crate::helpers::compression;
use snapshot::SnapshotService; use snapshot::SnapshotService;
pub use updates::{Failed, Processed, Processing}; pub use updates::{Failed, Processed, Processing};
@ -67,6 +68,10 @@ impl IndexController {
let index_size = options.max_mdb_size.get_bytes() as usize; let index_size = options.max_mdb_size.get_bytes() as usize;
let update_store_size = options.max_udb_size.get_bytes() as usize; let update_store_size = options.max_udb_size.get_bytes() as usize;
if let Some(ref path) = options.import_snapshot {
compression::from_tar_gz(path, &options.db_path)?;
}
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?; let uuid_resolver = uuid_resolver::UuidResolverHandle::new(&path)?;
let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?; let index_handle = index_actor::IndexActorHandle::new(&path, index_size)?;
let update_handle = update_actor::UpdateActorHandle::new( let update_handle = update_actor::UpdateActorHandle::new(

View File

@ -2,9 +2,10 @@ use std::path::PathBuf;
use std::time::Duration; use std::time::Duration;
use anyhow::bail; use anyhow::bail;
use log::{error, info};
use tokio::fs; use tokio::fs;
use tokio::task::spawn_blocking; use tokio::task::spawn_blocking;
use tokio::time::interval; use tokio::time::sleep;
use crate::helpers::compression; use crate::helpers::compression;
use super::index_actor::IndexActorHandle; use super::index_actor::IndexActorHandle;
@ -38,11 +39,12 @@ impl<B> SnapshotService<B> {
} }
pub async fn run(self) { pub async fn run(self) {
let mut interval = interval(self.snapshot_period);
loop { loop {
interval.tick().await; sleep(self.snapshot_period).await;
self.perform_snapshot().await.unwrap(); if let Err(e) = self.perform_snapshot().await {
error!("{}", e);
}
} }
} }
@ -57,6 +59,11 @@ impl<B> SnapshotService<B> {
fs::create_dir_all(&temp_snapshot_path).await?; fs::create_dir_all(&temp_snapshot_path).await?;
let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?; let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?;
if uuids.is_empty() {
return Ok(())
}
for uuid in uuids { for uuid in uuids {
self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?; self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?;
} }
@ -69,6 +76,8 @@ impl<B> SnapshotService<B> {
fs::rename(temp_snapshot_file, &self.snapshot_path).await?; fs::rename(temp_snapshot_file, &self.snapshot_path).await?;
info!("Created snapshot in {:?}.", self.snapshot_path);
Ok(()) Ok(())
} }
} }

View File

@ -16,7 +16,6 @@ use uuid::Uuid;
use super::get_arc_ownership_blocking; use super::get_arc_ownership_blocking;
use crate::index::UpdateResult; use crate::index::UpdateResult;
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
use crate::helpers::compression;
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;

View File

@ -10,8 +10,6 @@ use thiserror::Error;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use uuid::Uuid; use uuid::Uuid;
use crate::helpers::compression;
const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB const UUID_STORE_SIZE: usize = 1_073_741_824; //1GiB
pub type Result<T> = std::result::Result<T, UuidError>; pub type Result<T> = std::result::Result<T, UuidError>;
@ -330,10 +328,14 @@ impl UuidStore for HeedUuidStore {
let uuid = Uuid::from_slice(uuid)?; let uuid = Uuid::from_slice(uuid)?;
entries.push(uuid) entries.push(uuid)
} }
// only perform snapshot if there are indexes
if !entries.is_empty() {
path.push("index_uuids"); path.push("index_uuids");
create_dir_all(&path).unwrap(); create_dir_all(&path).unwrap();
path.push("data.mdb"); path.push("data.mdb");
env.copy_to_path(path, CompactionOption::Enabled)?; env.copy_to_path(path, CompactionOption::Enabled)?;
}
Ok(entries) Ok(entries)
}) })
.await? .await?