From 83c1db8763a87b83b8859ecae30ca84c5cd98f0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Sun, 18 Oct 2020 15:16:57 +0200 Subject: [PATCH] Introduce the UpdateStore --- Cargo.lock | 5 +- Cargo.toml | 1 + src/lib.rs | 2 + src/update_store.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 138 insertions(+), 2 deletions(-) create mode 100644 src/update_store.rs diff --git a/Cargo.lock b/Cargo.lock index f0499caee..95c770b0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -978,6 +978,7 @@ dependencies = [ "ringtail", "roaring", "serde", + "serde_json", "slice-group-by", "smallstr", "smallvec", @@ -1716,9 +1717,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.53" +version = "1.0.59" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "993948e75b189211a9b31a7528f950c6adc21f9720b6438ff80a7fa2f864cea2" +checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" dependencies = [ "itoa", "ryu", diff --git a/Cargo.toml b/Cargo.toml index 1e66054b2..ec5ccea40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ once_cell = "1.4.0" rayon = "1.3.1" ringtail = "0.3.0" roaring = "0.6.1" +serde_json = "1.0.59" slice-group-by = "0.2.6" smallstr = "0.2.0" smallvec = "1.4.0" diff --git a/src/lib.rs b/src/lib.rs index a9ef09162..4ae60525d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ mod criterion; mod mdfs; mod query_tokens; mod search; +mod update_store; pub mod heed_codec; pub mod proximity; pub mod tokenizer; @@ -29,6 +30,7 @@ pub type SmallString32 = smallstr::SmallString<[u8; 32]>; pub type SmallVec32 = smallvec::SmallVec<[T; 32]>; pub type SmallVec16 = smallvec::SmallVec<[T; 16]>; pub type BEU32 = heed::zerocopy::U32; +pub type BEU64 = heed::zerocopy::U64; pub type DocumentId = u32; pub type Attribute = u32; pub type Position = u32; diff --git a/src/update_store.rs b/src/update_store.rs new file mode 100644 index 000000000..b71cfab42 --- /dev/null +++ b/src/update_store.rs @@ -0,0 +1,132 @@ +use std::path::Path; + +use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; +use heed::{EnvOpenOptions, Env, Database}; +use serde::{Serialize, Deserialize}; + +use crate::BEU64; + +#[derive(Clone)] +pub struct UpdateStore { + env: Env, + pending_meta: Database, SerdeJson>, + pending: Database, ByteSlice>, + processed_meta: Database, SerdeJson>, +} + +impl UpdateStore { + pub fn open>(options: EnvOpenOptions, path: P) -> heed::Result> { + let env = options.open(path)?; + let pending_meta = env.create_database(Some("pending-meta"))?; + let pending = env.create_database(Some("pending"))?; + let processed_meta = env.create_database(Some("processed-meta"))?; + Ok(UpdateStore { env, pending, pending_meta, processed_meta }) + } + + /// Returns the new biggest id to use to store the new update. + fn new_update_id(&self, txn: &heed::RoTxn) -> heed::Result { + let last_pending = self.pending_meta + .as_polymorph() + .last::<_, OwnedType, DecodeIgnore>(txn)? + .map(|(k, _)| k.get()); + + if let Some(last_id) = last_pending { + return Ok(last_id + 1); + } + + let last_processed = self.processed_meta + .as_polymorph() + .last::<_, OwnedType, DecodeIgnore>(txn)? + .map(|(k, _)| k.get()); + + match last_processed { + Some(last_id) => Ok(last_id + 1), + None => Ok(0), + } + } + + /// Registers the update content in the pending store and the meta + /// into the pending-meta store. Returns the new unique update id. + pub fn register_update(&self, meta: &M, content: &[u8]) -> heed::Result + where M: Serialize, + { + let mut wtxn = self.env.write_txn()?; + + // We ask the update store to give us a new update id, this is safe, + // no other update can have the same id because we use a write txn before + // asking for the id and registering it so other update registering + // will be forced to wait for a new write txn. + let update_id = self.new_update_id(&wtxn)?; + let update_key = BEU64::new(update_id); + + self.pending_meta.put(&mut wtxn, &update_key, meta)?; + self.pending.put(&mut wtxn, &update_key, content)?; + + wtxn.commit()?; + + Ok(update_id) + } + + /// Executes the user provided function on the next pending update (the one with the lowest id). + /// This is asynchronous as it let the user process the update with a read-only txn and + /// only writing the result meta to the processed-meta store *after* it has been processed. + pub fn process_pending_update(&self, mut f: F) -> heed::Result> + where + F: FnMut(u64, M, &[u8]) -> heed::Result, + M: for<'a> Deserialize<'a> + Serialize, + { + // Create a read transaction to be able to retrieve the pending update in order. + let rtxn = self.env.read_txn()?; + let first_meta = self.pending_meta.first(&rtxn)?; + + // If there is a pending update we process and only keep + // a reader while processing it, not a writer. + match first_meta { + Some((first_id, first_meta)) => { + let first_content = self.pending + .get(&rtxn, &first_id)? + .expect("associated update content"); + + // Process the pending update using the provided user function. + let new_meta = (f)(first_id.get(), first_meta, first_content)?; + drop(rtxn); + + // Once the pending update have been successfully processed + // we must remove the content from the pending stores and + // write the *new* meta to the processed-meta store and commit. + let mut wtxn = self.env.write_txn()?; + self.pending_meta.delete(&mut wtxn, &first_id)?; + self.pending.delete(&mut wtxn, &first_id)?; + self.processed_meta.put(&mut wtxn, &first_id, &new_meta)?; + wtxn.commit()?; + + Ok(Some((first_id.get(), new_meta))) + }, + None => Ok(None) + } + } + + /// Iterate over the pending and the processed metas one after the other, + /// calling the user defined callback for each meta. + pub fn iter_meta(&self, mut f: F) -> heed::Result<()> + where + M: for<'a> Deserialize<'a>, + F: FnMut(u64, M), + { + let rtxn = self.env.read_txn()?; + + // We iterate over the pending updates. + for result in self.pending_meta.iter(&rtxn)? { + let (key, meta) = result?; + (f)(key.get(), meta); + } + + // We iterate over the processed updates. + for result in self.processed_meta.iter(&rtxn)? { + let (key, meta) = result?; + (f)(key.get(), meta); + } + + Ok(()) + } +}