Display enqueued along with processed updates

This commit is contained in:
Clément Renault 2019-10-31 11:13:37 +01:00
parent dbe5363672
commit cc461b1331
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 85 additions and 27 deletions

View File

@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use structopt::StructOpt; use structopt::StructOpt;
use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor}; use termcolor::{Color, ColorChoice, ColorSpec, StandardStream, WriteColor};
use meilidb_core::{Database, Highlight, UpdateResult}; use meilidb_core::{Database, Highlight, ProcessedUpdateResult};
use meilidb_schema::SchemaAttr; use meilidb_schema::SchemaAttr;
const INDEX_NAME: &str = "default"; const INDEX_NAME: &str = "default";
@ -97,7 +97,7 @@ fn index_command(command: IndexCommand, database: Database) -> Result<(), Box<dy
let start = Instant::now(); let start = Instant::now();
let (sender, receiver) = mpsc::sync_channel(100); let (sender, receiver) = mpsc::sync_channel(100);
let update_fn = move |update: UpdateResult| sender.send(update.update_id).unwrap(); let update_fn = move |update: ProcessedUpdateResult| sender.send(update.update_id).unwrap();
let index = match database.open_index(INDEX_NAME) { let index = match database.open_index(INDEX_NAME) {
Some(index) => index, Some(index) => index,
None => database.create_index(INDEX_NAME).unwrap(), None => database.create_index(INDEX_NAME).unwrap(),

View File

@ -11,7 +11,7 @@ use log::{debug, error};
use crate::{store, update, Index, MResult}; use crate::{store, update, Index, MResult};
pub type BoxUpdateFn = Box<dyn Fn(update::UpdateResult) + Send + Sync + 'static>; pub type BoxUpdateFn = Box<dyn Fn(update::ProcessedUpdateResult) + Send + Sync + 'static>;
type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>; type ArcSwapFn = arc_swap::ArcSwapOption<BoxUpdateFn>;
pub struct Database { pub struct Database {

View File

@ -24,7 +24,7 @@ pub use self::number::{Number, ParseNumberError};
pub use self::ranked_map::RankedMap; pub use self::ranked_map::RankedMap;
pub use self::raw_document::RawDocument; pub use self::raw_document::RawDocument;
pub use self::store::Index; pub use self::store::Index;
pub use self::update::{UpdateResult, UpdateStatus, UpdateType}; pub use self::update::{EnqueuedUpdateResult, ProcessedUpdateResult, UpdateStatus, UpdateType};
use ::serde::{Deserialize, Serialize}; use ::serde::{Deserialize, Serialize};
use zerocopy::{AsBytes, FromBytes}; use zerocopy::{AsBytes, FromBytes};

View File

@ -219,17 +219,29 @@ impl Index {
} }
pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> { pub fn all_updates_status(&self, reader: &heed::RoTxn) -> MResult<Vec<update::UpdateStatus>> {
match self.updates_results.last_update_id(reader)? { let mut updates = Vec::new();
Some((last_id, _)) => { let mut last_update_result_id = 0;
let mut updates = Vec::with_capacity(last_id as usize + 1);
for id in 0..=last_id { // retrieve all updates results
let update = self.update_status(reader, id)?; if let Some((last_id, _)) = self.updates_results.last_update_id(reader)? {
updates.push(update); updates.reserve(last_id as usize);
}
Ok(updates) for id in 0..=last_id {
let update = self.update_status(reader, id)?;
updates.push(update);
last_update_result_id = id;
} }
None => Ok(Vec::new()),
} }
// retrieve all enqueued updates
if let Some((last_id, _)) = self.updates.last_update_id(reader)? {
for id in last_update_result_id + 1..last_id {
let update = self.update_status(reader, id)?;
updates.push(update);
}
}
Ok(updates)
} }
pub fn query_builder(&self) -> QueryBuilder { pub fn query_builder(&self) -> QueryBuilder {

View File

@ -26,9 +26,9 @@ impl Updates {
} }
// TODO do not trigger deserialize if possible // TODO do not trigger deserialize if possible
pub fn contains(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<bool> { pub fn get(self, reader: &heed::RoTxn, update_id: u64) -> ZResult<Option<Update>> {
let update_id = BEU64::new(update_id); let update_id = BEU64::new(update_id);
self.updates.get(reader, &update_id).map(|v| v.is_some()) self.updates.get(reader, &update_id)
} }
pub fn put_update( pub fn put_update(

View File

@ -1,15 +1,19 @@
use super::BEU64; use super::BEU64;
use crate::update::UpdateResult; use crate::update::ProcessedUpdateResult;
use heed::types::{OwnedType, SerdeBincode}; use heed::types::{OwnedType, SerdeBincode};
use heed::Result as ZResult; use heed::Result as ZResult;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
pub struct UpdatesResults { pub struct UpdatesResults {
pub(crate) updates_results: heed::Database<OwnedType<BEU64>, SerdeBincode<UpdateResult>>, pub(crate) updates_results:
heed::Database<OwnedType<BEU64>, SerdeBincode<ProcessedUpdateResult>>,
} }
impl UpdatesResults { impl UpdatesResults {
pub fn last_update_id(self, reader: &heed::RoTxn) -> ZResult<Option<(u64, UpdateResult)>> { pub fn last_update_id(
self,
reader: &heed::RoTxn,
) -> ZResult<Option<(u64, ProcessedUpdateResult)>> {
match self.updates_results.last(reader)? { match self.updates_results.last(reader)? {
Some((key, data)) => Ok(Some((key.get(), data))), Some((key, data)) => Ok(Some((key.get(), data))),
None => Ok(None), None => Ok(None),
@ -20,7 +24,7 @@ impl UpdatesResults {
self, self,
writer: &mut heed::RwTxn, writer: &mut heed::RwTxn,
update_id: u64, update_id: u64,
update_result: &UpdateResult, update_result: &ProcessedUpdateResult,
) -> ZResult<()> { ) -> ZResult<()> {
let update_id = BEU64::new(update_id); let update_id = BEU64::new(update_id);
self.updates_results.put(writer, &update_id, update_result) self.updates_results.put(writer, &update_id, update_result)
@ -30,7 +34,7 @@ impl UpdatesResults {
self, self,
reader: &heed::RoTxn, reader: &heed::RoTxn,
update_id: u64, update_id: u64,
) -> ZResult<Option<UpdateResult>> { ) -> ZResult<Option<ProcessedUpdateResult>> {
let update_id = BEU64::new(update_id); let update_id = BEU64::new(update_id);
self.updates_results.get(reader, &update_id) self.updates_results.get(reader, &update_id)
} }

View File

@ -42,6 +42,36 @@ pub enum Update {
StopWordsDeletion(BTreeSet<String>), StopWordsDeletion(BTreeSet<String>),
} }
impl Update {
pub fn update_type(&self) -> UpdateType {
match self {
Update::ClearAll => UpdateType::ClearAll,
Update::Schema(schema) => UpdateType::Schema {
schema: schema.clone(),
},
Update::Customs(_) => UpdateType::Customs,
Update::DocumentsAddition(addition) => UpdateType::DocumentsAddition {
number: addition.len(),
},
Update::DocumentsDeletion(deletion) => UpdateType::DocumentsDeletion {
number: deletion.len(),
},
Update::SynonymsAddition(addition) => UpdateType::SynonymsAddition {
number: addition.len(),
},
Update::SynonymsDeletion(deletion) => UpdateType::SynonymsDeletion {
number: deletion.len(),
},
Update::StopWordsAddition(addition) => UpdateType::StopWordsAddition {
number: addition.len(),
},
Update::StopWordsDeletion(deletion) => UpdateType::StopWordsDeletion {
number: deletion.len(),
},
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateType { pub enum UpdateType {
ClearAll, ClearAll,
@ -61,17 +91,23 @@ pub struct DetailedDuration {
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateResult { pub struct ProcessedUpdateResult {
pub update_id: u64, pub update_id: u64,
pub update_type: UpdateType, pub update_type: UpdateType,
pub result: Result<(), String>, pub result: Result<(), String>,
pub detailed_duration: DetailedDuration, pub detailed_duration: DetailedDuration,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnqueuedUpdateResult {
pub update_id: u64,
pub update_type: UpdateType,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateStatus { pub enum UpdateStatus {
Enqueued, Enqueued(EnqueuedUpdateResult),
Processed(UpdateResult), Processed(ProcessedUpdateResult),
Unknown, Unknown,
} }
@ -84,8 +120,11 @@ pub fn update_status(
match updates_results_store.update_result(reader, update_id)? { match updates_results_store.update_result(reader, update_id)? {
Some(result) => Ok(UpdateStatus::Processed(result)), Some(result) => Ok(UpdateStatus::Processed(result)),
None => { None => {
if updates_store.contains(reader, update_id)? { if let Some(update) = updates_store.get(reader, update_id)? {
Ok(UpdateStatus::Enqueued) Ok(UpdateStatus::Enqueued(EnqueuedUpdateResult {
update_id,
update_type: update.update_type(),
}))
} else { } else {
Ok(UpdateStatus::Unknown) Ok(UpdateStatus::Unknown)
} }
@ -110,7 +149,10 @@ pub fn next_update_id(
Ok(new_update_id) Ok(new_update_id)
} }
pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Option<UpdateResult>> { pub fn update_task(
writer: &mut heed::RwTxn,
index: store::Index,
) -> MResult<Option<ProcessedUpdateResult>> {
let (update_id, update) = match index.updates.pop_front(writer)? { let (update_id, update) = match index.updates.pop_front(writer)? {
Some(value) => value, Some(value) => value,
None => return Ok(None), None => return Ok(None),
@ -259,7 +301,7 @@ pub fn update_task(writer: &mut heed::RwTxn, index: store::Index) -> MResult<Opt
); );
let detailed_duration = DetailedDuration { main: duration }; let detailed_duration = DetailedDuration { main: duration };
let status = UpdateResult { let status = ProcessedUpdateResult {
update_id, update_id,
update_type, update_type,
result: result.map_err(|e| e.to_string()), result: result.map_err(|e| e.to_string()),