Simplify the processing_update UpdateStore method

This commit is contained in:
Clément Renault 2020-11-30 11:00:03 +01:00 committed by Kerollmops
parent 878b1873cd
commit 222f2913c1
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4

View File

@ -1,6 +1,5 @@
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
@ -17,7 +16,6 @@ pub struct UpdateStore<M, N> {
processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>, processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>,
aborted_meta: Database<OwnedType<BEU64>, SerdeJson<M>>, aborted_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
notification_sender: Sender<()>, notification_sender: Sender<()>,
processing_update_id: Arc<AtomicU64>,
} }
impl<M: 'static, N: 'static> UpdateStore<M, N> { impl<M: 'static, N: 'static> UpdateStore<M, N> {
@ -50,7 +48,6 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
processed_meta, processed_meta,
aborted_meta, aborted_meta,
notification_sender, notification_sender,
processing_update_id: Arc::new(AtomicU64::new(u64::MAX)),
}); });
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
@ -147,10 +144,7 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
.expect("associated update content"); .expect("associated update content");
// Process the pending update using the provided user function. // Process the pending update using the provided user function.
self.processing_update_id.store(first_id.get(), Ordering::Relaxed); let new_meta = (f)(first_id.get(), first_meta, first_content)?;
let result = (f)(first_id.get(), first_meta, first_content);
self.processing_update_id.store(u64::MAX, Ordering::Relaxed);
let new_meta = result?;
drop(rtxn); drop(rtxn);
// Once the pending update have been successfully processed // Once the pending update have been successfully processed
@ -168,12 +162,15 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
} }
} }
/// The id of the update tha is currently being processed, /// The id and metadata of the update that is currently being processed,
/// `None` if no update is being processed. /// `None` if no update is being processed.
pub fn processing_update_id(&self) -> Option<u64> { pub fn processing_update(&self) -> heed::Result<Option<(u64, M)>>
match self.processing_update_id.load(Ordering::Relaxed) { where M: for<'a> Deserialize<'a>,
u64::MAX => None, {
update_id => Some(update_id), let rtxn = self.env.read_txn()?;
match self.pending_meta.first(&rtxn)? {
Some((key, meta)) => Ok(Some((key.get(), meta))),
None => Ok(None),
} }
} }