2021-04-22 10:14:29 +02:00
|
|
|
use std::collections::HashSet;
|
2021-03-23 11:00:50 +01:00
|
|
|
use std::io::SeekFrom;
|
|
|
|
use std::path::{Path, PathBuf};
|
2021-06-09 16:19:45 +02:00
|
|
|
use std::sync::atomic::AtomicBool;
|
2021-04-13 17:14:02 +02:00
|
|
|
use std::sync::Arc;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
use async_stream::stream;
|
|
|
|
use futures::StreamExt;
|
2021-03-23 11:00:50 +01:00
|
|
|
use log::info;
|
|
|
|
use oxidized_json_checker::JsonChecker;
|
|
|
|
use tokio::fs;
|
2021-04-22 10:14:29 +02:00
|
|
|
use tokio::io::AsyncWriteExt;
|
2021-03-24 11:29:11 +01:00
|
|
|
use tokio::sync::mpsc;
|
|
|
|
use uuid::Uuid;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
use super::error::{Result, UpdateActorError};
|
2021-06-15 17:39:07 +02:00
|
|
|
use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo};
|
2021-05-25 09:46:11 +02:00
|
|
|
use crate::index_controller::index_actor::IndexActorHandle;
|
2021-04-13 17:14:02 +02:00
|
|
|
use crate::index_controller::{UpdateMeta, UpdateStatus};
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-04-13 17:14:02 +02:00
|
|
|
pub struct UpdateActor<D, I> {
|
2021-03-23 11:00:50 +01:00
|
|
|
path: PathBuf,
|
2021-04-13 17:14:02 +02:00
|
|
|
store: Arc<UpdateStore>,
|
2021-06-02 17:45:28 +02:00
|
|
|
inbox: Option<mpsc::Receiver<UpdateMsg<D>>>,
|
2021-03-23 11:00:50 +01:00
|
|
|
index_handle: I,
|
2021-06-09 16:19:45 +02:00
|
|
|
must_exit: Arc<AtomicBool>,
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
2021-04-13 17:14:02 +02:00
|
|
|
impl<D, I> UpdateActor<D, I>
|
2021-03-23 11:00:50 +01:00
|
|
|
where
|
|
|
|
D: AsRef<[u8]> + Sized + 'static,
|
2021-03-23 16:19:01 +01:00
|
|
|
I: IndexActorHandle + Clone + Send + Sync + 'static,
|
2021-03-23 11:00:50 +01:00
|
|
|
{
|
|
|
|
pub fn new(
|
2021-04-13 17:14:02 +02:00
|
|
|
update_db_size: usize,
|
2021-03-23 11:00:50 +01:00
|
|
|
inbox: mpsc::Receiver<UpdateMsg<D>>,
|
|
|
|
path: impl AsRef<Path>,
|
|
|
|
index_handle: I,
|
2021-06-15 17:39:07 +02:00
|
|
|
) -> anyhow::Result<Self> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let path = path.as_ref().join("updates");
|
2021-04-13 17:14:02 +02:00
|
|
|
|
|
|
|
std::fs::create_dir_all(&path)?;
|
|
|
|
|
|
|
|
let mut options = heed::EnvOpenOptions::new();
|
|
|
|
options.map_size(update_db_size);
|
|
|
|
|
2021-06-09 16:19:45 +02:00
|
|
|
let must_exit = Arc::new(AtomicBool::new(false));
|
|
|
|
|
|
|
|
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
|
2021-03-23 11:00:50 +01:00
|
|
|
std::fs::create_dir_all(path.join("update_files"))?;
|
2021-06-02 17:45:28 +02:00
|
|
|
let inbox = Some(inbox);
|
2021-05-25 09:46:11 +02:00
|
|
|
Ok(Self {
|
|
|
|
path,
|
|
|
|
store,
|
|
|
|
inbox,
|
|
|
|
index_handle,
|
2021-06-09 16:19:45 +02:00
|
|
|
must_exit,
|
2021-05-25 09:46:11 +02:00
|
|
|
})
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(mut self) {
|
|
|
|
use UpdateMsg::*;
|
|
|
|
|
|
|
|
info!("Started update actor.");
|
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
let mut inbox = self
|
|
|
|
.inbox
|
|
|
|
.take()
|
|
|
|
.expect("A receiver should be present by now.");
|
2021-06-09 16:19:45 +02:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
let must_exit = self.must_exit.clone();
|
|
|
|
let stream = stream! {
|
|
|
|
loop {
|
|
|
|
let msg = inbox.recv().await;
|
2021-06-09 16:19:45 +02:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
|
|
|
|
break;
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-06-02 17:45:28 +02:00
|
|
|
|
|
|
|
match msg {
|
|
|
|
Some(msg) => yield msg,
|
|
|
|
None => break,
|
2021-04-09 14:41:24 +02:00
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-06-02 17:45:28 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
stream
|
|
|
|
.for_each_concurrent(Some(10), |msg| async {
|
|
|
|
match msg {
|
|
|
|
Update {
|
|
|
|
uuid,
|
|
|
|
meta,
|
|
|
|
data,
|
|
|
|
ret,
|
|
|
|
} => {
|
|
|
|
let _ = ret.send(self.handle_update(uuid, meta, data).await);
|
|
|
|
}
|
|
|
|
ListUpdates { uuid, ret } => {
|
|
|
|
let _ = ret.send(self.handle_list_updates(uuid).await);
|
|
|
|
}
|
|
|
|
GetUpdate { uuid, ret, id } => {
|
|
|
|
let _ = ret.send(self.handle_get_update(uuid, id).await);
|
|
|
|
}
|
|
|
|
Delete { uuid, ret } => {
|
|
|
|
let _ = ret.send(self.handle_delete(uuid).await);
|
|
|
|
}
|
|
|
|
Snapshot { uuids, path, ret } => {
|
|
|
|
let _ = ret.send(self.handle_snapshot(uuids, path).await);
|
|
|
|
}
|
|
|
|
GetInfo { ret } => {
|
|
|
|
let _ = ret.send(self.handle_get_info().await);
|
|
|
|
}
|
|
|
|
Dump { uuids, path, ret } => {
|
|
|
|
let _ = ret.send(self.handle_dump(uuids, path).await);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_update(
|
|
|
|
&self,
|
|
|
|
uuid: Uuid,
|
|
|
|
meta: UpdateMeta,
|
2021-06-23 14:45:26 +02:00
|
|
|
payload: mpsc::Receiver<PayloadData<D>>,
|
2021-03-23 11:00:50 +01:00
|
|
|
) -> Result<UpdateStatus> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let file_path = match meta {
|
2021-06-09 17:10:10 +02:00
|
|
|
UpdateMeta::DocumentsAddition { .. } => {
|
2021-04-22 10:14:29 +02:00
|
|
|
let update_file_id = uuid::Uuid::new_v4();
|
|
|
|
let path = self
|
|
|
|
.path
|
|
|
|
.join(format!("update_files/update_{}", update_file_id));
|
|
|
|
let mut file = fs::OpenOptions::new()
|
|
|
|
.read(true)
|
|
|
|
.write(true)
|
|
|
|
.create(true)
|
|
|
|
.open(&path)
|
2021-05-25 09:46:11 +02:00
|
|
|
.await?;
|
2021-04-22 10:14:29 +02:00
|
|
|
|
2021-06-23 14:48:33 +02:00
|
|
|
async fn write_to_file<D>(
|
|
|
|
file: &mut fs::File,
|
|
|
|
mut payload: mpsc::Receiver<PayloadData<D>>,
|
|
|
|
) -> Result<usize>
|
2021-06-23 14:45:26 +02:00
|
|
|
where
|
|
|
|
D: AsRef<[u8]> + Sized + 'static,
|
|
|
|
{
|
|
|
|
let mut file_len = 0;
|
2021-06-23 16:34:07 +02:00
|
|
|
|
2021-06-23 14:45:26 +02:00
|
|
|
while let Some(bytes) = payload.recv().await {
|
|
|
|
let bytes = bytes?;
|
|
|
|
file_len += bytes.as_ref().len();
|
|
|
|
file.write_all(bytes.as_ref()).await?;
|
|
|
|
}
|
2021-06-23 16:34:07 +02:00
|
|
|
|
|
|
|
file.flush().await?;
|
|
|
|
|
2021-06-23 14:45:26 +02:00
|
|
|
Ok(file_len)
|
2021-04-22 10:14:29 +02:00
|
|
|
}
|
|
|
|
|
2021-06-23 14:45:26 +02:00
|
|
|
let file_len = write_to_file(&mut file, payload).await;
|
|
|
|
|
|
|
|
match file_len {
|
|
|
|
Ok(len) if len > 0 => {
|
|
|
|
let file = file.into_std().await;
|
|
|
|
Some((file, update_file_id))
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
fs::remove_file(&path).await?;
|
2021-06-23 14:48:33 +02:00
|
|
|
return Err(e);
|
2021-06-23 14:45:26 +02:00
|
|
|
}
|
|
|
|
_ => {
|
|
|
|
fs::remove_file(&path).await?;
|
|
|
|
None
|
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
}
|
2021-05-25 09:46:11 +02:00
|
|
|
_ => None,
|
2021-04-22 10:14:29 +02:00
|
|
|
};
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-04-13 17:14:02 +02:00
|
|
|
let update_store = self.store.clone();
|
|
|
|
|
2021-03-23 11:00:50 +01:00
|
|
|
tokio::task::spawn_blocking(move || {
|
|
|
|
use std::io::{copy, sink, BufReader, Seek};
|
|
|
|
|
|
|
|
// If the payload is empty, ignore the check.
|
2021-05-29 00:08:17 +02:00
|
|
|
let update_uuid = if let Some((mut file, uuid)) = file_path {
|
2021-04-22 10:14:29 +02:00
|
|
|
// set the file back to the beginning
|
2021-05-25 09:46:11 +02:00
|
|
|
file.seek(SeekFrom::Start(0))?;
|
2021-03-23 11:00:50 +01:00
|
|
|
// Check that the json payload is valid:
|
|
|
|
let reader = BufReader::new(&mut file);
|
|
|
|
let mut checker = JsonChecker::new(reader);
|
|
|
|
|
|
|
|
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
|
|
|
|
// The json file is invalid, we use Serde to get a nice error message:
|
2021-05-25 09:46:11 +02:00
|
|
|
file.seek(SeekFrom::Start(0))?;
|
2021-06-21 18:56:22 +02:00
|
|
|
let _: serde_json::Value = serde_json::from_reader(file)
|
|
|
|
.map_err(|e| UpdateActorError::InvalidPayload(Box::new(e)))?;
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-05-29 00:08:17 +02:00
|
|
|
Some(uuid)
|
2021-04-22 10:14:29 +02:00
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
2021-03-23 11:00:50 +01:00
|
|
|
|
|
|
|
// The payload is valid, we can register it to the update store.
|
2021-05-25 09:46:11 +02:00
|
|
|
let status = update_store
|
2021-05-29 00:08:17 +02:00
|
|
|
.register_update(meta, update_uuid, uuid)
|
2021-05-25 09:46:11 +02:00
|
|
|
.map(UpdateStatus::Enqueued)?;
|
|
|
|
Ok(status)
|
2021-03-23 11:00:50 +01:00
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
|
2021-04-13 17:14:02 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-03-23 11:00:50 +01:00
|
|
|
tokio::task::spawn_blocking(move || {
|
2021-05-25 09:46:11 +02:00
|
|
|
let result = update_store.list(uuid)?;
|
2021-03-23 11:00:50 +01:00
|
|
|
Ok(result)
|
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
|
2021-04-13 17:14:02 +02:00
|
|
|
let store = self.store.clone();
|
2021-06-09 17:10:10 +02:00
|
|
|
tokio::task::spawn_blocking(move || {
|
2021-06-15 17:39:07 +02:00
|
|
|
let result = store
|
|
|
|
.meta(uuid, id)?
|
|
|
|
.ok_or(UpdateActorError::UnexistingUpdate(id))?;
|
2021-06-09 17:10:10 +02:00
|
|
|
Ok(result)
|
|
|
|
})
|
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
|
2021-05-10 20:24:14 +02:00
|
|
|
let store = self.store.clone();
|
2021-04-28 16:43:49 +02:00
|
|
|
|
2021-05-25 09:46:11 +02:00
|
|
|
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
|
2021-04-28 16:43:49 +02:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-05-10 20:24:14 +02:00
|
|
|
async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
|
|
|
|
let index_handle = self.index_handle.clone();
|
|
|
|
let update_store = self.store.clone();
|
2021-04-28 16:43:49 +02:00
|
|
|
|
2021-05-25 09:46:11 +02:00
|
|
|
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
|
|
|
|
.await??;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
|
2021-03-23 11:00:50 +01:00
|
|
|
let index_handle = self.index_handle.clone();
|
2021-04-13 17:14:02 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-05-31 16:40:59 +02:00
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
tokio::task::spawn_blocking(move || -> Result<()> {
|
2021-05-05 19:06:07 +02:00
|
|
|
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
|
|
|
|
Ok(())
|
2021-04-13 17:14:02 +02:00
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await??;
|
|
|
|
|
2021-03-23 11:00:50 +01:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-04-09 14:41:24 +02:00
|
|
|
|
2021-04-14 18:55:04 +02:00
|
|
|
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
|
2021-04-14 17:53:12 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-06-14 21:26:35 +02:00
|
|
|
let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let info = update_store.get_info()?;
|
2021-04-14 18:55:04 +02:00
|
|
|
Ok(info)
|
2021-04-14 17:53:12 +02:00
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await??;
|
2021-04-09 14:41:24 +02:00
|
|
|
|
2021-04-14 18:55:04 +02:00
|
|
|
Ok(info)
|
2021-04-09 14:41:24 +02:00
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|