2021-09-22 11:52:29 +02:00
|
|
|
pub mod error;
|
|
|
|
mod message;
|
|
|
|
pub mod status;
|
|
|
|
pub mod store;
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
use std::collections::HashSet;
|
2021-09-14 18:39:02 +02:00
|
|
|
use std::io;
|
2021-03-23 11:00:50 +01:00
|
|
|
use std::path::{Path, PathBuf};
|
2021-06-09 16:19:45 +02:00
|
|
|
use std::sync::atomic::AtomicBool;
|
2021-04-13 17:14:02 +02:00
|
|
|
use std::sync::Arc;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-14 18:39:02 +02:00
|
|
|
use actix_web::error::PayloadError;
|
2021-06-02 17:45:28 +02:00
|
|
|
use async_stream::stream;
|
2021-09-14 18:39:02 +02:00
|
|
|
use bytes::Bytes;
|
|
|
|
use futures::{Stream, StreamExt};
|
2021-06-29 15:25:18 +02:00
|
|
|
use log::trace;
|
2021-09-14 18:39:02 +02:00
|
|
|
use milli::documents::DocumentBatchBuilder;
|
2021-09-22 11:52:29 +02:00
|
|
|
use milli::update::IndexDocumentsMethod;
|
|
|
|
use serde::{Deserialize, Serialize};
|
2021-09-14 18:39:02 +02:00
|
|
|
use serde_json::{Map, Value};
|
2021-03-24 11:29:11 +01:00
|
|
|
use tokio::sync::mpsc;
|
|
|
|
use uuid::Uuid;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
use self::error::{Result, UpdateLoopError};
|
2021-09-22 11:52:29 +02:00
|
|
|
pub use self::message::UpdateMsg;
|
|
|
|
use self::store::{UpdateStore, UpdateStoreInfo};
|
2021-09-24 14:55:57 +02:00
|
|
|
use crate::index::{Settings, Unchecked};
|
2021-09-14 18:39:02 +02:00
|
|
|
use crate::index_controller::update_file_store::UpdateFileStore;
|
2021-09-22 11:52:29 +02:00
|
|
|
use status::UpdateStatus;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
use super::index_resolver::HardStateIndexResolver;
|
2021-09-22 11:52:29 +02:00
|
|
|
use super::{DocumentAdditionFormat, Payload, Update};
|
|
|
|
|
|
|
|
pub type UpdateSender = mpsc::Sender<UpdateMsg>;
|
|
|
|
|
|
|
|
pub fn create_update_handler(
|
2021-09-24 11:53:11 +02:00
|
|
|
index_resolver: Arc<HardStateIndexResolver>,
|
2021-09-22 11:52:29 +02:00
|
|
|
db_path: impl AsRef<Path>,
|
|
|
|
update_store_size: usize,
|
|
|
|
) -> anyhow::Result<UpdateSender> {
|
|
|
|
let path = db_path.as_ref().to_owned();
|
|
|
|
let (sender, receiver) = mpsc::channel(100);
|
2021-09-24 11:53:11 +02:00
|
|
|
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
|
2021-09-22 11:52:29 +02:00
|
|
|
|
|
|
|
tokio::task::spawn_local(actor.run());
|
|
|
|
|
|
|
|
Ok(sender)
|
|
|
|
}
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
|
|
pub enum RegisterUpdate {
|
2021-09-24 15:21:07 +02:00
|
|
|
DeleteDocuments(Vec<String>),
|
2021-09-22 11:52:29 +02:00
|
|
|
DocumentAddition {
|
|
|
|
primary_key: Option<String>,
|
|
|
|
method: IndexDocumentsMethod,
|
|
|
|
content_uuid: Uuid,
|
|
|
|
},
|
2021-09-24 14:55:57 +02:00
|
|
|
Settings(Settings<Unchecked>),
|
2021-09-24 15:21:07 +02:00
|
|
|
ClearDocuments,
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
|
2021-09-14 18:39:02 +02:00
|
|
|
struct StreamReader<S> {
|
|
|
|
stream: S,
|
|
|
|
current: Option<Bytes>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl<S> StreamReader<S> {
|
|
|
|
fn new(stream: S) -> Self {
|
2021-09-22 11:52:29 +02:00
|
|
|
Self {
|
|
|
|
stream,
|
|
|
|
current: None,
|
|
|
|
}
|
2021-09-14 18:39:02 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read
|
|
|
|
for StreamReader<S>
|
|
|
|
{
|
2021-09-14 18:39:02 +02:00
|
|
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
2021-09-22 11:52:29 +02:00
|
|
|
// TODO: optimize buf filling
|
2021-09-14 18:39:02 +02:00
|
|
|
match self.current.take() {
|
|
|
|
Some(mut bytes) => {
|
|
|
|
let copied = bytes.split_to(buf.len());
|
|
|
|
buf.copy_from_slice(&copied);
|
|
|
|
if !bytes.is_empty() {
|
|
|
|
self.current.replace(bytes);
|
|
|
|
}
|
|
|
|
Ok(copied.len())
|
|
|
|
}
|
2021-09-22 11:52:29 +02:00
|
|
|
None => match tokio::runtime::Handle::current().block_on(self.stream.next()) {
|
|
|
|
Some(Ok(bytes)) => {
|
|
|
|
self.current.replace(bytes);
|
|
|
|
self.read(buf)
|
2021-09-14 18:39:02 +02:00
|
|
|
}
|
2021-09-22 11:52:29 +02:00
|
|
|
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
|
|
|
|
None => return Ok(0),
|
|
|
|
},
|
2021-09-14 18:39:02 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-09-22 15:07:04 +02:00
|
|
|
pub struct UpdateLoop {
|
2021-09-22 11:52:29 +02:00
|
|
|
store: Arc<UpdateStore>,
|
|
|
|
inbox: Option<mpsc::Receiver<UpdateMsg>>,
|
|
|
|
update_file_store: UpdateFileStore,
|
2021-09-24 11:53:11 +02:00
|
|
|
index_resolver: Arc<HardStateIndexResolver>,
|
2021-09-22 11:52:29 +02:00
|
|
|
must_exit: Arc<AtomicBool>,
|
|
|
|
}
|
|
|
|
|
2021-09-22 15:07:04 +02:00
|
|
|
impl UpdateLoop {
|
2021-03-23 11:00:50 +01:00
|
|
|
pub fn new(
|
2021-04-13 17:14:02 +02:00
|
|
|
update_db_size: usize,
|
2021-09-14 18:39:02 +02:00
|
|
|
inbox: mpsc::Receiver<UpdateMsg>,
|
2021-03-23 11:00:50 +01:00
|
|
|
path: impl AsRef<Path>,
|
2021-09-24 11:53:11 +02:00
|
|
|
index_resolver: Arc<HardStateIndexResolver>,
|
2021-06-15 17:39:07 +02:00
|
|
|
) -> anyhow::Result<Self> {
|
2021-09-14 18:39:02 +02:00
|
|
|
let path = path.as_ref().to_owned();
|
2021-04-13 17:14:02 +02:00
|
|
|
std::fs::create_dir_all(&path)?;
|
|
|
|
|
|
|
|
let mut options = heed::EnvOpenOptions::new();
|
|
|
|
options.map_size(update_db_size);
|
|
|
|
|
2021-06-09 16:19:45 +02:00
|
|
|
let must_exit = Arc::new(AtomicBool::new(false));
|
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone())?;
|
2021-09-14 18:39:02 +02:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
let inbox = Some(inbox);
|
2021-09-14 18:39:02 +02:00
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
let update_file_store = UpdateFileStore::new(&path).unwrap();
|
2021-09-14 18:39:02 +02:00
|
|
|
|
2021-05-25 09:46:11 +02:00
|
|
|
Ok(Self {
|
|
|
|
store,
|
|
|
|
inbox,
|
2021-06-09 16:19:45 +02:00
|
|
|
must_exit,
|
2021-09-22 11:52:29 +02:00
|
|
|
update_file_store,
|
2021-09-24 11:53:11 +02:00
|
|
|
index_resolver,
|
2021-05-25 09:46:11 +02:00
|
|
|
})
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn run(mut self) {
|
|
|
|
use UpdateMsg::*;
|
|
|
|
|
2021-06-23 10:41:55 +02:00
|
|
|
trace!("Started update actor.");
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
let mut inbox = self
|
|
|
|
.inbox
|
|
|
|
.take()
|
|
|
|
.expect("A receiver should be present by now.");
|
2021-06-09 16:19:45 +02:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
let must_exit = self.must_exit.clone();
|
|
|
|
let stream = stream! {
|
|
|
|
loop {
|
|
|
|
let msg = inbox.recv().await;
|
2021-06-09 16:19:45 +02:00
|
|
|
|
2021-06-02 17:45:28 +02:00
|
|
|
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
|
|
|
|
break;
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-06-02 17:45:28 +02:00
|
|
|
|
|
|
|
match msg {
|
|
|
|
Some(msg) => yield msg,
|
|
|
|
None => break,
|
2021-04-09 14:41:24 +02:00
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-06-02 17:45:28 +02:00
|
|
|
};
|
|
|
|
|
|
|
|
stream
|
|
|
|
.for_each_concurrent(Some(10), |msg| async {
|
|
|
|
match msg {
|
2021-09-22 11:52:29 +02:00
|
|
|
Update { uuid, update, ret } => {
|
2021-09-14 18:39:02 +02:00
|
|
|
let _ = ret.send(self.handle_update(uuid, update).await);
|
2021-06-02 17:45:28 +02:00
|
|
|
}
|
|
|
|
ListUpdates { uuid, ret } => {
|
|
|
|
let _ = ret.send(self.handle_list_updates(uuid).await);
|
|
|
|
}
|
|
|
|
GetUpdate { uuid, ret, id } => {
|
|
|
|
let _ = ret.send(self.handle_get_update(uuid, id).await);
|
|
|
|
}
|
|
|
|
Delete { uuid, ret } => {
|
|
|
|
let _ = ret.send(self.handle_delete(uuid).await);
|
|
|
|
}
|
|
|
|
Snapshot { uuids, path, ret } => {
|
|
|
|
let _ = ret.send(self.handle_snapshot(uuids, path).await);
|
|
|
|
}
|
|
|
|
GetInfo { ret } => {
|
|
|
|
let _ = ret.send(self.handle_get_info().await);
|
|
|
|
}
|
|
|
|
Dump { uuids, path, ret } => {
|
|
|
|
let _ = ret.send(self.handle_dump(uuids, path).await);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
.await;
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result<UpdateStatus> {
|
2021-09-14 18:39:02 +02:00
|
|
|
let registration = match update {
|
2021-09-22 11:52:29 +02:00
|
|
|
Update::DocumentAddition {
|
|
|
|
payload,
|
|
|
|
primary_key,
|
|
|
|
method,
|
|
|
|
format,
|
|
|
|
} => {
|
2021-09-14 18:39:02 +02:00
|
|
|
let content_uuid = match format {
|
|
|
|
DocumentAdditionFormat::Json => self.documents_from_json(payload).await?,
|
|
|
|
};
|
2021-06-23 16:34:07 +02:00
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
RegisterUpdate::DocumentAddition {
|
|
|
|
primary_key,
|
|
|
|
method,
|
|
|
|
content_uuid,
|
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
2021-09-24 14:55:57 +02:00
|
|
|
Update::Settings(settings) => RegisterUpdate::Settings(settings),
|
2021-09-24 15:21:07 +02:00
|
|
|
Update::ClearDocuments => RegisterUpdate::ClearDocuments,
|
|
|
|
Update::DeleteDocuments(ids) => RegisterUpdate::DeleteDocuments(ids),
|
2021-04-22 10:14:29 +02:00
|
|
|
};
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-14 18:39:02 +02:00
|
|
|
let store = self.store.clone();
|
2021-09-22 11:52:29 +02:00
|
|
|
let status =
|
|
|
|
tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration))
|
|
|
|
.await??;
|
2021-09-14 18:39:02 +02:00
|
|
|
|
|
|
|
Ok(status.into())
|
|
|
|
}
|
2021-04-13 17:14:02 +02:00
|
|
|
|
2021-09-14 18:39:02 +02:00
|
|
|
async fn documents_from_json(&self, payload: Payload) -> Result<Uuid> {
|
|
|
|
let file_store = self.update_file_store.clone();
|
2021-03-23 11:00:50 +01:00
|
|
|
tokio::task::spawn_blocking(move || {
|
2021-09-14 18:39:02 +02:00
|
|
|
let (uuid, mut file) = file_store.new_update().unwrap();
|
|
|
|
let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();
|
|
|
|
|
2021-09-22 11:52:29 +02:00
|
|
|
let documents: Vec<Map<String, Value>> =
|
|
|
|
serde_json::from_reader(StreamReader::new(payload))?;
|
2021-09-14 18:39:02 +02:00
|
|
|
builder.add_documents(documents).unwrap();
|
|
|
|
builder.finish().unwrap();
|
|
|
|
|
|
|
|
file.persist();
|
|
|
|
|
|
|
|
Ok(uuid)
|
2021-09-22 11:52:29 +02:00
|
|
|
})
|
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
|
2021-04-13 17:14:02 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-03-23 11:00:50 +01:00
|
|
|
tokio::task::spawn_blocking(move || {
|
2021-05-25 09:46:11 +02:00
|
|
|
let result = update_store.list(uuid)?;
|
2021-03-23 11:00:50 +01:00
|
|
|
Ok(result)
|
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
|
2021-04-13 17:14:02 +02:00
|
|
|
let store = self.store.clone();
|
2021-06-09 17:10:10 +02:00
|
|
|
tokio::task::spawn_blocking(move || {
|
2021-06-15 17:39:07 +02:00
|
|
|
let result = store
|
|
|
|
.meta(uuid, id)?
|
2021-09-24 11:53:11 +02:00
|
|
|
.ok_or(UpdateLoopError::UnexistingUpdate(id))?;
|
2021-06-09 17:10:10 +02:00
|
|
|
Ok(result)
|
|
|
|
})
|
|
|
|
.await?
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
|
2021-05-10 20:24:14 +02:00
|
|
|
let store = self.store.clone();
|
2021-04-28 16:43:49 +02:00
|
|
|
|
2021-05-25 09:46:11 +02:00
|
|
|
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
|
2021-04-28 16:43:49 +02:00
|
|
|
|
|
|
|
Ok(())
|
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
async fn handle_snapshot(&self, _uuids: HashSet<Uuid>,_pathh: PathBuf) -> Result<()> {
|
|
|
|
todo!()
|
|
|
|
//let index_handle = self.index_resolver.clone();
|
|
|
|
//let update_store = self.store.clone();
|
2021-04-28 16:43:49 +02:00
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
//tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
|
|
|
|
//.await??;
|
2021-03-23 11:00:50 +01:00
|
|
|
|
2021-09-24 11:53:11 +02:00
|
|
|
//Ok(())
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
|
2021-09-24 11:53:11 +02:00
|
|
|
let index_handle = self.index_resolver.clone();
|
2021-04-13 17:14:02 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-05-31 16:40:59 +02:00
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
tokio::task::spawn_blocking(move || -> Result<()> {
|
2021-05-05 19:06:07 +02:00
|
|
|
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
|
|
|
|
Ok(())
|
2021-04-13 17:14:02 +02:00
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await??;
|
|
|
|
|
2021-03-23 11:00:50 +01:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-04-09 14:41:24 +02:00
|
|
|
|
2021-04-14 18:55:04 +02:00
|
|
|
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
|
2021-04-14 17:53:12 +02:00
|
|
|
let update_store = self.store.clone();
|
2021-06-14 21:26:35 +02:00
|
|
|
let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let info = update_store.get_info()?;
|
2021-04-14 18:55:04 +02:00
|
|
|
Ok(info)
|
2021-04-14 17:53:12 +02:00
|
|
|
})
|
2021-05-25 09:46:11 +02:00
|
|
|
.await??;
|
2021-04-09 14:41:24 +02:00
|
|
|
|
2021-04-14 18:55:04 +02:00
|
|
|
Ok(info)
|
2021-04-09 14:41:24 +02:00
|
|
|
}
|
2021-03-23 11:00:50 +01:00
|
|
|
}
|