diff --git a/src/update_store.rs b/src/update_store.rs index c480d7766..9ffa2a7bc 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -1,6 +1,5 @@ use std::path::Path; use std::sync::Arc; -use std::sync::atomic::{AtomicU64, Ordering}; use crossbeam_channel::Sender; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; @@ -17,7 +16,6 @@ pub struct UpdateStore { processed_meta: Database, SerdeJson>, aborted_meta: Database, SerdeJson>, notification_sender: Sender<()>, - processing_update_id: Arc, } impl UpdateStore { @@ -50,7 +48,6 @@ impl UpdateStore { processed_meta, aborted_meta, notification_sender, - processing_update_id: Arc::new(AtomicU64::new(u64::MAX)), }); let update_store_cloned = update_store.clone(); @@ -147,10 +144,7 @@ impl UpdateStore { .expect("associated update content"); // Process the pending update using the provided user function. - self.processing_update_id.store(first_id.get(), Ordering::Relaxed); - let result = (f)(first_id.get(), first_meta, first_content); - self.processing_update_id.store(u64::MAX, Ordering::Relaxed); - let new_meta = result?; + let new_meta = (f)(first_id.get(), first_meta, first_content)?; drop(rtxn); // Once the pending update have been successfully processed @@ -168,12 +162,15 @@ impl UpdateStore { } } - /// The id of the update tha is currently being processed, + /// The id and metadata of the update that is currently being processed, /// `None` if no update is being processed. - pub fn processing_update_id(&self) -> Option { - match self.processing_update_id.load(Ordering::Relaxed) { - u64::MAX => None, - update_id => Some(update_id), + pub fn processing_update(&self) -> heed::Result> + where M: for<'a> Deserialize<'a>, + { + let rtxn = self.env.read_txn()?; + match self.pending_meta.first(&rtxn)? { + Some((key, meta)) => Ok(Some((key.get(), meta))), + None => Ok(None), } }