feat: get update status Enqueued / Processed / Unknown

This commit is contained in:
qdequele 2019-09-19 17:29:22 +02:00
parent 120d209e66
commit 7073b42afa
No known key found for this signature in database
GPG Key ID: B3F0A000EBF11745
5 changed files with 30 additions and 23 deletions

View File

@ -65,13 +65,20 @@ pub struct DetailedDuration {
}
#[derive(Clone, Serialize, Deserialize)]
pub struct UpdateStatus {
pub struct UpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
pub result: Result<(), String>,
pub detailed_duration: DetailedDuration,
}
#[derive(Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Enqueued,
Processed(UpdateResult),
Unknown,
}
fn spawn_update_system(index: Index, subscription: Receiver<()>) -> thread::JoinHandle<()> {
thread::spawn(move || {
let mut subscription = subscription.into_iter();
@ -115,7 +122,7 @@ fn spawn_update_system(index: Index, subscription: Receiver<()>) -> thread::Join
};
let detailed_duration = DetailedDuration { main: duration };
let status = UpdateStatus {
let status = UpdateResult {
update_id,
update_type,
result: result.map_err(|e| e.to_string()),
@ -180,7 +187,7 @@ pub struct Index {
updates_id: Arc<AtomicU64>,
updates_index: crate::CfTree,
updates_results_index: crate::CfTree,
update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateStatus) + Send + Sync + 'static>>>,
update_callback: Arc<ArcSwapOption<Box<dyn Fn(UpdateResult) + Send + Sync + 'static>>>,
}
pub(crate) struct Cache {
@ -266,7 +273,7 @@ impl Index {
}
pub fn set_update_callback<F>(&self, callback: F)
where F: Fn(UpdateStatus) + Send + Sync + 'static
where F: Fn(UpdateResult) + Send + Sync + 'static
{
self.update_callback.store(Some(Arc::new(Box::new(callback))));
}
@ -355,36 +362,35 @@ impl Index {
pub fn update_status(
&self,
update_id: u64,
) -> Result<Option<UpdateStatus>, Error>
) -> Result<UpdateStatus, Error>
{
let update_id = update_id.to_be_bytes();
match self.updates_results_index.get(update_id)? {
Some(value) => {
let value = bincode::deserialize(&value)?;
Ok(Some(value))
Ok(UpdateStatus::Processed(value))
},
None => Ok(None),
None => {
match self.updates_index.get(update_id)? {
Some(_) => Ok(UpdateStatus::Enqueued),
None => Ok(UpdateStatus::Unknown),
}
}
}
}
pub fn update_status_blocking(
&self,
update_id: u64,
) -> Result<UpdateStatus, Error>
) -> Result<UpdateResult, Error>
{
// if we find the update result return it now
if let Some(result) = self.update_status(update_id)? {
return Ok(result)
}
loop {
if self.updates_results_index.get(&update_id.to_be_bytes())?.is_some() { break }
if let Some(value) = self.updates_results_index.get(&update_id.to_be_bytes())? {
let value = bincode::deserialize(&value)?;
return Ok(value)
}
std::thread::sleep(Duration::from_millis(300));
}
// the thread has been unblocked, it means that the update result
// has been inserted in the tree, retrieve it
Ok(self.update_status(update_id)?.unwrap())
}
pub fn documents_ids(&self) -> Result<DocumentsIdsIter, Error> {

View File

@ -15,7 +15,7 @@ pub use self::error::Error;
pub use self::index::{
Index, CustomSettingsIndex, CommonIndex, RankingOrdering,
StopWords, RankingOrder, DistinctField, RankingRules,
UpdateType, DetailedDuration, UpdateStatus
UpdateType, DetailedDuration, UpdateResult, UpdateStatus
};
pub use self::update::DocumentsAddition;

View File

@ -10,7 +10,8 @@ pub use self::cf_tree::{CfTree, CfIter};
pub use self::database::{
Database, Index, CustomSettingsIndex, RankingOrdering,
StopWords, RankingOrder, DistinctField, RankingRules,
UpdateType, DetailedDuration, UpdateStatus, Error
UpdateType, DetailedDuration, UpdateResult, UpdateStatus,
Error,
};
pub use self::number::Number;
pub use self::ranked_map::RankedMap;

View File

@ -1,4 +1,3 @@
use meilidb_data::{Database};
use meilidb_data::Index;
use meilidb_schema::{SchemaBuilder, DISPLAYED, INDEXED};

View File

@ -1,8 +1,9 @@
mod common;
#[macro_use] extern crate maplit;
mod common;
use big_s::S;
use meilidb_data::{RankingOrdering};
use meilidb_data::RankingOrdering;
#[test]
fn stop_words() {