MeiliSearch/meilisearch-lib/src/index_controller/updates/mod.rs

280 lines
8.3 KiB
Rust
Raw Normal View History

2021-09-22 11:52:29 +02:00
pub mod error;
mod message;
pub mod status;
pub mod store;
2021-09-14 18:39:02 +02:00
use std::io;
2021-03-23 11:00:50 +01:00
use std::path::{Path, PathBuf};
2021-06-09 16:19:45 +02:00
use std::sync::atomic::AtomicBool;
2021-04-13 17:14:02 +02:00
use std::sync::Arc;
2021-03-23 11:00:50 +01:00
2021-09-14 18:39:02 +02:00
use actix_web::error::PayloadError;
2021-06-02 17:45:28 +02:00
use async_stream::stream;
2021-09-14 18:39:02 +02:00
use bytes::Bytes;
use futures::{Stream, StreamExt};
use log::trace;
2021-09-22 11:52:29 +02:00
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
2021-03-24 11:29:11 +01:00
use tokio::sync::mpsc;
use uuid::Uuid;
2021-03-23 11:00:50 +01:00
2021-09-24 11:53:11 +02:00
use self::error::{Result, UpdateLoopError};
2021-09-22 11:52:29 +02:00
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::document_formats::read_json;
2021-09-27 16:48:03 +02:00
use crate::index::{Index, Settings, Unchecked};
2021-09-14 18:39:02 +02:00
use crate::index_controller::update_file_store::UpdateFileStore;
2021-09-22 11:52:29 +02:00
use status::UpdateStatus;
2021-03-23 11:00:50 +01:00
2021-09-24 11:53:11 +02:00
use super::index_resolver::HardStateIndexResolver;
use super::{DocumentAdditionFormat, Update};
2021-09-22 11:52:29 +02:00
pub type UpdateSender = mpsc::Sender<UpdateMsg>;
pub fn create_update_handler(
2021-09-24 11:53:11 +02:00
index_resolver: Arc<HardStateIndexResolver>,
2021-09-22 11:52:29 +02:00
db_path: impl AsRef<Path>,
update_store_size: usize,
) -> anyhow::Result<UpdateSender> {
let path = db_path.as_ref().to_owned();
let (sender, receiver) = mpsc::channel(100);
2021-09-24 11:53:11 +02:00
let actor = UpdateLoop::new(update_store_size, receiver, path, index_resolver)?;
2021-09-22 11:52:29 +02:00
tokio::task::spawn_local(actor.run());
Ok(sender)
}
/// A wrapper type to implement read on a `Stream<Result<Bytes, Error>>`.
2021-09-14 18:39:02 +02:00
struct StreamReader<S> {
stream: S,
current: Option<Bytes>,
}
impl<S> StreamReader<S> {
fn new(stream: S) -> Self {
2021-09-22 11:52:29 +02:00
Self {
stream,
current: None,
}
2021-09-14 18:39:02 +02:00
}
}
2021-09-22 11:52:29 +02:00
impl<S: Stream<Item = std::result::Result<Bytes, PayloadError>> + Unpin> io::Read
for StreamReader<S>
{
2021-09-14 18:39:02 +02:00
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
2021-09-22 11:52:29 +02:00
// TODO: optimize buf filling
2021-09-14 18:39:02 +02:00
match self.current.take() {
Some(mut bytes) => {
let copied = bytes.split_to(buf.len());
buf.copy_from_slice(&copied);
if !bytes.is_empty() {
self.current.replace(bytes);
}
Ok(copied.len())
}
2021-09-22 11:52:29 +02:00
None => match tokio::runtime::Handle::current().block_on(self.stream.next()) {
Some(Ok(bytes)) => {
self.current.replace(bytes);
self.read(buf)
2021-09-14 18:39:02 +02:00
}
2021-09-22 11:52:29 +02:00
Some(Err(e)) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e)),
None => return Ok(0),
},
2021-09-14 18:39:02 +02:00
}
}
}
2021-09-22 15:07:04 +02:00
pub struct UpdateLoop {
2021-09-22 11:52:29 +02:00
store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg>>,
update_file_store: UpdateFileStore,
must_exit: Arc<AtomicBool>,
}
2021-09-22 15:07:04 +02:00
impl UpdateLoop {
2021-03-23 11:00:50 +01:00
pub fn new(
2021-04-13 17:14:02 +02:00
update_db_size: usize,
2021-09-14 18:39:02 +02:00
inbox: mpsc::Receiver<UpdateMsg>,
2021-03-23 11:00:50 +01:00
path: impl AsRef<Path>,
2021-09-24 11:53:11 +02:00
index_resolver: Arc<HardStateIndexResolver>,
2021-06-15 17:39:07 +02:00
) -> anyhow::Result<Self> {
2021-09-14 18:39:02 +02:00
let path = path.as_ref().to_owned();
2021-04-13 17:14:02 +02:00
std::fs::create_dir_all(&path)?;
let mut options = heed::EnvOpenOptions::new();
options.map_size(update_db_size);
2021-06-09 16:19:45 +02:00
let must_exit = Arc::new(AtomicBool::new(false));
2021-09-27 16:48:03 +02:00
let update_file_store = UpdateFileStore::new(&path).unwrap();
let store = UpdateStore::open(options, &path, index_resolver.clone(), must_exit.clone(), update_file_store.clone())?;
2021-09-14 18:39:02 +02:00
2021-06-02 17:45:28 +02:00
let inbox = Some(inbox);
2021-09-14 18:39:02 +02:00
2021-05-25 09:46:11 +02:00
Ok(Self {
store,
inbox,
2021-06-09 16:19:45 +02:00
must_exit,
2021-09-22 11:52:29 +02:00
update_file_store,
2021-05-25 09:46:11 +02:00
})
2021-03-23 11:00:50 +01:00
}
pub async fn run(mut self) {
use UpdateMsg::*;
2021-06-23 10:41:55 +02:00
trace!("Started update actor.");
2021-03-23 11:00:50 +01:00
2021-06-02 17:45:28 +02:00
let mut inbox = self
.inbox
.take()
.expect("A receiver should be present by now.");
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
let must_exit = self.must_exit.clone();
let stream = stream! {
loop {
let msg = inbox.recv().await;
2021-06-09 16:19:45 +02:00
2021-06-02 17:45:28 +02:00
if must_exit.load(std::sync::atomic::Ordering::Relaxed) {
break;
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
match msg {
Some(msg) => yield msg,
None => break,
}
2021-03-23 11:00:50 +01:00
}
2021-06-02 17:45:28 +02:00
};
stream
.for_each_concurrent(Some(10), |msg| async {
match msg {
2021-09-22 11:52:29 +02:00
Update { uuid, update, ret } => {
2021-09-14 18:39:02 +02:00
let _ = ret.send(self.handle_update(uuid, update).await);
2021-06-02 17:45:28 +02:00
}
ListUpdates { uuid, ret } => {
let _ = ret.send(self.handle_list_updates(uuid).await);
}
GetUpdate { uuid, ret, id } => {
let _ = ret.send(self.handle_get_update(uuid, id).await);
}
2021-09-28 18:10:09 +02:00
DeleteIndex { uuid, ret } => {
2021-06-02 17:45:28 +02:00
let _ = ret.send(self.handle_delete(uuid).await);
}
2021-09-27 16:48:03 +02:00
Snapshot { indexes, path, ret } => {
let _ = ret.send(self.handle_snapshot(indexes, path).await);
2021-06-02 17:45:28 +02:00
}
GetInfo { ret } => {
let _ = ret.send(self.handle_get_info().await);
}
2021-09-28 11:59:55 +02:00
Dump { indexes, path, ret } => {
let _ = ret.send(self.handle_dump(indexes, path).await);
2021-06-02 17:45:28 +02:00
}
}
})
.await;
2021-03-23 11:00:50 +01:00
}
2021-09-22 11:52:29 +02:00
async fn handle_update(&self, index_uuid: Uuid, update: Update) -> Result<UpdateStatus> {
2021-09-14 18:39:02 +02:00
let registration = match update {
2021-09-22 11:52:29 +02:00
Update::DocumentAddition {
payload,
primary_key,
method,
format,
} => {
let reader = StreamReader::new(payload);
let (content_uuid, mut update_file) = self.update_file_store.new_update()?;
tokio::task::spawn_blocking(move || -> Result<_> {
match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
}
update_file.persist()?;
Ok(())
}).await??;
2021-06-23 16:34:07 +02:00
2021-09-28 20:20:13 +02:00
store::Update::DocumentAddition {
2021-09-22 11:52:29 +02:00
primary_key,
method,
content_uuid,
}
2021-03-23 11:00:50 +01:00
}
2021-09-28 20:20:13 +02:00
Update::Settings(settings) => store::Update::Settings(settings),
Update::ClearDocuments => store::Update::ClearDocuments,
Update::DeleteDocuments(ids) => store::Update::DeleteDocuments(ids),
2021-04-22 10:14:29 +02:00
};
2021-03-23 11:00:50 +01:00
2021-09-14 18:39:02 +02:00
let store = self.store.clone();
2021-09-22 11:52:29 +02:00
let status =
tokio::task::spawn_blocking(move || store.register_update(index_uuid, registration))
.await??;
2021-09-14 18:39:02 +02:00
Ok(status.into())
}
2021-04-13 17:14:02 +02:00
2021-03-23 11:00:50 +01:00
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
2021-04-13 17:14:02 +02:00
let update_store = self.store.clone();
2021-03-23 11:00:50 +01:00
tokio::task::spawn_blocking(move || {
2021-05-25 09:46:11 +02:00
let result = update_store.list(uuid)?;
2021-03-23 11:00:50 +01:00
Ok(result)
})
2021-05-25 09:46:11 +02:00
.await?
2021-03-23 11:00:50 +01:00
}
async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result<UpdateStatus> {
2021-04-13 17:14:02 +02:00
let store = self.store.clone();
2021-06-09 17:10:10 +02:00
tokio::task::spawn_blocking(move || {
2021-06-15 17:39:07 +02:00
let result = store
.meta(uuid, id)?
2021-09-24 11:53:11 +02:00
.ok_or(UpdateLoopError::UnexistingUpdate(id))?;
2021-06-09 17:10:10 +02:00
Ok(result)
})
.await?
2021-03-23 11:00:50 +01:00
}
async fn handle_delete(&self, uuid: Uuid) -> Result<()> {
2021-05-10 20:24:14 +02:00
let store = self.store.clone();
2021-04-28 16:43:49 +02:00
2021-05-25 09:46:11 +02:00
tokio::task::spawn_blocking(move || store.delete_all(uuid)).await??;
2021-04-28 16:43:49 +02:00
Ok(())
}
2021-03-23 11:00:50 +01:00
2021-09-27 16:48:03 +02:00
async fn handle_snapshot(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
let update_store = self.store.clone();
2021-04-28 16:43:49 +02:00
2021-09-27 16:48:03 +02:00
tokio::task::spawn_blocking(move || update_store.snapshot(indexes, path))
.await??;
2021-03-23 11:00:50 +01:00
2021-09-27 16:48:03 +02:00
Ok(())
2021-03-23 11:00:50 +01:00
}
2021-09-28 11:59:55 +02:00
async fn handle_dump(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
2021-04-13 17:14:02 +02:00
let update_store = self.store.clone();
2021-05-31 16:40:59 +02:00
tokio::task::spawn_blocking(move || -> Result<()> {
2021-09-28 11:59:55 +02:00
update_store.dump(&indexes, path.to_path_buf())?;
2021-05-05 19:06:07 +02:00
Ok(())
2021-04-13 17:14:02 +02:00
})
2021-05-25 09:46:11 +02:00
.await??;
2021-03-23 11:00:50 +01:00
Ok(())
}
2021-04-14 18:55:04 +02:00
async fn handle_get_info(&self) -> Result<UpdateStoreInfo> {
2021-04-14 17:53:12 +02:00
let update_store = self.store.clone();
let info = tokio::task::spawn_blocking(move || -> Result<UpdateStoreInfo> {
2021-04-22 10:14:29 +02:00
let info = update_store.get_info()?;
2021-04-14 18:55:04 +02:00
Ok(info)
2021-04-14 17:53:12 +02:00
})
2021-05-25 09:46:11 +02:00
.await??;
2021-04-14 18:55:04 +02:00
Ok(info)
}
2021-03-23 11:00:50 +01:00
}