sequential index snapshot

mpostma 2021-03-20 11:50:57 +01:00
parent 35a7b800eb
commit 520f7c09ba
4 changed files with 75 additions and 53 deletions

View File

@@ -104,7 +104,7 @@ enum IndexMsg {
         ret: oneshot::Sender<Result<IndexMeta>>,
     },
     Snapshot {
-        uuids: Vec<Uuid>,
+        uuid: Uuid,
         path: PathBuf,
         ret: oneshot::Sender<Result<()>>,
     }
@@ -256,8 +256,8 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             } => {
                 let _ = ret.send(self.handle_update_index(uuid, index_settings).await);
             }
-            Snapshot { uuids, path, ret } => {
-                let _ = ret.send(self.handle_snapshot(uuids, path).await);
+            Snapshot { uuid, path, ret } => {
+                let _ = ret.send(self.handle_snapshot(uuid, path).await);
             }
         }
     }
@@ -412,7 +412,7 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             .map_err(|e| IndexError::Error(e.into()))?
     }

-    async fn handle_snapshot(&self, uuids: Vec<Uuid>, mut path: PathBuf) -> Result<()> {
+    async fn handle_snapshot(&self, uuid: Uuid, mut path: PathBuf) -> Result<()> {
         use tokio::fs::create_dir_all;

         path.push("indexes");
@@ -421,25 +421,14 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
             .await
             .map_err(|e| IndexError::Error(e.into()))?;

-        let mut handles = Vec::new();
-        for uuid in uuids {
-            if let Some(index) = self.store.get(uuid).await? {
-                let index_path = path.join(format!("index-{}", uuid));
-                let handle = spawn_blocking(move || -> anyhow::Result<()> {
-                    // Get write txn to wait for ongoing write transaction before snapshot.
-                    let _txn = index.write_txn()?;
-                    index.env.copy_to_path(index_path, CompactionOption::Enabled)?;
-                    Ok(())
-                });
-                handles.push(handle);
-            }
-        }
-
-        for handle in handles {
-            handle
-                .await
-                .map_err(|e| IndexError::Error(e.into()))?
-                .map_err(|e| IndexError::Error(e.into()))?;
+        if let Some(index) = self.store.get(uuid).await? {
+            let index_path = path.join(format!("index-{}", uuid));
+            spawn_blocking(move || -> anyhow::Result<()> {
+                // Get write txn to wait for ongoing write transaction before snapshot.
+                let _txn = index.write_txn()?;
+                index.env.copy_to_path(index_path, CompactionOption::Enabled)?;
+                Ok(())
+            });
         }

         Ok(())
@@ -567,10 +556,10 @@ impl IndexActorHandle {
         Ok(receiver.await.expect("IndexActor has been killed")?)
     }

-    pub async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> {
+    pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::Snapshot {
-            uuids,
+            uuid,
             path,
             ret,
         };
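Note: the snapshot above leans on heed's LMDB environment copy. As a rough standalone sketch of the same pattern (illustrative only, not code from this commit; it assumes a bare heed Env and the anyhow crate):

use heed::{CompactionOption, Env};
use std::path::Path;

// Copy an LMDB environment to `dst` after waiting for in-flight writes.
// Opening (and immediately dropping) a write transaction blocks until any
// ongoing write transaction finishes, so the copy sees a settled state;
// CompactionOption::Enabled compacts the data while copying.
fn snapshot_env(env: &Env, dst: &Path) -> anyhow::Result<()> {
    let _txn = env.write_txn()?;
    env.copy_to_path(dst, CompactionOption::Enabled)?;
    Ok(())
}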

View File

@@ -50,10 +50,9 @@ impl<B> SnapshotService<B> {
             .join(format!("tmp-{}", Uuid::new_v4()));
         create_dir_all(&temp_snapshot_path)?;

         let uuids = self.uuid_resolver_handle.snapshot(temp_snapshot_path.clone()).await?;
-        let index_snapshot = self.index_handle.snapshot(uuids.clone(), temp_snapshot_path.clone());
-        let updates_snapshot = self.update_handle.snapshot(uuids.clone(), temp_snapshot_path.clone());
-        let (first, second) = tokio::join!(updates_snapshot, index_snapshot);
-        println!("results: {:?}, {:?}", first, second);
+        for uuid in uuids {
+            self.update_handle.snapshot(uuid, temp_snapshot_path.clone()).await?;
+        }

         Ok(())
     }
 }
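Note: the service now awaits each per-index snapshot in turn instead of joining two batched futures. A minimal sketch of that control flow (illustrative only; snapshot_one is a hypothetical stand-in for the update_handle.snapshot call made above):

use std::path::PathBuf;
use uuid::Uuid;

// Hypothetical stand-in for the per-index snapshot call.
async fn snapshot_one(_uuid: Uuid, _path: PathBuf) -> anyhow::Result<()> {
    Ok(())
}

// Each iteration finishes one index's snapshot completely before the next
// uuid starts, hence "sequential index snapshot".
async fn snapshot_all(uuids: Vec<Uuid>, dst: PathBuf) -> anyhow::Result<()> {
    for uuid in uuids {
        snapshot_one(uuid, dst.clone()).await?;
    }
    Ok(())
}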

View File

@@ -1,15 +1,16 @@
 use std::collections::{hash_map::Entry, HashMap};
-use std::io::SeekFrom;
 use std::fs::{create_dir_all, remove_dir_all};
+use std::io::SeekFrom;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;

+use super::index_actor::IndexActorHandle;
+use heed::CompactionOption;
 use log::info;
 use oxidized_json_checker::JsonChecker;
-use super::index_actor::IndexActorHandle;
 use thiserror::Error;
 use tokio::fs::OpenOptions;
-use tokio::io::{AsyncWriteExt, AsyncSeekExt};
+use tokio::io::{AsyncSeekExt, AsyncWriteExt};
 use tokio::sync::{mpsc, oneshot, RwLock};
 use uuid::Uuid;
@@ -56,16 +57,17 @@ enum UpdateMsg<D> {
         ret: oneshot::Sender<Result<()>>,
     },
     Snapshot {
-        uuids: Vec<Uuid>,
+        uuid: Uuid,
         path: PathBuf,
         ret: oneshot::Sender<Result<()>>,
-    }
+    },
 }

 struct UpdateActor<D, S> {
     path: PathBuf,
     store: S,
     inbox: mpsc::Receiver<UpdateMsg<D>>,
+    index_handle: IndexActorHandle,
 }

 #[async_trait::async_trait]
@@ -84,11 +86,17 @@ where
         store: S,
         inbox: mpsc::Receiver<UpdateMsg<D>>,
         path: impl AsRef<Path>,
+        index_handle: IndexActorHandle,
     ) -> anyhow::Result<Self> {
-        let path = path.as_ref().to_owned().join("update_files");
-        create_dir_all(&path)?;
+        let path = path.as_ref().to_owned();
+        create_dir_all(path.join("update_files"))?;
         assert!(path.exists());
-        Ok(Self { store, inbox, path })
+        Ok(Self {
+            store,
+            inbox,
+            path,
+            index_handle,
+        })
     }

     async fn run(mut self) {
@@ -118,8 +126,8 @@ where
             Some(Create { uuid, ret }) => {
                 let _ = ret.send(self.handle_create(uuid).await);
             }
-            Some(Snapshot { uuids, path, ret }) => {
-                let _ = ret.send(self.handle_snapshot(uuids, path).await);
+            Some(Snapshot { uuid, path, ret }) => {
+                let _ = ret.send(self.handle_snapshot(uuid, path).await);
             }
             None => break,
         }
@@ -134,7 +142,9 @@ where
     ) -> Result<UpdateStatus> {
         let update_store = self.store.get_or_create(uuid).await?;
         let update_file_id = uuid::Uuid::new_v4();
-        let path = self.path.join(format!("update_{}", update_file_id));
+        let path = self
+            .path
+            .join(format!("update_files/update_{}", update_file_id));
         let mut file = OpenOptions::new()
             .read(true)
             .write(true)
@@ -167,10 +177,15 @@ where
         let mut file = file.into_std().await;

         tokio::task::spawn_blocking(move || {
-            use std::io::{BufReader, sink, copy, Seek};
+            use std::io::{copy, sink, BufReader, Seek};

             // If the payload is empty, ignore the check.
-            if file.metadata().map_err(|e| UpdateError::Error(Box::new(e)))?.len() > 0 {
+            if file
+                .metadata()
+                .map_err(|e| UpdateError::Error(Box::new(e)))?
+                .len()
+                > 0
+            {
                 // Check that the json payload is valid:
                 let reader = BufReader::new(&mut file);
                 let mut checker = JsonChecker::new(reader);
@@ -241,13 +256,32 @@ where
         Ok(())
     }

-    async fn handle_snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> {
-        use tokio::time;
-        use std::time::Duration;
+    async fn handle_snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
+        use tokio::fs;

-        println!("performing update snapshot");
-        time::sleep(Duration::from_secs(2)).await;
-        println!("Update snapshot done");
+        let update_path = path.join("updates");
+        fs::create_dir_all(&update_path)
+            .await
+            .map_err(|e| UpdateError::Error(e.into()))?;
+
+        let index_handle = self.index_handle.clone();
+        if let Some(update_store) = self.store.get(uuid).await? {
+            let snapshot_path = update_path.join(format!("update-{}", uuid));
+            tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
+                let _txn = update_store.env.write_txn()?;
+                update_store
+                    .env
+                    .copy_to_path(&snapshot_path, CompactionOption::Enabled)?;
+                futures::executor::block_on(
+                    async move { index_handle.snapshot(uuid, path).await },
+                )?;
+                Ok(())
+            })
+            .await
+            .map_err(|e| UpdateError::Error(e.into()))?
+            .map_err(|e| UpdateError::Error(e.into()))?;
+        }

         Ok(())
     }
 }
@@ -268,8 +302,8 @@ where
     ) -> anyhow::Result<Self> {
         let path = path.as_ref().to_owned().join("updates");
         let (sender, receiver) = mpsc::channel(100);
-        let store = MapUpdateStoreStore::new(index_handle, &path, update_store_size);
-        let actor = UpdateActor::new(store, receiver, path)?;
+        let store = MapUpdateStoreStore::new(index_handle.clone(), &path, update_store_size);
+        let actor = UpdateActor::new(store, receiver, path, index_handle)?;

         tokio::task::spawn(actor.run());
@@ -323,9 +357,9 @@ impl<D> UpdateActorHandle<D> {
         receiver.await.expect("update actor killed.")
     }

-    pub async fn snapshot(&self, uuids: Vec<Uuid>, path: PathBuf) -> Result<()> {
+    pub async fn snapshot(&self, uuid: Uuid, path: PathBuf) -> Result<()> {
         let (ret, receiver) = oneshot::channel();
-        let msg = UpdateMsg::Snapshot { uuids, path, ret };
+        let msg = UpdateMsg::Snapshot { uuid, path, ret };
         let _ = self.sender.send(msg).await;
         receiver.await.expect("update actor killed.")
     }
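Note: in handle_snapshot above, the LMDB copy has to run on a blocking thread (spawn_blocking), while the follow-up call to the index actor is async, so it is driven to completion on that thread with futures::executor::block_on. A self-contained sketch of that bridge (illustrative only; do_index_snapshot is a hypothetical stand-in for index_handle.snapshot):

use std::path::PathBuf;
use uuid::Uuid;

// Hypothetical stand-in for the async index snapshot call.
async fn do_index_snapshot(_uuid: Uuid, _path: PathBuf) -> anyhow::Result<()> {
    Ok(())
}

async fn snapshot_blocking_then_async(uuid: Uuid, path: PathBuf) -> anyhow::Result<()> {
    tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
        // Blocking work (write txn + env copy) would happen here.
        // Drive the async follow-up from this blocking thread.
        futures::executor::block_on(do_index_snapshot(uuid, path))?;
        Ok(())
    })
    .await??; // the first ? surfaces a JoinError, the second the task's own error
    Ok(())
}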

View File

@@ -16,7 +16,7 @@ type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;

 #[derive(Clone)]
 pub struct UpdateStore<M, N, E> {
-    env: Env,
+    pub env: Env,
     pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
     pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
     processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,