comment tests out

This commit is contained in:
mpostma 2021-01-28 20:55:29 +01:00
parent e9c95f6623
commit da056a6877
No known key found for this signature in database
GPG key ID: CBC8A7C1D7A28C3A
27 changed files with 132 additions and 8585 deletions

View file

@ -11,9 +11,11 @@ use uuid::Uuid;
use serde::{Serialize, Deserialize};
use log::warn;
use super::update_store::UpdateStore;
use super::update_handler::UpdateHandler;
use crate::option::IndexerOpts;
use super::update_handler::UpdateHandler;
use super::{UpdateMeta, UpdateResult};
type UpdateStore = super::update_store::UpdateStore<UpdateMeta, UpdateResult, String>;
#[derive(Serialize, Deserialize, Debug)]
struct IndexMeta {

View file

@ -182,12 +182,13 @@ impl UpdateHandler {
impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
fn handle_update(
&mut self,
update_id: u64,
meta: Processing<UpdateMeta>,
content: &[u8]
) -> Result<Processed<UpdateMeta, UpdateResult>, Failed<UpdateMeta, String>> {
use UpdateMeta::*;
let update_id = meta.id();
let update_builder = self.update_buidler(update_id);
let result = match meta.meta() {
@ -203,4 +204,3 @@ impl HandleUpdate<UpdateMeta, UpdateResult, String> for UpdateHandler {
}
}
}

View file

@ -4,37 +4,42 @@ use std::sync::{Arc, RwLock};
use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
use heed::{EnvOpenOptions, Env, Database};
use serde::{Serialize, Deserialize};
use crate::index_controller::updates::*;
use crate::index_controller::{UpdateMeta, UpdateResult};
type BEU64 = heed::zerocopy::U64<heed::byteorder::BE>;
#[derive(Clone)]
pub struct UpdateStore {
pub struct UpdateStore<M, N, E> {
env: Env,
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
pending_meta: Database<OwnedType<BEU64>, SerdeJson<Pending<M>>>,
pending: Database<OwnedType<BEU64>, ByteSlice>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
processing: Arc<RwLock<Option<Processing<UpdateMeta>>>>,
processed_meta: Database<OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
failed_meta: Database<OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
processing: Arc<RwLock<Option<Processing<M>>>>,
notification_sender: Sender<()>,
}
pub trait HandleUpdate<M, N, E> {
fn handle_update(&mut self, update_id: u64, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>>;
fn handle_update(&mut self, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>>;
}
impl UpdateStore {
impl<M, N, E> UpdateStore<M, N, E>
where
M: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync + Clone,
N: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
E: for<'a> Deserialize<'a> + Serialize + 'static + Send + Sync,
{
pub fn open<P, U>(
mut options: EnvOpenOptions,
path: P,
mut update_handler: U,
) -> heed::Result<Arc<UpdateStore>>
) -> heed::Result<Arc<Self>>
where
P: AsRef<Path>,
U: HandleUpdate<UpdateMeta, UpdateResult, String> + Send + 'static,
U: HandleUpdate<M, N, E> + Send + 'static,
{
options.max_dbs(5);
@ -111,9 +116,9 @@ impl UpdateStore {
/// into the pending-meta store. Returns the new unique update id.
pub fn register_update(
&self,
meta: UpdateMeta,
meta: M,
content: &[u8]
) -> heed::Result<Pending<UpdateMeta>> {
) -> heed::Result<Pending<M>> {
let mut wtxn = self.env.write_txn()?;
// We ask the update store to give us a new update id, this is safe,
@ -139,7 +144,7 @@ impl UpdateStore {
/// only writing the result meta to the processed-meta store *after* it has been processed.
fn process_pending_update<U>(&self, handler: &mut U) -> heed::Result<Option<()>>
where
U: HandleUpdate<UpdateMeta, UpdateResult, String> + Send + 'static,
U: HandleUpdate<M, N, E> + Send + 'static,
{
// Create a read transaction to be able to retrieve the pending update in order.
let rtxn = self.env.read_txn()?;
@ -153,7 +158,7 @@ impl UpdateStore {
.get(&rtxn, &first_id)?
.expect("associated update content");
// we cahnge the state of the update from pending to processing before we pass it
// we change the state of the update from pending to processing before we pass it
// to the update handler. Processing store is non persistent to be able recover
// from a failure
let processing = pending.processing();
@ -162,7 +167,7 @@ impl UpdateStore {
.unwrap()
.replace(processing.clone());
// Process the pending update using the provided user function.
let result = handler.handle_update(first_id.get(), processing, first_content);
let result = handler.handle_update(processing, first_content);
drop(rtxn);
// Once the pending update have been successfully processed
@ -189,10 +194,10 @@ impl UpdateStore {
/// The id and metadata of the update that is currently being processed,
/// `None` if no update is being processed.
pub fn processing_update(&self) -> heed::Result<Option<(u64, Pending<UpdateMeta>)>> {
pub fn processing_update(&self) -> heed::Result<Option<Pending<M>>> {
let rtxn = self.env.read_txn()?;
match self.pending_meta.first(&rtxn)? {
Some((key, meta)) => Ok(Some((key.get(), meta))),
Some((_, meta)) => Ok(Some(meta)),
None => Ok(None),
}
}
@ -203,11 +208,11 @@ impl UpdateStore {
pub fn iter_metas<F, T>(&self, mut f: F) -> heed::Result<T>
where
F: for<'a> FnMut(
Option<Processing<UpdateMeta>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<UpdateMeta, UpdateResult>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<UpdateMeta>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<UpdateMeta>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<UpdateMeta, String>>>,
Option<Processing<M>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Processed<M, N>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Aborted<M>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Pending<M>>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<Failed<M, E>>>,
) -> heed::Result<T>,
{
let rtxn = self.env.read_txn()?;
@ -224,7 +229,7 @@ impl UpdateStore {
}
/// Returns the update associated meta or `None` if the update doesn't exist.
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<UpdateMeta, UpdateResult, String>>> {
pub fn meta(&self, update_id: u64) -> heed::Result<Option<UpdateStatus<M, N, E>>> {
let rtxn = self.env.read_txn()?;
let key = BEU64::new(update_id);
@ -259,7 +264,7 @@ impl UpdateStore {
/// Trying to abort an update that is currently being processed, an update
/// that as already been processed or which doesn't actually exist, will
/// return `None`.
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<UpdateMeta>>> {
pub fn abort_update(&self, update_id: u64) -> heed::Result<Option<Aborted<M>>> {
let mut wtxn = self.env.write_txn()?;
let key = BEU64::new(update_id);
@ -286,7 +291,7 @@ impl UpdateStore {
/// Aborts all the pending updates, and not the one being currently processed.
/// Returns the update metas and ids that were successfully aborted.
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<UpdateMeta>)>> {
pub fn abort_pendings(&self) -> heed::Result<Vec<(u64, Aborted<M>)>> {
let mut wtxn = self.env.write_txn()?;
let mut aborted_updates = Vec::new();
@ -309,3 +314,90 @@ impl UpdateStore {
Ok(aborted_updates)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::{Duration, Instant};
impl<M, N, F, E> HandleUpdate<M, N, E> for F
where F: FnMut(Processing<M>, &[u8]) -> Result<Processed<M, N>, Failed<M, E>> + Send + 'static {
fn handle_update(&mut self, meta: Processing<M>, content: &[u8]) -> Result<Processed<M, N>, Failed<M, E>> {
self(meta, content)
}
}
#[test]
fn simple() {
let dir = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content: &_| -> Result<_, Failed<_, ()>> {
let new_meta = meta.meta().to_string() + " processed";
let processed = meta.process(new_meta);
Ok(processed)
}).unwrap();
let meta = String::from("kiki");
let update = update_store.register_update(meta, &[]).unwrap();
thread::sleep(Duration::from_millis(100));
let meta = update_store.meta(update.id()).unwrap().unwrap();
if let UpdateStatus::Processed(Processed { success, .. }) = meta {
assert_eq!(success, "kiki processed");
} else {
panic!()
}
}
#[test]
#[ignore]
fn long_running_update() {
let dir = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
options.map_size(4096 * 100);
let update_store = UpdateStore::open(options, dir, |meta: Processing<String>, _content:&_| -> Result<_, Failed<_, ()>> {
thread::sleep(Duration::from_millis(400));
let new_meta = meta.meta().to_string() + "processed";
let processed = meta.process(new_meta);
Ok(processed)
}).unwrap();
let before_register = Instant::now();
let meta = String::from("kiki");
let update_kiki = update_store.register_update(meta, &[]).unwrap();
assert!(before_register.elapsed() < Duration::from_millis(200));
let meta = String::from("coco");
let update_coco = update_store.register_update(meta, &[]).unwrap();
assert!(before_register.elapsed() < Duration::from_millis(200));
let meta = String::from("cucu");
let update_cucu = update_store.register_update(meta, &[]).unwrap();
assert!(before_register.elapsed() < Duration::from_millis(200));
thread::sleep(Duration::from_millis(400 * 3 + 100));
let meta = update_store.meta(update_kiki.id()).unwrap().unwrap();
if let UpdateStatus::Processed(Processed { success, .. }) = meta {
assert_eq!(success, "kiki processed");
} else {
panic!()
}
let meta = update_store.meta(update_coco.id()).unwrap().unwrap();
if let UpdateStatus::Processed(Processed { success, .. }) = meta {
assert_eq!(success, "coco processed");
} else {
panic!()
}
let meta = update_store.meta(update_cucu.id()).unwrap().unwrap();
if let UpdateStatus::Processed(Processed { success, .. }) = meta {
assert_eq!(success, "cucu processed");
} else {
panic!()
}
}
}

View file

@ -3,9 +3,9 @@ use serde::{Serialize, Deserialize};
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
pub struct Pending<M> {
update_id: u64,
meta: M,
enqueued_at: DateTime<Utc>,
pub update_id: u64,
pub meta: M,
pub enqueued_at: DateTime<Utc>,
}
impl<M> Pending<M> {
@ -42,10 +42,10 @@ impl<M> Pending<M> {
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
pub struct Processed<M, N> {
success: N,
processed_at: DateTime<Utc>,
pub success: N,
pub processed_at: DateTime<Utc>,
#[serde(flatten)]
from: Processing<M>,
pub from: Processing<M>,
}
impl<M, N> Processed<M, N> {
@ -57,8 +57,8 @@ impl<M, N> Processed<M, N> {
#[derive(Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Clone)]
pub struct Processing<M> {
#[serde(flatten)]
from: Pending<M>,
started_processing_at: DateTime<Utc>,
pub from: Pending<M>,
pub started_processing_at: DateTime<Utc>,
}
impl<M> Processing<M> {