receive update

This commit is contained in:
mpostma 2021-02-26 17:14:11 +01:00
parent 672a4b5400
commit 6bcc302950
No known key found for this signature in database
GPG Key ID: CBC8A7C1D7A28C3A
8 changed files with 181 additions and 459 deletions

View File

@ -1,29 +1,33 @@
use std::ops::Deref;
//use async_compression::tokio_02::write::GzipEncoder; //use async_compression::tokio_02::write::GzipEncoder;
//use futures_util::stream::StreamExt; //use futures_util::stream::StreamExt;
//use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
//use tokio::io::AsyncWriteExt; //use tokio::io::AsyncWriteExt;
use actix_web::web::Payload; use actix_web::web::Payload;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, AsyncSeekExt};
use futures::prelude::stream::StreamExt;
use crate::index_controller::UpdateStatus; use crate::index_controller::UpdateStatus;
use crate::index_controller::{Settings, IndexMetadata}; use crate::index_controller::{Settings, IndexMetadata};
use super::Data; use super::Data;
impl Data { impl Data {
pub async fn add_documents<B, E>( pub async fn add_documents(
&self, &self,
index: impl AsRef<str> + Send + Sync + 'static, index: impl AsRef<str> + Send + Sync + 'static,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
stream: Payload, mut stream: Payload,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus> ) -> anyhow::Result<UpdateStatus>
where
B: Deref<Target = [u8]>,
E: std::error::Error + Send + Sync + 'static,
{ {
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, stream, primary_key).await?; let file = tempfile::tempfile_in(".")?;
let mut file = File::from_std(file);
while let Some(Ok(bytes)) = stream.next().await {
file.write(bytes.as_ref()).await;
}
file.seek(std::io::SeekFrom::Start(0)).await?;
let update_status = self.index_controller.add_documents(index.as_ref().to_string(), method, format, file, primary_key).await?;
Ok(update_status) Ok(update_status)
} }

View File

@ -9,13 +9,15 @@ use milli::Index;
use std::collections::hash_map::Entry; use std::collections::hash_map::Entry;
use std::fs::create_dir_all; use std::fs::create_dir_all;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use crate::index_controller::IndexMetadata; use crate::index_controller::{IndexMetadata, UpdateMeta, updates::{Processed, Failed, Processing}, UpdateResult as UResult};
pub type Result<T> = std::result::Result<T, IndexError>; pub type Result<T> = std::result::Result<T, IndexError>;
type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>; type AsyncMap<K, V> = Arc<RwLock<HashMap<K, V>>>;
type UpdateResult = std::result::Result<Processed<UpdateMeta, UResult>, Failed<UpdateMeta, String>>;
enum IndexMsg { enum IndexMsg {
CreateIndex { uuid: Uuid, primary_key: Option<String>, ret: oneshot::Sender<Result<IndexMetadata>> }, CreateIndex { uuid: Uuid, primary_key: Option<String>, ret: oneshot::Sender<Result<IndexMetadata>> },
Update { meta: Processing<UpdateMeta>, data: std::fs::File, ret: oneshot::Sender<UpdateResult>},
} }
struct IndexActor<S> { struct IndexActor<S> {
@ -45,6 +47,7 @@ impl<S: IndexStore> IndexActor<S> {
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await, Some(IndexMsg::CreateIndex { uuid, primary_key, ret }) => self.handle_create_index(uuid, primary_key, ret).await,
Some(IndexMsg::Update { ret, meta, data }) => self.handle_update().await,
None => break, None => break,
} }
} }
@ -54,6 +57,10 @@ impl<S: IndexStore> IndexActor<S> {
let result = self.store.create_index(uuid, primary_key).await; let result = self.store.create_index(uuid, primary_key).await;
let _ = ret.send(result); let _ = ret.send(result);
} }
/// Placeholder handler for `IndexMsg::Update`: it only prints a message.
// NOTE(review): the `ret`, `meta` and `data` fields of the message are not passed to this
// handler, so nothing is ever sent on `ret` — confirm this is a deliberate stub.
async fn handle_update(&self) {
println!("processing update!!!");
}
} }
#[derive(Clone)] #[derive(Clone)]
@ -77,6 +84,13 @@ impl IndexActorHandle {
let _ = self.sender.send(msg).await; let _ = self.sender.send(msg).await;
receiver.await.expect("IndexActor has been killed") receiver.await.expect("IndexActor has been killed")
} }
/// Sends an `IndexMsg::Update` to the index actor and waits for its answer.
///
/// # Panics
///
/// Panics with "IndexActor has been killed" if the reply channel is closed
/// before an answer is received.
pub async fn update(&self, meta: Processing<UpdateMeta>, data: std::fs::File) -> UpdateResult {
    let (reply_tx, reply_rx) = oneshot::channel();
    // A failed send is ignored here: it surfaces just below as a closed reply channel.
    let _ = self
        .sender
        .send(IndexMsg::Update { ret: reply_tx, meta, data })
        .await;
    reply_rx.await.expect("IndexActor has been killed")
}
} }
struct MapIndexStore { struct MapIndexStore {
@ -103,8 +117,6 @@ impl IndexStore for MapIndexStore {
let db_path = self.root.join(format!("index-{}", meta.uuid)); let db_path = self.root.join(format!("index-{}", meta.uuid));
println!("before blocking");
let index: Result<Index> = tokio::task::spawn_blocking(move || { let index: Result<Index> = tokio::task::spawn_blocking(move || {
create_dir_all(&db_path).expect("can't create db"); create_dir_all(&db_path).expect("can't create db");
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
@ -113,7 +125,6 @@ impl IndexStore for MapIndexStore {
.map_err(|e| IndexError::Error(e))?; .map_err(|e| IndexError::Error(e))?;
Ok(index) Ok(index)
}).await.expect("thread died"); }).await.expect("thread died");
println!("after blocking");
self.index_store.write().await.insert(meta.uuid.clone(), index?); self.index_store.write().await.insert(meta.uuid.clone(), index?);

View File

@ -1,24 +1,28 @@
mod index_actor; mod index_actor;
mod update_actor; mod update_actor;
mod uuid_resolver; mod uuid_resolver;
mod update_store;
use tokio::fs::File;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use super::IndexController; use super::IndexController;
use uuid::Uuid; use uuid::Uuid;
use super::IndexMetadata; use super::IndexMetadata;
use tokio::fs::File;
use super::UpdateMeta;
pub struct ActorIndexController { pub struct ActorIndexController {
uuid_resolver: uuid_resolver::UuidResolverHandle, uuid_resolver: uuid_resolver::UuidResolverHandle,
index_actor: index_actor::IndexActorHandle, index_handle: index_actor::IndexActorHandle,
update_handle: update_actor::UpdateActorHandle,
} }
impl ActorIndexController { impl ActorIndexController {
pub fn new() -> Self { pub fn new() -> Self {
let uuid_resolver = uuid_resolver::UuidResolverHandle::new(); let uuid_resolver = uuid_resolver::UuidResolverHandle::new();
let index_actor = index_actor::IndexActorHandle::new(); let index_actor = index_actor::IndexActorHandle::new();
Self { uuid_resolver, index_actor } let update_handle = update_actor::UpdateActorHandle::new(index_actor.clone());
Self { uuid_resolver, index_handle: index_actor, update_handle }
} }
} }
@ -31,7 +35,7 @@ enum IndexControllerMsg {
Shutdown, Shutdown,
} }
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
impl IndexController for ActorIndexController { impl IndexController for ActorIndexController {
async fn add_documents( async fn add_documents(
&self, &self,
@ -41,7 +45,10 @@ impl IndexController for ActorIndexController {
data: File, data: File,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<super::UpdateStatus> { ) -> anyhow::Result<super::UpdateStatus> {
todo!() let uuid = self.uuid_resolver.get_or_create(index).await?;
let meta = UpdateMeta::DocumentsAddition { method, format, primary_key };
let status = self.update_handle.update(meta, Some(data), uuid).await?;
Ok(status)
} }
fn clear_documents(&self, index: String) -> anyhow::Result<super::UpdateStatus> { fn clear_documents(&self, index: String) -> anyhow::Result<super::UpdateStatus> {
@ -59,7 +66,7 @@ impl IndexController for ActorIndexController {
/// Creates a new index described by `index_settings`.
///
/// A uuid is first created for the index name, then the index itself is created
/// through the index actor handle.
///
/// # Errors
///
/// Returns an error if no name is provided, if the uuid resolver fails to create
/// the uuid, or if the index actor fails to create the index.
async fn create_index(&self, index_settings: super::IndexSettings) -> anyhow::Result<super::IndexMetadata> {
    let super::IndexSettings { name, primary_key } = index_settings;
    // A missing name comes from the caller's input: report it instead of panicking.
    let name = name.ok_or_else(|| anyhow::anyhow!("an index name is required to create an index"))?;
    let uuid = self.uuid_resolver.create(name).await?;
    let index_meta = self.index_handle.create_index(uuid, primary_key).await?;
    Ok(index_meta)
}

View File

@ -1,16 +1,99 @@
use super::index_actor::IndexActorHandle; use super::index_actor::IndexActorHandle;
use uuid::Uuid; use uuid::Uuid;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::{mpsc, oneshot};
use crate::index_controller::{UpdateMeta, UpdateStatus, UpdateResult};
use thiserror::Error;
use tokio::io::AsyncReadExt;
use log::info;
use tokio::fs::File;
use std::path::PathBuf;
use std::fs::create_dir_all;
use std::sync::Arc;
pub type Result<T> = std::result::Result<T, UpdateError>;
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
/// Error type of the update actor, used by this module's `Result` alias.
/// No variant is declared yet, so no value of this type can currently be constructed.
#[derive(Debug, Error)]
pub enum UpdateError {}
enum UpdateMsg { enum UpdateMsg {
CreateIndex{ CreateIndex{
uuid: Uuid, uuid: Uuid,
ret: oneshot::Sender<anyhow::Result<()>>, ret: oneshot::Sender<Result<()>>,
},
Update {
uuid: Uuid,
meta: UpdateMeta,
payload: Option<File>,
ret: oneshot::Sender<Result<UpdateStatus>>
} }
} }
struct UpdateActor<S> { struct UpdateActor {
update_store: S, store: Arc<UpdateStore>,
inbox: mpsc::Receiver<UpdateMsg>, inbox: mpsc::Receiver<UpdateMsg>,
index_actor: IndexActorHandle, index_handle: IndexActorHandle,
}
impl UpdateActor {
    /// Builds an actor that registers updates in `store` and reads its messages from `inbox`.
    fn new(store: Arc<UpdateStore>, inbox: mpsc::Receiver<UpdateMsg>, index_handle: IndexActorHandle) -> Self {
        Self { store, inbox, index_handle }
    }

    /// Processes incoming messages until every sender has been dropped.
    async fn run(mut self) {
        info!("started update actor.");

        loop {
            match self.inbox.recv().await {
                Some(UpdateMsg::Update { uuid, meta, payload, ret }) => self.handle_update(uuid, meta, payload, ret).await,
                // Other messages are not handled yet; their reply channel is dropped.
                Some(_) => {}
                // All senders have been dropped: `recv` would keep returning `None`
                // immediately, so the loop must stop instead of spinning forever.
                None => break,
            }
        }
    }

    /// Reads the whole payload into memory, registers it in the update store and
    /// answers on `ret` with the pending status.
    ///
    /// # Panics
    ///
    /// Panics if `payload` is `None`, if reading it fails, or if the store fails to
    /// register the update.
    // NOTE(review): these failures cannot be reported through `ret` yet because
    // `UpdateError` has no variant.
    async fn handle_update(&self, _uuid: Uuid, meta: UpdateMeta, payload: Option<File>, ret: oneshot::Sender<Result<UpdateStatus>>) {
        let mut buf = Vec::new();
        let mut payload = payload.unwrap();
        payload.read_to_end(&mut buf).await.unwrap();
        let result = self.store.register_update(meta, &buf).unwrap();
        // The requester may have gone away; there is nothing to do in that case.
        let _ = ret.send(Ok(UpdateStatus::Pending(result)));
    }
}
/// Cloneable handle used to send `UpdateMsg` messages to the update actor.
#[derive(Clone)]
pub struct UpdateActorHandle {
// Sending half of the channel whose receiving half is given to the `UpdateActor`.
sender: mpsc::Sender<UpdateMsg>,
}
impl UpdateActorHandle {
pub fn new(index_handle: IndexActorHandle) -> Self {
let (sender, receiver) = mpsc::channel(100);
let mut options = heed::EnvOpenOptions::new();
options.map_size(4096 * 100_000);
let mut path = PathBuf::new();
path.push("data.ms");
path.push("updates");
create_dir_all(&path).unwrap();
let index_handle_clone = index_handle.clone();
let store = UpdateStore::open(options, &path, move |meta, file| {
futures::executor::block_on(index_handle_clone.update(meta, file))
}).unwrap();
let actor = UpdateActor::new(store, receiver, index_handle);
tokio::task::spawn_local(actor.run());
Self { sender }
}
pub async fn update(&self, meta: UpdateMeta, payload: Option<File>, uuid: Uuid) -> Result<UpdateStatus> {
let (ret, receiver) = oneshot::channel();
let msg = UpdateMsg::Update {
uuid,
payload,
meta,
ret,
};
let _ = self.sender.send(msg).await;
receiver.await.expect("update actor killed.")
}
} }

View File

@ -14,6 +14,10 @@ enum UuidResolveMsg {
name: String, name: String,
ret: oneshot::Sender<Result<Option<Uuid>>>, ret: oneshot::Sender<Result<Option<Uuid>>>,
}, },
GetOrCreate {
name: String,
ret: oneshot::Sender<Result<Uuid>>,
},
Create { Create {
name: String, name: String,
ret: oneshot::Sender<Result<Uuid>>, ret: oneshot::Sender<Result<Uuid>>,
@ -41,15 +45,21 @@ impl<S: UuidStore> UuidResolverActor<S> {
loop { loop {
match self.inbox.recv().await { match self.inbox.recv().await {
Some(Create { name, ret }) => self.handle_create(name, ret).await, Some(Create { name, ret }) => self.handle_create(name, ret).await,
Some(_) => (), Some(GetOrCreate { name, ret }) => self.handle_get_or_create(name, ret).await,
// all senders have ned dropped, need to quit. Some(_) => {}
// all senders have been dropped, need to quit.
None => break, None => break,
} }
} }
} }
async fn handle_create(&self, name: String, ret: oneshot::Sender<Result<Uuid>>) { async fn handle_create(&self, name: String, ret: oneshot::Sender<Result<Uuid>>) {
let result = self.store.create_uuid(name).await; let result = self.store.create_uuid(name, true).await;
let _ = ret.send(result);
}
async fn handle_get_or_create(&self, name: String, ret: oneshot::Sender<Result<Uuid>>) {
let result = self.store.create_uuid(name, false).await;
let _ = ret.send(result); let _ = ret.send(result);
} }
} }
@ -75,6 +85,13 @@ impl UuidResolverHandle {
Ok(receiver.await.expect("Uuid resolver actor has been killed")?) Ok(receiver.await.expect("Uuid resolver actor has been killed")?)
} }
/// Asks the resolver actor for the uuid associated with `name`, creating it if needed.
///
/// # Panics
///
/// Panics with "Uuid resolver actor has been killed" if the reply channel is closed
/// before an answer is received.
pub async fn get_or_create(&self, name: String) -> Result<Uuid> {
    let (reply_tx, reply_rx) = oneshot::channel();
    // A failed send is ignored here: it surfaces just below as a closed reply channel.
    let _ = self
        .sender
        .send(UuidResolveMsg::GetOrCreate { name, ret: reply_tx })
        .await;
    reply_rx.await.expect("Uuid resolver actor has been killed")
}
pub async fn create(&self, name: String) -> anyhow::Result<Uuid> { pub async fn create(&self, name: String) -> anyhow::Result<Uuid> {
let (ret, receiver) = oneshot::channel(); let (ret, receiver) = oneshot::channel();
let msg = UuidResolveMsg::Create { name, ret }; let msg = UuidResolveMsg::Create { name, ret };
@ -91,7 +108,9 @@ pub enum UuidError {
#[async_trait::async_trait] #[async_trait::async_trait]
trait UuidStore { trait UuidStore {
async fn create_uuid(&self, name: String) -> Result<Uuid>; // Create a new entry for `name`. Return an error if `err` and the entry already exists, return
// the uuid otherwise.
async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid>;
async fn get_uuid(&self, name: String) -> Result<Option<Uuid>>; async fn get_uuid(&self, name: String) -> Result<Option<Uuid>>;
} }
@ -99,9 +118,15 @@ struct MapUuidStore(Arc<RwLock<HashMap<String, Uuid>>>);
#[async_trait::async_trait] #[async_trait::async_trait]
impl UuidStore for MapUuidStore { impl UuidStore for MapUuidStore {
async fn create_uuid(&self, name: String) -> Result<Uuid> { async fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
match self.0.write().await.entry(name) { match self.0.write().await.entry(name) {
Entry::Occupied(_) => Err(UuidError::NameAlreadyExist), Entry::Occupied(entry) => {
if err {
Err(UuidError::NameAlreadyExist)
} else {
Ok(entry.get().clone())
}
},
Entry::Vacant(entry) => { Entry::Vacant(entry) => {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
let uuid = entry.insert(uuid); let uuid = entry.insert(uuid);

View File

@ -1,407 +0,0 @@
use std::path::Path;
use std::sync::{Arc, RwLock};
use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
use heed::{EnvOpenOptions, Env, Database};
use serde::{Serialize, Deserialize};
use crate::index_controller::updates::*;
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
/// Store of updates backed by a heed environment.
///
/// `M` is the metadata attached to every update; `N` and `E` parameterize the
/// `Processed` and `Failed` metadata respectively.
#[derive(Clone)]
pub struct UpdateStore<M, N, E> {
// heed environment holding the databases below.
env: Env,
// Metadata of the pending updates, keyed by big-endian update id ("pending-meta").
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
// Raw content of the pending updates, keyed by the same id ("pending").
pending: Database<OwnedType<BEU64>, ByteSlice>,
// Metadata of the updates whose handler returned `Ok` ("processed-meta").
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
// Metadata of the updates whose handler returned `Err` ("failed-meta").
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
// Metadata of the updates that were aborted ("aborted-meta").
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
// Update currently handed to the handler, if any; kept in memory only.
processing: Arc<RwLock<Option<Processing<M>>>>,
// Wakes up the processing thread spawned by `open`.
notification_sender: Sender<()>,
}
/// Callback invoked by the update store to process one pending update.
pub trait HandleUpdate<M, N, E> {
/// Processes the update described by `meta` whose raw content is `content`.
///
/// Returns the processed metadata on success, or the failed metadata otherwise.
fn handle_update(&mut self, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>>;
}
impl<M, N, E> UpdateStore<M, N, E>
where
M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone,
N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
{
/// Opens (or creates) the update store at `path` and spawns the thread that
/// processes pending updates with `update_handler`.
///
/// `options` is used to open the heed environment; its maximum number of
/// databases is set to 5 here.
///
/// # Errors
///
/// Returns an error if the environment or one of its databases cannot be opened.
pub fn open<P, U>(
mut options: EnvOpenOptions,
path: P,
mut update_handler: U,
) -> heed::Result<Arc<Self>>
where
P: AsRef<Path>,
U: HandleUpdate<M, N, E> + Send + 'static,
{
// One slot per database created below.
options.max_dbs(5);
let env = options.open(path)?;
let pending_meta = env.create_database(Some("pending-meta"))?;
let pending = env.create_database(Some("pending"))?;
let processed_meta = env.create_database(Some("processed-meta"))?;
let aborted_meta = env.create_database(Some("aborted-meta"))?;
let failed_meta = env.create_database(Some("failed-meta"))?;
// No update is being processed when the store is opened.
let processing = Arc::new(RwLock::new(None));
// A capacity of one is enough: a single queued notification makes the thread
// process every pending update.
let (notification_sender, notification_receiver) = crossbeam_channel::bounded(1);
// Send a first notification to trigger the processing of the updates that are
// already pending.
let _ = notification_sender.send(());
let update_store = Arc::new(UpdateStore {
env,
pending,
pending_meta,
processed_meta,
aborted_meta,
notification_sender,
failed_meta,
processing,
});
// We need a weak reference so we can take ownership of the arc later when we
// want to close the index.
let update_store_weak = Arc::downgrade(&update_store);
std::thread::spawn(move || {
// Block and wait for something to process.
'outer: for _ in notification_receiver {
// Process pending updates one after the other until none is left.
loop {
match update_store_weak.upgrade() {
Some(update_store) => {
// NOTE(review): on `Err` the error is only printed and the loop retries
// immediately; a persistent error would spin here — confirm this is intended.
match update_store.process_pending_update(&mut update_handler) {
Ok(Some(_)) => (),
Ok(None) => break,
Err(e) => eprintln!("error while processing update: {}", e),
}
}
// the ownership of the arc has been taken, we need to exit.
None => break 'outer,
}
}
}
});
Ok(update_store)
}
pub fn prepare_for_closing(self) -> heed::EnvClosingEvent {
self.env.prepare_for_closing()
}
/// Returns the new biggest id to use to store the new update.
///
/// The id is one more than the largest key found in the pending, processed,
/// aborted and failed meta stores, or `0` when all of them are empty.
fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result<u64> {
    let last_pending = self.pending_meta
        .remap_data_type::<DecodeIgnore>()
        .last(txn)?
        .map(|(k, _)| k.get());

    let last_processed = self.processed_meta
        .remap_data_type::<DecodeIgnore>()
        .last(txn)?
        .map(|(k, _)| k.get());

    let last_aborted = self.aborted_meta
        .remap_data_type::<DecodeIgnore>()
        .last(txn)?
        .map(|(k, _)| k.get());

    // A failed update is moved out of the pending store into the failed one: its
    // id must be taken into account too, otherwise it could be handed out again.
    let last_failed = self.failed_meta
        .remap_data_type::<DecodeIgnore>()
        .last(txn)?
        .map(|(k, _)| k.get());

    let next_id = [last_pending, last_processed, last_aborted, last_failed]
        .iter()
        .copied()
        .flatten()
        .max()
        .map_or(0, |last_id| last_id + 1);

    Ok(next_id)
}
/// Stores `content` in the pending store and `meta` in the pending-meta store under
/// a freshly allocated update id, then notifies the processing thread.
///
/// Returns the pending metadata, which carries the new update id.
///
/// # Panics
///
/// Panics if the notification channel is disconnected.
pub fn register_update(
    &self,
    meta: M,
    content: &[u8]
) -> heed::Result<Pending<M>> {
    // The id is allocated while the write transaction is open: any other
    // registration has to wait for its own write transaction, so two updates
    // can never be given the same id.
    let mut txn = self.env.write_txn()?;
    let id = self.new_update_id(&txn)?;
    let key = BEU64::new(id);

    let pending = Pending::new(meta, id);
    self.pending_meta.put(&mut txn, &key, &pending)?;
    self.pending.put(&mut txn, &key, content)?;
    txn.commit()?;

    // A full channel is fine: a notification is already waiting to be consumed.
    match self.notification_sender.try_send(()) {
        Err(e) if e.is_disconnected() => panic!("update notification channel is disconnected"),
        _ => {}
    }

    Ok(pending)
}
/// Executes the user provided handler on the next pending update (the one with the lowest id).
/// The update is processed while only a read transaction is open; the resulting meta is
/// written to the processed-meta or failed-meta store *after* the handler has returned.
///
/// Returns `Ok(Some(()))` when an update was processed and `Ok(None)` when no update
/// is pending.
fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>>
where
U: HandleUpdate<M, N, E> + Send + 'static,
{
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
let first_meta = self.pending_meta.first(&rtxn)?;
// If there is a pending update we process it and only keep
// a reader while processing it, not a writer.
match first_meta {
Some((first_id, pending)) => {
let first_content = self.pending
.get(&rtxn, &first_id)?
.expect("associated update content");
// We change the state of the update from pending to processing before we pass it
// to the update handler. The processing state is not persisted, to be able to
// recover from a failure.
let processing = pending.processing();
self.processing
.write()
.unwrap()
.replace(processing.clone());
// Process the pending update using the provided user function.
let result = handler.handle_update(processing, first_content);
// The content borrowed from the read transaction is no longer needed.
drop(rtxn);
// Once the pending update has been handled we must remove its content from
// the pending and processing stores, write the *new* meta to the
// processed-meta or failed-meta store, and commit.
let mut wtxn = self.env.write_txn()?;
self.processing
.write()
.unwrap()
.take();
self.pending_meta.delete(&mut wtxn, &first_id)?;
self.pending.delete(&mut wtxn, &first_id)?;
match result {
Ok(processed) => self.processed_meta.put(&mut wtxn, &first_id, &processed)?,
Err(failed) => self.failed_meta.put(&mut wtxn, &first_id, &failed)?,
}
wtxn.commit()?;
Ok(Some(()))
},
None => Ok(None)
}
}
/// Runs `f` over a consistent view of the update metadata.
///
/// `f` receives, in this order: the update currently being processed (if any), then
/// the iterators over the *processed*, *aborted*, *pending* and *failed* meta stores.
/// All the iterators read from the same read transaction.
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
where
    F: for<'a> FnMut(
        Option<Processing<M>>,
        heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
        heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
        heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<M>>>,
        heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
    ) -> heed::Result<T>,
{
    let txn = self.env.read_txn()?;

    let processed = self.processed_meta.iter(&txn)?;
    let aborted = self.aborted_meta.iter(&txn)?;
    let pending = self.pending_meta.iter(&txn)?;
    // Snapshot of the in-memory processing state; the lock is released right away.
    let current = self.processing.read().unwrap().clone();
    let failed = self.failed_meta.iter(&txn)?;

    f(current, processed, aborted, pending, failed)
}
/// Returns the update associated meta or `None` if the update doesn't exist.
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>> {
let rtxn = self.env.read_txn()?;
let key = BEU64::new(update_id);
if let Some(ref meta) = *self.processing.read().unwrap() {
if meta.id() == update_id {
return Ok(Some(UpdateStatus::Processing(meta.clone())));
}
}
if let Some(meta) = self.pending_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Pending(meta)));
}
if let Some(meta) = self.processed_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Processed(meta)));
}
if let Some(meta) = self.aborted_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Aborted(meta)));
}
if let Some(meta) = self.failed_meta.get(&rtxn, &key)? {
return Ok(Some(UpdateStatus::Failed(meta)));
}
Ok(None)
}
/// Aborts a pending update: its content is deleted and its meta is moved into the
/// aborted updates database.
///
/// Returns `None` when the update is the first pending one (it is treated as being
/// processed), when it has already been processed, or when it does not exist.
#[allow(dead_code)]
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
    let mut txn = self.env.write_txn()?;
    let key = BEU64::new(update_id);

    // The first pending update is the one handed to the handler: it cannot be aborted.
    let first_pending_id = self.pending_meta.first(&txn)?.map(|(id, _)| id.get());
    if first_pending_id == Some(update_id) {
        return Ok(None);
    }

    let aborted = match self.pending_meta.get(&txn, &key)? {
        None => return Ok(None),
        Some(pending) => pending.abort(),
    };

    self.aborted_meta.put(&mut txn, &key, &aborted)?;
    self.pending_meta.delete(&mut txn, &key)?;
    self.pending.delete(&mut txn, &key)?;
    txn.commit()?;

    Ok(Some(aborted))
}
/// Aborts every pending update except the first one, which is treated as being
/// processed.
///
/// Returns the ids and aborted metas of the updates that were aborted.
#[allow(dead_code)]
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
    let mut txn = self.env.write_txn()?;

    // Collect first, skipping the update currently being processed: the stores
    // cannot be modified while they are being iterated.
    let aborted_updates = self.pending_meta
        .iter(&txn)?
        .skip(1)
        .map(|entry| entry.map(|(key, pending)| (key.get(), pending.abort())))
        .collect::<heed::Result<Vec<_>>>()?;

    for (id, aborted) in aborted_updates.iter() {
        let key = BEU64::new(*id);
        self.aborted_meta.put(&mut txn, &key, aborted)?;
        self.pending_meta.delete(&mut txn, &key)?;
        self.pending.delete(&mut txn, &key)?;
    }

    txn.commit()?;

    Ok(aborted_updates)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    // Lets the tests pass plain closures as update handlers.
    impl<M, N, F, E> HandleUpdate<M, N, E> for F
    where F: FnMut(Processing<M>, &[u8]) -> Result<Processed<M, N>, Failed<M, E>> + Send + 'static {
        fn handle_update(&mut self, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>> {
            self(meta, content)
        }
    }

    /// A registered update ends up processed by the handler.
    #[test]
    fn simple() {
        let dir = tempfile::tempdir().unwrap();
        let mut options = EnvOpenOptions::new();
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
            let new_meta = meta.meta().to_string() + " processed";
            let processed = meta.process(new_meta);
            Ok(processed)
        }).unwrap();

        let meta = String::from("kiki");
        let update = update_store.register_update(meta, &[]).unwrap();
        // Leave some time for the processing thread to handle the update.
        thread::sleep(Duration::from_millis(100));
        let meta = update_store.meta(update.id()).unwrap().unwrap();
        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
            assert_eq!(success, "kiki processed");
        } else {
            panic!()
        }
    }

    /// Registering updates does not wait for a slow handler, and every update is
    /// eventually processed.
    #[test]
    #[ignore]
    fn long_running_update() {
        let dir = tempfile::tempdir().unwrap();
        let mut options = EnvOpenOptions::new();
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content:&_| -> Result<_, Failed<_, ()>> {
            thread::sleep(Duration::from_millis(400));
            // The leading space matters: the assertions below expect "<name> processed".
            let new_meta = meta.meta().to_string() + " processed";
            let processed = meta.process(new_meta);
            Ok(processed)
        }).unwrap();

        let before_register = Instant::now();

        let meta = String::from("kiki");
        let update_kiki = update_store.register_update(meta, &[]).unwrap();
        assert!(before_register.elapsed() < Duration::from_millis(200));

        let meta = String::from("coco");
        let update_coco = update_store.register_update(meta, &[]).unwrap();
        assert!(before_register.elapsed() < Duration::from_millis(200));

        let meta = String::from("cucu");
        let update_cucu = update_store.register_update(meta, &[]).unwrap();
        assert!(before_register.elapsed() < Duration::from_millis(200));

        // Three updates of 400ms each, plus some margin.
        thread::sleep(Duration::from_millis(400 * 3 + 100));

        let meta = update_store.meta(update_kiki.id()).unwrap().unwrap();
        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
            assert_eq!(success, "kiki processed");
        } else {
            panic!()
        }

        let meta = update_store.meta(update_coco.id()).unwrap().unwrap();
        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
            assert_eq!(success, "coco processed");
        } else {
            panic!()
        }

        let meta = update_store.meta(update_cucu.id()).unwrap().unwrap();
        if let UpdateStatus::Processed(Processed { success, .. }) = meta {
            assert_eq!(success, "cucu processed");
        } else {
            panic!()
        }
    }
}

View File

@ -12,7 +12,7 @@ use milli::Index;
use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult}; use milli::update::{IndexDocumentsMethod, UpdateFormat, DocumentAdditionResult};
use serde::{Serialize, Deserialize, de::Deserializer}; use serde::{Serialize, Deserialize, de::Deserializer};
use uuid::Uuid; use uuid::Uuid;
use actix_web::web::Payload; use tokio::fs::File;
pub use updates::{Processed, Processing, Failed}; pub use updates::{Processed, Processing, Failed};
@ -113,7 +113,7 @@ pub struct IndexSettings {
/// be provided. This allows the implementer to define the behaviour of write accesses to the /// be provided. This allows the implementer to define the behaviour of write accesses to the
/// indices, and abstract the scheduling of the updates. The implementer must be able to provide an /// indices, and abstract the scheduling of the updates. The implementer must be able to provide an
/// instance of `IndexStore` /// instance of `IndexStore`
#[async_trait::async_trait] #[async_trait::async_trait(?Send)]
pub trait IndexController { pub trait IndexController {
/* /*
@ -131,7 +131,7 @@ pub trait IndexController {
index: String, index: String,
method: IndexDocumentsMethod, method: IndexDocumentsMethod,
format: UpdateFormat, format: UpdateFormat,
data: Payload, data: File,
primary_key: Option<String>, primary_key: Option<String>,
) -> anyhow::Result<UpdateStatus>; ) -> anyhow::Result<UpdateStatus>;

View File

@ -3,7 +3,7 @@ use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use indexmap::IndexMap; use indexmap::IndexMap;
use log::error; use log::error;
//use milli::update::{IndexDocumentsMethod, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateFormat};
use serde::Deserialize; use serde::Deserialize;
use serde_json::Value; use serde_json::Value;
@ -142,26 +142,25 @@ async fn add_documents_json(
params: web::Query<UpdateDocumentsQuery>, params: web::Query<UpdateDocumentsQuery>,
body: Payload, body: Payload,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
todo!() let addition_result = data
//let addition_result = data .add_documents(
//.add_documents( path.into_inner().index_uid,
//path.into_inner().index_uid, IndexDocumentsMethod::ReplaceDocuments,
//IndexDocumentsMethod::ReplaceDocuments, UpdateFormat::Json,
//UpdateFormat::Json, body,
//body, params.primary_key.clone(),
//params.primary_key.clone(), ).await;
//).await;
//match addition_result { match addition_result {
//Ok(update) => { Ok(update) => {
//let value = serde_json::to_string(&update).unwrap(); let value = serde_json::to_string(&update).unwrap();
//let response = HttpResponse::Ok().body(value); let response = HttpResponse::Ok().body(value);
//Ok(response) Ok(response)
//} }
//Err(e) => { Err(e) => {
//Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() }))) Ok(HttpResponse::BadRequest().body(serde_json::json!({ "error": e.to_string() })))
//} }
//} }
} }