WIP concurrent index store

This commit is contained in:
mpostma 2021-03-01 15:45:35 +01:00
parent c4dfd5f0c3
commit 62532b8f79
No known key found for this signature in database
GPG key ID: CBC8A7C1D7A28C3A
6 changed files with 106 additions and 53 deletions

View file

@ -23,10 +23,11 @@ impl Data {
{
let file = tempfile::tempfile_in(".")?;
let mut file = File::from_std(file);
while let Some(Ok(bytes)) = stream.next().await {
file.write(bytes.as_ref()).await;
while let Some(item) = stream.next().await {
file.write_all(&item?).await?;
}
file.seek(std::io::SeekFrom::Start(0)).await?;
file.flush().await?;
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?;
Ok(update_status)
}

View file

@ -13,7 +13,7 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use uuid::Uuid;
use log::info;
use crate::data::SearchQuery;
use futures::stream::{StreamExt, Stream};
use futures::stream::StreamExt;
use super::update_handler::UpdateHandler;
use async_stream::stream;
@ -32,7 +32,7 @@ enum IndexMsg {
}
struct IndexActor<S> {
inbox: mpsc::Receiver<IndexMsg>,
inbox: Option<mpsc::Receiver<IndexMsg>>,
update_handler: Arc<UpdateHandler>,
store: S,
}
@ -57,26 +57,31 @@ impl<S: IndexStore + Sync + Send> IndexActor<S> {
let options = IndexerOpts::default();
let update_handler = UpdateHandler::new(&options).unwrap();
let update_handler = Arc::new(update_handler);
let inbox = Some(inbox);
Self { inbox, store, update_handler }
}
async fn run(mut self) {
let mut inbox = self.inbox.take().expect("Index Actor must have a inbox at this point.");
let stream = stream! {
loop {
match self.inbox.recv().await {
match inbox.recv().await {
Some(msg) => yield msg,
None => break,
}
}
};
stream.for_each_concurent(Some(10), |msg| {
let fut = stream.for_each_concurrent(Some(10), |msg| async {
match msg {
IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret),
IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret),
IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret),
IndexMsg::CreateIndex { uuid, primary_key, ret } => self.handle_create_index(uuid, primary_key, ret).await,
IndexMsg::Update { ret, meta, data } => self.handle_update(meta, data, ret).await,
IndexMsg::Search { ret, query, uuid } => self.handle_search(uuid, query, ret).await,
}
})
});
fut.await;
}
async fn handle_search(&self, uuid: Uuid, query: SearchQuery, ret: oneshot::Sender<anyhow::Result<SearchResult>>) {

View file

@ -41,7 +41,6 @@ impl UpdateActor {
}
async fn run(mut self) {
info!("started update actor.");
loop {

View file

@ -1,6 +1,6 @@
use std::path::Path;
use std::sync::{Arc, RwLock};
use std::io::{Cursor, SeekFrom, Seek};
use std::io::{Cursor, SeekFrom, Seek, Write};
use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
@ -192,7 +192,9 @@ where
.replace(processing.clone());
let mut cursor = Cursor::new(first_content);
let mut file = tempfile::tempfile()?;
std::io::copy(&mut cursor, &mut file)?;
let n = std::io::copy(&mut cursor, &mut file)?;
println!("copied count: {}", n);
file.flush()?;
file.seek(SeekFrom::Start(0))?;
// Process the pending update using the provided user function.
let result = handler.handle_update(processing, file);