MeiliSearch/meilisearch-http/src/index_controller/update_actor/actor.rs

269 lines
8.6 KiB
Rust
Raw Normal View History

2021-04-22 10:14:29 +02:00
use std::collections::HashSet;
2021-03-23 11:00:50 +01:00
use std::io::SeekFrom;
use std::path::{Path, PathBuf};
2021-06-09 16:19:45 +02:00
use std::sync::atomic::AtomicBool;
2021-04-13 17:14:02 +02:00
use std::sync::Arc;
2021-03-23 11:00:50 +01:00
2021-06-02 17:45:28 +02:00
use async_stream::stream;
use futures::StreamExt;
2021-03-23 11:00:50 +01:00
use log::info;
use oxidized_json_checker::JsonChecker;
use tokio::fs;
2021-04-22 10:14:29 +02:00
use tokio::io::AsyncWriteExt;
2021-03-24 11:29:11 +01:00
use tokio::sync::mpsc;
use uuid::Uuid;
2021-03-23 11:00:50 +01:00
use super::error::{Result, UpdateActorError};
2021-06-15 17:39:07 +02:00
use super::{PayloadData, UpdateMsg, UpdateStore, UpdateStoreInfo};
2021-05-25 09:46:11 +02:00
use crate::index_controller::index_actor::IndexActorHandle;
2021-04-13 17:14:02 +02:00
use crate::index_controller::{UpdateMeta, UpdateStatus};
2021-03-23 11:00:50 +01:00
2021-04-13 17:14:02 +02:00
pub struct UpdateActor<D, I> {
2021-03-23 11:00:50 +01:00
path: PathBuf,
2021-04-13 17:14:02 +02:00
store: Arc<UpdateStore>,
2021-06-02 17:45:28 +02:00
inbox: Option<mpsc::Receiver<UpdateMsg<D>>>,
2021-03-23 11:00:50 +01:00
index_handle: I,
2021-06-09 16:19:45 +02:00
must_exit: Arc<AtomicBool>,
2021-03-23 11:00:50 +01:00
}
2021-04-13 17:14:02 +02:00
impl<D, I> UpdateActor<D, I>
2021-03-23 11:00:50 +01:00
where
D: AsRef<[u8]> + Sized + 'static,
2021-03-23 16:19:01 +01:00
I: IndexActorHandle + Clone + Send + Sync + 'static,
2021-03-23 11:00:50 +01:00
{
pub fn new(
2021-04-13 17:14:02 +02:00
update_db_size: usize,
2021-03-23 11:00:50 +01:00
inbox: mpsc::Receiver<UpdateMsg<D>>,
path: impl AsRef<Path>,
index_handle: I,
2021-06-15 17:39:07 +02:00
) -> anyhow::Result<Self> {
2021-04-22 10:14:29 +02:00
let path = path.as_ref().join("updates");
2021-04-13 17:14:02 +02:00
std::fs::create_dir_all(&path)?;
let mut options = heed::EnvOpenOptions::new();
options.map_size(update_db_size);
2021-06-09 16:19:45 +02:00
let must_exit = Arc::new(AtomicBool::new(false));
let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?;
2021-03-23 11:00:50 +01:00
std::fs::create_dir_all(path.join("update_files"))?;
2021-06-02 17:45:28 +02:00
let inbox = Some(inbox);
2021-05-25 09:46:11 +02:00
Ok(Self {
path,
store,
inbox,
index_handle,
2021-06-09 16:19:45 +02:00
must_exit,
2021-05-25 09:46:11 +02:00
})
2021-03-23 11:00:50 +01:00
}
pub async fn run(mut self) {
use UpdateMsg::*;
info!("Started update actor.");
2021-06-02 17:45:28 +02:00
let mut inbox = self
.inbox
.take()
.expect("A receiver should be present by now.");
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
let must_exit = self.must_exit.clone();
let stream = stream! {
loop {
let msg = inbox.recv().await;
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
match msg {
Some(msg) => yield msg,
None => break,
}
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
};
stream
.for_each_concurrent(Some(10), |msg| async {
match msg {
Update {
uuid,
meta,
data,
ret,
} => {
let _ = ret.send(self.handle_update(uuid, meta, data).await);
}
ListUpdates { uuid, ret } => {
let _ = ret.send(self.handle_list_updates(uuid).await);
}
GetUpdate { uuid, ret, id } => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
Delete { uuid, ret } => {
let _ = ret.send(self.handle_delete(uuid).await);
}
Snapshot { uuids, path, ret } => {
let _ = ret.send(self.handle_snapshot(uuids, path).await);
}
GetInfo { ret } => {
let _ = ret.send(self.handle_get_info().await);
}
Dump { uuids, path, ret } => {
let _ = ret.send(self.handle_dump(uuids, path).await);
}
}
})
.await;
2021-03-23 11:00:50 +01:00
}
async fn handle_update(
&self,
uuid: Uuid,
meta: UpdateMeta,
payload: mpsc::Receiver<PayloadData<D>>,
2021-03-23 11:00:50 +01:00
) -> Result<UpdateStatus> {
2021-04-22 10:14:29 +02:00
let file_path = match meta {
2021-06-09 17:10:10 +02:00
UpdateMeta::DocumentsAddition { .. } => {
2021-04-22 10:14:29 +02:00
let update_file_id = uuid::Uuid::new_v4();
let path = self
.path
.join(format!("update_files/update_{}", update_file_id));
let mut file = fs::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(&path)
2021-05-25 09:46:11 +02:00
.await?;
2021-04-22 10:14:29 +02:00
async fn write_to_file<D>(file: &mut fs::File, mut payload: mpsc::Receiver<PayloadData<D>>) -> Result<usize>
where
D: AsRef<[u8]> + Sized + 'static,
{
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()).await?;
}
Ok(file_len)
2021-04-22 10:14:29 +02:00
}
let file_len = write_to_file(&mut file, payload).await;
match file_len {
Ok(len) if len > 0 => {
file.flush().await?;
let file = file.into_std().await;
Some((file, update_file_id))
}
Err(e) => {
fs::remove_file(&path).await?;
return Err(e)
}
_ => {
fs::remove_file(&path).await?;
None
}
2021-03-23 11:00:50 +01:00
}
}
2021-05-25 09:46:11 +02:00
_ => None,
2021-04-22 10:14:29 +02:00
};
2021-03-23 11:00:50 +01:00
2021-04-13 17:14:02 +02:00
let update_store = self.store.clone();
2021-03-23 11:00:50 +01:00
tokio::task::spawn_blocking(move || {
use std::io::{copy, sink, BufReader, Seek};
// If the payload is empty, ignore the check.
2021-05-29 00:08:17 +02:00
let update_uuid = if let Some((mut file, uuid)) = file_path {
2021-04-22 10:14:29 +02:00
// set the file back to the beginning
2021-05-25 09:46:11 +02:00
file.seek(SeekFrom::Start(0))?;
2021-03-23 11:00:50 +01:00
// Check that the json payload is valid:
let reader = BufReader::new(&mut file);
let mut checker = JsonChecker::new(reader);
if copy(&mut checker, &mut sink()).is_err() || checker.finish().is_err() {
// The json file is invalid, we use Serde to get a nice error message:
2021-05-25 09:46:11 +02:00
file.seek(SeekFrom::Start(0))?;
let _: serde_json::Value = serde_json::from_reader(file)
.map_err(|e| UpdateActorError::InvalidPayload(Box::new(e)))?;
2021-03-23 11:00:50 +01:00
}
2021-05-29 00:08:17 +02:00
Some(uuid)
2021-04-22 10:14:29 +02:00
} else {
None
};
2021-03-23 11:00:50 +01:00
// The payload is valid, we can register it to the update store.
2021-05-25 09:46:11 +02:00
let status = update_store
2021-05-29 00:08:17 +02:00
.register_update(meta, update_uuid, uuid)
2021-05-25 09:46:11 +02:00
.map(UpdateStatus::Enqueued)?;
Ok(status)
2021-03-23 11:00:50 +01:00
})
2021-05-25 09:46:11 +02:00
.await?
2021-03-23 11:00:50 +01:00
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
2021-04-13 17:14:02 +02:00
let update_store = self.store.clone();
2021-03-23 11:00:50 +01:00
tokio::task::spawn_blocking(move || {
2021-05-25 09:46:11 +02:00
let result = update_store.list(uuid)?;
2021-03-23 11:00:50 +01:00
Ok(result)
})
2021-05-25 09:46:11 +02:00
.await?
2021-03-23 11:00:50 +01:00
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
2021-04-13 17:14:02 +02:00
let store = self.store.clone();
2021-06-09 17:10:10 +02:00
tokio::task::spawn_blocking(move || {
2021-06-15 17:39:07 +02:00
let result = store
.meta(uuid, id)?
.ok_or(UpdateActorError::UnexistingUpdate(id))?;
2021-06-09 17:10:10 +02:00
Ok(result)
})
.await?
2021-03-23 11:00:50 +01:00
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
2021-05-10 20:24:14 +02:00
let store = self.store.clone();
2021-04-28 16:43:49 +02:00
2021-05-25 09:46:11 +02:00
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
2021-04-28 16:43:49 +02:00
Ok(())
}
2021-03-23 11:00:50 +01:00
2021-05-10 20:24:14 +02:00
async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone();
let update_store = self.store.clone();
2021-04-28 16:43:49 +02:00
2021-05-25 09:46:11 +02:00
tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
.await??;
2021-03-23 11:00:50 +01:00
Ok(())
}
2021-05-25 16:33:09 +02:00
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
2021-03-23 11:00:50 +01:00
let index_handle = self.index_handle.clone();
2021-04-13 17:14:02 +02:00
let update_store = self.store.clone();
2021-05-31 16:40:59 +02:00
tokio::task::spawn_blocking(move || -> Result<()> {
2021-05-05 19:06:07 +02:00
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
Ok(())
2021-04-13 17:14:02 +02:00
})
2021-05-25 09:46:11 +02:00
.await??;
2021-03-23 11:00:50 +01:00
Ok(())
}
2021-04-14 18:55:04 +02:00
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
2021-04-14 17:53:12 +02:00
let update_store = self.store.clone();
let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
2021-04-22 10:14:29 +02:00
let info = update_store.get_info()?;
2021-04-14 18:55:04 +02:00
Ok(info)
2021-04-14 17:53:12 +02:00
})
2021-05-25 09:46:11 +02:00
.await??;
2021-04-14 18:55:04 +02:00
Ok(info)
}
2021-03-23 11:00:50 +01:00
}