multi index store

Create two channels for the Index handler, one for writes and one for reads,
so writes are processed one at a time, while reads are processed in parallel.
mpostma 2021-03-04 19:18:01 +01:00
parent 6a0a9fec6b
commit f090f42e7a
3 changed files with 188 additions and 83 deletions
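
For orientation, here is a minimal, standalone sketch of the read/write split this commit introduces. It is not the actual meilisearch code: it assumes the tokio, tokio-stream, and futures crates, and a hypothetical Msg/handle pair standing in for IndexMsg and IndexActor::handle_message. Reads are drained with a concurrency limit of 10, writes with a limit of 1, and both loops are driven together:

use futures::StreamExt;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

#[derive(Debug)]
enum Msg {
    Read(u32),
    Write(u32),
}

// Stand-in for IndexActor::handle_message.
async fn handle(msg: Msg) {
    println!("handling {:?}", msg);
}

#[tokio::main]
async fn main() {
    let (read_sender, read_receiver) = mpsc::channel::<Msg>(100);
    let (write_sender, write_receiver) = mpsc::channel::<Msg>(100);

    let actor = tokio::spawn(async move {
        // Up to 10 read messages are processed at once...
        let reads = ReceiverStream::new(read_receiver).for_each_concurrent(Some(10), handle);
        // ...while write messages are processed strictly one at a time.
        let writes = ReceiverStream::new(write_receiver).for_each_concurrent(Some(1), handle);
        tokio::join!(reads, writes);
    });

    read_sender.send(Msg::Read(1)).await.unwrap();
    write_sender.send(Msg::Write(1)).await.unwrap();
    // Dropping the senders closes both channels so the actor loop ends.
    drop(read_sender);
    drop(write_sender);
    actor.await.unwrap();
}

The diff below achieves the same thing with the stream! macro and pin_mut!, driving both for_each_concurrent futures with tokio::join! so that handle_message can borrow self.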

Cargo.lock (generated)

@@ -1,5 +1,7 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
+version = 3
+
 [[package]]
 name = "actix-codec"
 version = "0.3.0"
@@ -428,7 +430,7 @@ checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1"
 [[package]]
 name = "assert-json-diff"
 version = "1.0.1"
-source = "git+https://github.com/qdequele/assert-json-diff#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4"
+source = "git+https://github.com/qdequele/assert-json-diff?branch=master#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4"
 dependencies = [
  "serde",
  "serde_json",


@@ -11,6 +11,8 @@ use log::info;
 use thiserror::Error;
 use tokio::sync::{mpsc, oneshot, RwLock};
 use uuid::Uuid;
+use std::future::Future;
+use futures::pin_mut;
 
 use super::update_handler::UpdateHandler;
 use crate::index::UpdateResult as UResult;
@@ -61,7 +63,8 @@ enum IndexMsg {
 }
 
 struct IndexActor<S> {
-    inbox: Option<mpsc::Receiver<IndexMsg>>,
+    read_receiver: Option<mpsc::Receiver<IndexMsg>>,
+    write_receiver: Option<mpsc::Receiver<IndexMsg>>,
     update_handler: Arc<UpdateHandler>,
     store: S,
 }
@@ -82,60 +85,96 @@ trait IndexStore {
 }
 
 impl<S: IndexStore + Sync + Send> IndexActor<S> {
-    fn new(inbox: mpsc::Receiver<IndexMsg>, store: S) -> Self {
+    fn new(
+        read_receiver: mpsc::Receiver<IndexMsg>,
+        write_receiver: mpsc::Receiver<IndexMsg>,
+        store: S
+    ) -> Self {
         let options = IndexerOpts::default();
         let update_handler = UpdateHandler::new(&options).unwrap();
         let update_handler = Arc::new(update_handler);
-        let inbox = Some(inbox);
+        let read_receiver = Some(read_receiver);
+        let write_receiver = Some(write_receiver);
         Self {
-            inbox,
+            read_receiver,
+            write_receiver,
             store,
            update_handler,
        }
     }
 
+    /// `run` poll the write_receiver and read_receiver concurrently, but while messages send
+    /// through the read channel are processed concurrently, the messages sent through the write
+    /// channel are processed one at a time.
     async fn run(mut self) {
-        use IndexMsg::*;
-
-        let mut inbox = self
-            .inbox
+        let mut read_receiver = self
+            .read_receiver
             .take()
             .expect("Index Actor must have a inbox at this point.");
 
-        let stream = stream! {
+        let read_stream = stream! {
             loop {
-                match inbox.recv().await {
+                match read_receiver.recv().await {
                     Some(msg) => yield msg,
                     None => break,
                 }
             }
         };
 
-        let fut = stream.for_each_concurrent(Some(10), |msg| async {
-            match msg {
-                CreateIndex {
-                    uuid,
-                    primary_key,
-                    ret,
-                } => self.handle_create_index(uuid, primary_key, ret).await,
-                Update { ret, meta, data } => self.handle_update(meta, data, ret).await,
-                Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await,
-                Settings { ret, uuid } => self.handle_settings(uuid, ret).await,
-                Documents {
-                    ret,
-                    uuid,
-                    attributes_to_retrieve,
-                    offset,
-                    limit,
-                } => {
-                    self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret)
-                        .await
-                }
-                Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await,
-            }
-        });
-
-        fut.await;
+        let mut write_receiver = self
+            .write_receiver
+            .take()
+            .expect("Index Actor must have a inbox at this point.");
+
+        let write_stream = stream! {
+            loop {
+                match write_receiver.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+
+        pin_mut!(write_stream);
+        pin_mut!(read_stream);
+
+        let fut1 = read_stream.for_each_concurrent(Some(10), |msg| self.handle_message(msg));
+        let fut2 = write_stream.for_each_concurrent(Some(1), |msg| self.handle_message(msg));
+
+        let fut1: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut1);
+        let fut2: Box<dyn Future<Output = ()> + Unpin + Send> = Box::new(fut2);
+
+        //let futures = futures::stream::futures_unordered::FuturesUnordered::new();
+        //futures.push(fut1);
+        //futures.push(fut2);
+        //futures.for_each(f)
+
+        tokio::join!(fut1, fut2);
+    }
+
+    async fn handle_message(&self, msg: IndexMsg) {
+        use IndexMsg::*;
+        match msg {
+            CreateIndex {
+                uuid,
+                primary_key,
+                ret,
+            } => self.handle_create_index(uuid, primary_key, ret).await,
+            Update { ret, meta, data } => self.handle_update(meta, data, ret).await,
+            Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await,
+            Settings { ret, uuid } => self.handle_settings(uuid, ret).await,
+            Documents {
+                ret,
+                uuid,
+                attributes_to_retrieve,
+                offset,
+                limit,
+            } => {
+                self.handle_fetch_documents(uuid, offset, limit, attributes_to_retrieve, ret)
+                    .await
+            }
+            Document { uuid, attributes_to_retrieve, doc_id, ret } => self.handle_fetch_document(uuid, doc_id, attributes_to_retrieve, ret).await,
+        }
     }
 
     async fn handle_search(
@@ -221,17 +260,19 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
 #[derive(Clone)]
 pub struct IndexActorHandle {
-    sender: mpsc::Sender<IndexMsg>,
+    read_sender: mpsc::Sender<IndexMsg>,
+    write_sender: mpsc::Sender<IndexMsg>,
 }
 
 impl IndexActorHandle {
     pub fn new(path: impl AsRef<Path>) -> Self {
-        let (sender, receiver) = mpsc::channel(100);
+        let (read_sender, read_receiver) = mpsc::channel(100);
+        let (write_sender, write_receiver) = mpsc::channel(100);
         let store = MapIndexStore::new(path);
-        let actor = IndexActor::new(receiver, store);
+        let actor = IndexActor::new(read_receiver, write_receiver, store);
         tokio::task::spawn(actor.run());
-        Self { sender }
+        Self { read_sender, write_sender }
     }
 
     pub async fn create_index(
@@ -245,28 +286,28 @@ impl IndexActorHandle {
             uuid,
             primary_key,
         };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         receiver.await.expect("IndexActor has been killed")
     }
 
     pub async fn update(&self, meta: Processing<UpdateMeta>, data: std::fs::File) -> UpdateResult {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::Update { ret, meta, data };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         receiver.await.expect("IndexActor has been killed")
     }
 
     pub async fn search(&self, uuid: Uuid, query: SearchQuery) -> Result<SearchResult> {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::Search { uuid, query, ret };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         Ok(receiver.await.expect("IndexActor has been killed")?)
     }
 
     pub async fn settings(&self, uuid: Uuid) -> Result<Settings> {
         let (ret, receiver) = oneshot::channel();
         let msg = IndexMsg::Settings { uuid, ret };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         Ok(receiver.await.expect("IndexActor has been killed")?)
     }
 
@@ -285,7 +326,7 @@ impl IndexActorHandle {
             attributes_to_retrieve,
             limit,
         };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         Ok(receiver.await.expect("IndexActor has been killed")?)
     }
 
@@ -302,7 +343,7 @@ impl IndexActorHandle {
             doc_id,
             attributes_to_retrieve,
         };
-        let _ = self.sender.send(msg).await;
+        let _ = self.read_sender.send(msg).await;
         Ok(receiver.await.expect("IndexActor has been killed")?)
     }
 }


@@ -1,17 +1,18 @@
 use std::fs::create_dir_all;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::collections::{HashMap, hash_map::Entry};
 
+use log::info;
 use super::index_actor::IndexActorHandle;
-use log::info;
 use thiserror::Error;
-use tokio::sync::{mpsc, oneshot};
-use uuid::Uuid;
 use tokio::fs::File;
 use tokio::io::AsyncWriteExt;
+use tokio::sync::{mpsc, oneshot, RwLock};
+use uuid::Uuid;
 
-use crate::index_controller::{UpdateMeta, UpdateStatus};
 use crate::index::UpdateResult;
+use crate::index_controller::{UpdateMeta, UpdateStatus};
 
 pub type Result<T> = std::result::Result<T, UpdateError>;
 type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
@@ -28,33 +29,42 @@ enum UpdateMsg<D> {
         uuid: Uuid,
         meta: UpdateMeta,
         data: mpsc::Receiver<PayloadData<D>>,
-        ret: oneshot::Sender<Result<UpdateStatus>>
+        ret: oneshot::Sender<Result<UpdateStatus>>,
     },
     ListUpdates {
         uuid: Uuid,
         ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
-    }
+    },
 }
 
-struct UpdateActor<D> {
+struct UpdateActor<D, S> {
     path: PathBuf,
-    store: Arc<UpdateStore>,
+    store: S,
     inbox: mpsc::Receiver<UpdateMsg<D>>,
-    index_handle: IndexActorHandle,
 }
 
-impl<D> UpdateActor<D>
-where D: AsRef<[u8]> + Sized + 'static,
+#[async_trait::async_trait]
+trait UpdateStoreStore {
+    async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>>;
+}
+
+impl<D, S> UpdateActor<D, S>
+where
+    D: AsRef<[u8]> + Sized + 'static,
+    S: UpdateStoreStore,
 {
     fn new(
-        store: Arc<UpdateStore>,
+        store: S,
         inbox: mpsc::Receiver<UpdateMsg<D>>,
-        index_handle: IndexActorHandle,
         path: impl AsRef<Path>,
     ) -> Self {
         let path = path.as_ref().to_owned().join("update_files");
         create_dir_all(&path).unwrap();
-        Self { store, inbox, index_handle, path }
+        Self {
+            store,
+            inbox,
+            path,
+        }
     }
 
     async fn run(mut self) {
@@ -64,15 +74,26 @@ where D: AsRef<[u8]> + Sized + 'static,
         loop {
             match self.inbox.recv().await {
-                Some(Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await,
+                Some(Update {
+                    uuid,
+                    meta,
+                    data,
+                    ret,
+                }) => self.handle_update(uuid, meta, data, ret).await,
                 Some(ListUpdates { uuid, ret }) => self.handle_list_updates(uuid, ret).await,
                 None => {}
             }
         }
     }
 
-    async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver<PayloadData<D>>, ret: oneshot::Sender<Result<UpdateStatus>>) {
-        let store = self.store.clone();
+    async fn handle_update(
+        &self,
+        uuid: Uuid,
+        meta: UpdateMeta,
+        mut payload: mpsc::Receiver<PayloadData<D>>,
+        ret: oneshot::Sender<Result<UpdateStatus>>,
+    ) {
+        let update_store = self.store.get_or_create(uuid).await.unwrap();
         let update_file_id = uuid::Uuid::new_v4();
         let path = self.path.join(format!("update_{}", update_file_id));
         let mut file = File::create(&path).await.unwrap();
@@ -84,7 +105,7 @@ where D: AsRef<[u8]> + Sized + 'static,
                 }
                 Err(e) => {
                     ret.send(Err(UpdateError::Error(e)));
-                    return
+                    return;
                 }
             }
         }
@@ -94,15 +115,20 @@ where D: AsRef<[u8]> + Sized + 'static,
         let file = file.into_std().await;
 
         let result = tokio::task::spawn_blocking(move || {
-            let result = store
+            let result = update_store
                 .register_update(meta, path, uuid)
                 .map(|pending| UpdateStatus::Pending(pending))
                 .map_err(|e| UpdateError::Error(Box::new(e)));
             let _ = ret.send(result);
-        }).await;
+        })
+        .await;
     }
 
-    async fn handle_list_updates(&self, uuid: Uuid, ret: oneshot::Sender<Result<Vec<UpdateStatus>>>) {
+    async fn handle_list_updates(
+        &self,
+        uuid: Uuid,
+        ret: oneshot::Sender<Result<Vec<UpdateStatus>>>,
+    ) {
         todo!()
     }
 }
@@ -113,29 +139,26 @@ pub struct UpdateActorHandle<D> {
 }
 
 impl<D> UpdateActorHandle<D>
-where D: AsRef<[u8]> + Sized + 'static,
+where
+    D: AsRef<[u8]> + Sized + 'static,
 {
     pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
+        let path = path.as_ref().to_owned().join("updates");
         let (sender, receiver) = mpsc::channel(100);
-        let mut options = heed::EnvOpenOptions::new();
-        options.map_size(4096 * 100_000);
-        let path = path
-            .as_ref()
-            .to_owned()
-            .join("updates");
-        create_dir_all(&path).unwrap();
-        let index_handle_clone = index_handle.clone();
-        let store = UpdateStore::open(options, &path, move |meta, file| {
-            futures::executor::block_on(index_handle_clone.update(meta, file))
-        }).unwrap();
-        let actor = UpdateActor::new(store, receiver, index_handle, path);
+        let store = MapUpdateStoreStore::new(index_handle, &path);
+        let actor = UpdateActor::new(store, receiver, path);
         tokio::task::spawn_local(actor.run());
         Self { sender }
     }
 
-    pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver<PayloadData<D>>, uuid: Uuid) -> Result<UpdateStatus> {
+    pub async fn update(
+        &self,
+        meta: UpdateMeta,
+        data: mpsc::Receiver<PayloadData<D>>,
+        uuid: Uuid,
+    ) -> Result<UpdateStatus> {
         let (ret, receiver) = oneshot::channel();
         let msg = UpdateMsg::Update {
             uuid,
@@ -143,7 +166,46 @@ where D: AsRef<[u8]> + Sized + 'static,
             meta,
             ret,
         };
         let _ = self.sender.send(msg).await;
         receiver.await.expect("update actor killed.")
     }
 }
+
+struct MapUpdateStoreStore {
+    db: Arc<RwLock<HashMap<Uuid, Arc<UpdateStore>>>>,
+    index_handle: IndexActorHandle,
+    path: PathBuf,
+}
+
+impl MapUpdateStoreStore {
+    fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
+        let db = Arc::new(RwLock::new(HashMap::new()));
+        let path = path.as_ref().to_owned();
+        Self { db, index_handle, path }
+    }
+}
+
+#[async_trait::async_trait]
+impl UpdateStoreStore for MapUpdateStoreStore {
+    async fn get_or_create(&self, uuid: Uuid) -> Result<Arc<UpdateStore>> {
+        match self.db.write().await.entry(uuid) {
+            Entry::Vacant(e) => {
+                let mut options = heed::EnvOpenOptions::new();
+                options.map_size(4096 * 100_000);
+                let path = self.path.clone().join(format!("updates-{}", e.key()));
+                create_dir_all(&path).unwrap();
+                let index_handle = self.index_handle.clone();
+                let store = UpdateStore::open(options, &path, move |meta, file| {
+                    futures::executor::block_on(index_handle.update(meta, file))
+                }).unwrap();
+                let store = e.insert(store);
+                Ok(store.clone())
+            }
+            Entry::Occupied(e) => {
+                Ok(e.get().clone())
+            }
+        }
+    }
+}
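
The "multi index store" in the title is the MapUpdateStoreStore above: one UpdateStore per index uuid, created lazily behind an async RwLock. A reduced sketch of that pattern, with a plain String standing in for the heed-backed UpdateStore and assuming the tokio and uuid (v4 feature) crates:

use std::collections::{hash_map::Entry, HashMap};
use std::sync::Arc;

use tokio::sync::RwLock;
use uuid::Uuid;

#[derive(Default)]
struct StoreMap {
    // index uuid -> its lazily created store
    db: Arc<RwLock<HashMap<Uuid, Arc<String>>>>,
}

impl StoreMap {
    async fn get_or_create(&self, uuid: Uuid) -> Arc<String> {
        // Taking the write lock before entry() guarantees the store for a
        // given index is only created once, then shared through the Arc.
        match self.db.write().await.entry(uuid) {
            Entry::Vacant(e) => {
                let store = Arc::new(format!("updates-{}", uuid)); // stand-in for UpdateStore::open(..)
                e.insert(store).clone()
            }
            Entry::Occupied(e) => e.get().clone(),
        }
    }
}

#[tokio::main]
async fn main() {
    let map = StoreMap::default();
    let uuid = Uuid::new_v4();
    let first = map.get_or_create(uuid).await;
    let second = map.get_or_create(uuid).await;
    assert!(Arc::ptr_eq(&first, &second)); // same store for the same index
}

Because the write lock is held across the entry() lookup, two concurrent calls for the same uuid cannot both open a store: the second caller waits and then hits the Occupied arm.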