completely file backed udpates

This commit is contained in:
mpostma 2021-03-03 10:57:13 +01:00
parent 62532b8f79
commit 9aca6fab88
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
6 changed files with 90 additions and 64 deletions

View File

@ -62,7 +62,7 @@ impl Data {
let path = options.db_path.clone(); let path = options.db_path.clone();
//let indexer_opts = options.indexer_options.clone(); //let indexer_opts = options.indexer_options.clone();
create_dir_all(&path)?; create_dir_all(&path)?;
let index_controller = ActorIndexController::new(); let index_controller = ActorIndexController::new(&path);
let index_controller = Arc::new(index_controller); let index_controller = Arc::new(index_controller);
let mut api_keys = ApiKeys { let mut api_keys = ApiKeys {

View File

@ -3,9 +3,6 @@
use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
//use tokio::io::AsyncWriteExt; //use tokio::io::AsyncWriteExt;
use actix_web::web::Payload; use actix_web::web::Payload;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, AsyncSeekExt};
use futures::prelude::stream::StreamExt;
use crate::index_controller::UpdateStatus; use crate::index_controller::UpdateStatus;
use crate::index_controller::{Settings, IndexMetadata}; use crate::index_controller::{Settings, IndexMetadata};
@ -17,18 +14,11 @@ impl Data {
index: impl AsRef<str> + Send + Sync + 'static, index: impl AsRef<str> + Send + Sync + 'static,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
mut stream: Payload, stream: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> ) -> anyhow::Result<UpdateStatus>
{ {
let file = tempfile::tempfile_in(".")?; let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?;
let mut file = File::from_std(file);
while let Some(item) = stream.next().await {
file.write_all(&item?).await?;
}
file.seek(std::io::SeekFrom::Start(0)).await?;
file.flush().await?;
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?;
Ok(update_status) Ok(update_status)
} }

View File

@ -4,25 +4,29 @@ mod uuid_resolver;
mod update_store; mod update_store;
mod update_handler; mod update_handler;
use tokio::sync::oneshot; use std::path::Path;
use tokio::sync::{mpsc, oneshot};
use super::IndexController; use super::IndexController;
use uuid::Uuid; use uuid::Uuid;
use super::IndexMetadata; use super::IndexMetadata;
use tokio::fs::File; use futures::stream::StreamExt;
use actix_web::web::Payload;
use super::UpdateMeta; use super::UpdateMeta;
use crate::data::{SearchResult, SearchQuery}; use crate::data::{SearchResult, SearchQuery};
use actix_web::web::Bytes;
pub struct ActorIndexController { pub struct ActorIndexController {
uuid_resolver: uuid_resolver::UuidResolverHandle, uuid_resolver: uuid_resolver::UuidResolverHandle,
index_handle: index_actor::IndexActorHandle, index_handle: index_actor::IndexActorHandle,
update_handle: update_actor::UpdateActorHandle, update_handle: update_actor::UpdateActorHandle<Bytes>,
} }
impl ActorIndexController { impl ActorIndexController {
pub fn new() -> Self { pub fn new(path: impl AsRef<Path>) -> Self {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
let index_actor = index_actor::IndexActorHandle::new(); let index_actor = index_actor::IndexActorHandle::new();
let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone()); let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone(), &path);
Self { uuid_resolver, index_handle: index_actor, update_handle } Self { uuid_resolver, index_handle: index_actor, update_handle }
} }
} }
@ -43,12 +47,22 @@ impl IndexController for ActorIndexController {
index: String, index: String,
method: milli::update::IndexDocumentsMethod, method: milli::update::IndexDocumentsMethod,
format: milli::update::UpdateFormat, format: milli::update::UpdateFormat,
data: File, mut payload: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<super::UpdateStatus> { ) -> anyhow::Result<super::UpdateStatus> {
let uuid = self.uuid_resolver.get_or_create(index).await?; let uuid = self.uuid_resolver.get_or_create(index).await?;
let meta = UpdateMeta::DocumentsAddition { method, format, primary_key }; let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
let status = self.update_handle.update(meta, Some(data), uuid).await?; let (sender, receiver) = mpsc::channel(10);
// It is necessary to spawn a local task to senf the payload to the update handle to
// prevent dead_locking between the update_handle::update that waits for the update to be
// registered and the update_actor that waits for the the payload to be sent to it.
tokio::task::spawn_local(async move {
while let Some(bytes) = payload.next().await {
sender.send(bytes.unwrap()).await;
}
});
let status = self.update_handle.update(meta, receiver, uuid).await?;
Ok(status) Ok(status)
} }

View File

@ -1,22 +1,24 @@
use super::index_actor::IndexActorHandle;
use uuid::Uuid;
use tokio::sync::{mpsc, oneshot};
use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult};
use thiserror::Error;
use tokio::io::AsyncReadExt;
use log::info;
use tokio::fs::File;
use std::path::PathBuf;
use std::fs::create_dir_all; use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use log::info;
use super::index_actor::IndexActorHandle;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult, updates::Pending};
pub type Result<T> = std::result::Result<T, UpdateError>; pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>; type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum UpdateError {} pub enum UpdateError {}
enum UpdateMsg { enum UpdateMsg<D> {
CreateIndex{ CreateIndex{
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<Result<()>>, ret: oneshot::Sender<Result<()>>,
@ -24,20 +26,30 @@ enum UpdateMsg {
Update { Update {
uuid: Uuid, uuid: Uuid,
meta: UpdateMeta, meta: UpdateMeta,
payload: Option<File>, data: mpsc::Receiver<D>,
ret: oneshot::Sender<Result<UpdateStatus>> ret: oneshot::Sender<Result<UpdateStatus>>
} }
} }
struct UpdateActor { struct UpdateActor<D> {
path: PathBuf,
store: Arc<UpdateStore>, store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg>, inbox: mpsc::Receiver<UpdateMsg<D>>,
index_handle: IndexActorHandle, index_handle: IndexActorHandle,
} }
impl UpdateActor { impl<D> UpdateActor<D>
fn new(store: Arc<UpdateStore>, inbox: mpsc::Receiver<UpdateMsg>, index_handle: IndexActorHandle) -> Self { where D: AsRef<[u8]> + Sized + 'static,
Self { store, inbox, index_handle } {
fn new(
store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg<D>>,
index_handle: IndexActorHandle,
path: impl AsRef<Path>,
) -> Self {
let path = path.as_ref().to_owned().join("update_files");
create_dir_all(&path).unwrap();
Self { store, inbox, index_handle, path }
} }
async fn run(mut self) { async fn run(mut self) {
@ -45,29 +57,43 @@ impl UpdateActor {
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(UpdateMsg::Update { uuid, meta, payload, ret }) => self.handle_update(uuid, meta, payload, ret).await, Some(UpdateMsg::Update { uuid, meta, data, ret }) => self.handle_update(uuid, meta, data, ret).await,
Some(_) => {} Some(_) => {}
None => {} None => {}
} }
} }
} }
async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, payload: Option<File>, ret: oneshot::Sender<Result<UpdateStatus>>) { async fn handle_update(&self, uuid: Uuid, meta: UpdateMeta, mut payload: mpsc::Receiver<D>, ret: oneshot::Sender<Result<UpdateStatus>>) {
let mut buf = Vec::new(); let store = self.store.clone();
let mut payload = payload.unwrap(); let update_file_id = uuid::Uuid::new_v4();
payload.read_to_end(&mut buf).await.unwrap(); let path = self.path.join(format!("update_{}", update_file_id));
let result = self.store.register_update(meta, &buf, uuid).unwrap(); let mut file = File::create(&path).await.unwrap();
while let Some(bytes) = payload.recv().await {
file.write_all(bytes.as_ref()).await;
}
file.flush().await;
let file = file.into_std().await;
let result = tokio::task::spawn_blocking(move || -> anyhow::Result<Pending<UpdateMeta>> {
Ok(store.register_update(meta, path, uuid)?)
}).await.unwrap().unwrap();
let _ = ret.send(Ok(UpdateStatus::Pending(result))); let _ = ret.send(Ok(UpdateStatus::Pending(result)));
} }
} }
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateActorHandle { pub struct UpdateActorHandle<D> {
sender: mpsc::Sender<UpdateMsg>, sender: mpsc::Sender<UpdateMsg<D>>,
} }
impl UpdateActorHandle { impl<D> UpdateActorHandle<D>
pub fn new(index_handle: IndexActorHandle) -> Self { where D: AsRef<[u8]> + Sized + 'static,
{
pub fn new(index_handle: IndexActorHandle, path: impl AsRef<Path>) -> Self {
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let mut options = heed::EnvOpenOptions::new(); let mut options = heed::EnvOpenOptions::new();
options.map_size(4096 * 100_000); options.map_size(4096 * 100_000);
@ -79,16 +105,16 @@ impl UpdateActorHandle {
let store = UpdateStore::open(options, &path, move |meta, file| { let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle_clone.update(meta, file)) futures::executor::block_on(index_handle_clone.update(meta, file))
}).unwrap(); }).unwrap();
let actor = UpdateActor::new(store, receiver, index_handle); let actor = UpdateActor::new(store, receiver, index_handle, path);
tokio::task::spawn_local(actor.run()); tokio::task::spawn_local(actor.run());
Self { sender } Self { sender }
} }
pub async fn update(&self, meta: UpdateMeta, payload: Option<File>, uuid: Uuid) -> Result<UpdateStatus> { pub async fn update(&self, meta: UpdateMeta, data: mpsc::Receiver<D>, uuid: Uuid) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update { let msg = UpdateMsg::Update {
uuid, uuid,
payload, data,
meta, meta,
ret, ret,
}; };

View File

@ -1,9 +1,9 @@
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::io::{Cursor, SeekFrom, Seek, Write}; use std::fs::remove_file;
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson};
use heed::{EnvOpenOptions, Env, Database}; use heed::{EnvOpenOptions, Env, Database};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
use std::fs::File; use std::fs::File;
@ -17,7 +17,7 @@ type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
pub struct UpdateStore<M, N, E> { pub struct UpdateStore<M, N, E> {
env: Env, env: Env,
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>, pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
pending: Database<OwnedType<BEU64>, ByteSlice>, pending: Database<OwnedType<BEU64>, SerdeJson<PathBuf>>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>, processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>, failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>, aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
@ -140,7 +140,7 @@ where
pub fn register_update( pub fn register_update(
&self, &self,
meta: M, meta: M,
content: &[u8], content: impl AsRef<Path>,
index_uuid: Uuid, index_uuid: Uuid,
) -> heed::Result<Pending<M>> { ) -> heed::Result<Pending<M>> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
@ -154,7 +154,7 @@ where
let meta = Pending::new(meta, update_id, index_uuid); let meta = Pending::new(meta, update_id, index_uuid);
self.pending_meta.put(&mut wtxn, &update_key, &meta)?; self.pending_meta.put(&mut wtxn, &update_key, &meta)?;
self.pending.put(&mut wtxn, &update_key, content)?; self.pending.put(&mut wtxn, &update_key, &content.as_ref().to_owned())?;
wtxn.commit()?; wtxn.commit()?;
@ -178,7 +178,7 @@ where
// a reader while processing it, not a writer. // a reader while processing it, not a writer.
match first_meta { match first_meta {
Some((first_id, pending)) => { Some((first_id, pending)) => {
let first_content = self.pending let content_path = self.pending
.get(&rtxn, &first_id)? .get(&rtxn, &first_id)?
.expect("associated update content"); .expect("associated update content");
@ -190,12 +190,7 @@ where
.write() .write()
.unwrap() .unwrap()
.replace(processing.clone()); .replace(processing.clone());
let mut cursor = Cursor::new(first_content); let file = File::open(&content_path)?;
let mut file = tempfile::tempfile()?;
let n = std::io::copy(&mut cursor, &mut file)?;
println!("copied count: {}", n);
file.flush()?;
file.seek(SeekFrom::Start(0))?;
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
let result = handler.handle_update(processing, file); let result = handler.handle_update(processing, file);
drop(rtxn); drop(rtxn);
@ -209,6 +204,7 @@ where
.unwrap() .unwrap()
.take(); .take();
self.pending_meta.delete(&mut wtxn, &first_id)?; self.pending_meta.delete(&mut wtxn, &first_id)?;
remove_file(&content_path)?;
self.pending.delete(&mut wtxn, &first_id)?; self.pending.delete(&mut wtxn, &first_id)?;
match result { match result {
Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?, Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,

View File

@ -12,7 +12,7 @@ use milli::Index;
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
use serde::{Serialize, Deserialize, de::Deserializer}; use serde::{Serialize, Deserialize, de::Deserializer};
use uuid::Uuid; use uuid::Uuid;
use tokio::fs::File; use actix_web::web::Payload;
use crate::data::SearchResult; use crate::data::SearchResult;
use crate::data::SearchQuery; use crate::data::SearchQuery;
@ -133,7 +133,7 @@ pub trait IndexController {
index: String, index: String,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
data: File, data: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus>; ) -> anyhow::Result<UpdateStatus>;