Make the UpdatesResults store work

This commit is contained in:
Clément Renault 2019-10-03 16:54:37 +02:00
parent af9fd9f552
commit 00c70d3cb5
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 52 additions and 7 deletions

View File

@ -33,6 +33,16 @@ impl Updates {
Ok(Some((number, last_data))) Ok(Some((number, last_data)))
} }
pub fn contains<T: rkv::Readable>(
&self,
reader: &T,
update_id: u64,
) -> Result<bool, rkv::StoreError>
{
let update_id_bytes = update_id.to_be_bytes();
self.updates.get(reader, update_id_bytes).map(|v| v.is_some())
}
pub fn push_back( pub fn push_back(
&self, &self,
writer: &mut rkv::Writer, writer: &mut rkv::Writer,

View File

@ -1,3 +1,4 @@
use rkv::Value;
use crate::update::UpdateResult; use crate::update::UpdateResult;
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -13,19 +14,27 @@ impl UpdatesResults {
update_result: &UpdateResult, update_result: &UpdateResult,
) -> Result<(), rkv::StoreError> ) -> Result<(), rkv::StoreError>
{ {
// let update = rmp_serde::to_vec_named(&addition)?; let update_id_bytes = update_id.to_be_bytes();
let update_result = bincode::serialize(&update_result).unwrap();
// WARN could not retrieve the last key/data entry of a tree... let blob = Value::Blob(&update_result);
// self.updates.get(writer, )?; self.updates_results.put(writer, update_id_bytes, &blob)
unimplemented!()
} }
pub fn update_result<T: rkv::Readable>( pub fn update_result<T: rkv::Readable>(
&self,
reader: &T, reader: &T,
update_id: u64, update_id: u64,
) -> Result<Option<UpdateResult>, rkv::StoreError> ) -> Result<Option<UpdateResult>, rkv::StoreError>
{ {
unimplemented!() let update_id_bytes = update_id.to_be_bytes();
match self.updates_results.get(reader, update_id_bytes)? {
Some(Value::Blob(bytes)) => {
let update_result = bincode::deserialize(&bytes).unwrap();
Ok(Some(update_result))
},
Some(value) => panic!("invalid type {:?}", value),
None => Ok(None),
}
} }
} }

View File

@ -38,6 +38,32 @@ pub struct UpdateResult {
pub detailed_duration: DetailedDuration, pub detailed_duration: DetailedDuration,
} }
#[derive(Clone, Serialize, Deserialize)]
pub enum UpdateStatus {
Enqueued,
Processed(UpdateResult),
Unknown,
}
pub fn update_status<T: rkv::Readable>(
reader: &T,
updates_store: store::Updates,
updates_results_store: store::UpdatesResults,
update_id: u64,
) -> Result<UpdateStatus, rkv::StoreError>
{
match updates_results_store.update_result(reader, update_id)? {
Some(result) => Ok(UpdateStatus::Processed(result)),
None => {
if updates_store.contains(reader, update_id)? {
Ok(UpdateStatus::Enqueued)
} else {
Ok(UpdateStatus::Unknown)
}
}
}
}
pub fn push_documents_addition<D: serde::Serialize>( pub fn push_documents_addition<D: serde::Serialize>(
writer: &mut rkv::Writer, writer: &mut rkv::Writer,
updates_store: store::Updates, updates_store: store::Updates,