mod codec;
pub mod dump;

use std::fs::{copy, create_dir_all, remove_file, File};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::{
    collections::{BTreeMap, HashSet},
    path::PathBuf,
};

use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64;
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
use log::error;
use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle;
use tokio::sync::mpsc;
use uuid::Uuid;

use codec::*;

use super::error::Result;
use super::UpdateMeta;
use crate::helpers::EnvSizer;
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};

#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;

const UPDATE_DIR: &str = "update_files";

pub struct UpdateStoreInfo {
    /// Size of the update store in bytes.
    pub size: u64,
    /// Uuid of the currently processing update if it exists.
    pub processing: Option<Uuid>,
}

/// A data structure that allows concurrent reads AND exactly one writer.
pub struct StateLock {
    lock: Mutex<()>,
    data: ArcSwap<State>,
}

pub struct StateLockGuard<'a> {
    _lock: MutexGuard<'a, ()>,
    state: &'a StateLock,
}

impl StateLockGuard<'_> {
    pub fn swap(&self, state: State) -> Arc<State> {
        self.state.data.swap(Arc::new(state))
    }
}

impl StateLock {
    fn from_state(state: State) -> Self {
        let lock = Mutex::new(());
        let data = ArcSwap::from(Arc::new(state));
        Self { lock, data }
    }

    pub fn read(&self) -> Arc<State> {
        self.data.load().clone()
    }

    pub fn write(&self) -> StateLockGuard {
        let _lock = self.lock.lock();
        let state = &self;
        StateLockGuard { _lock, state }
    }
}

#[allow(clippy::large_enum_variant)]
pub enum State {
    Idle,
    Processing(Uuid, Processing),
    Snapshoting,
    Dumping,
}

#[derive(Clone)]
pub struct UpdateStore {
    pub env: Env,
    /// A queue containing the updates to process, ordered by arrival.
    /// The keys are built as follows:
    /// | global_update_id | index_uuid | update_id |
    /// |      8-bytes     |  16-bytes  |  8-bytes  |
    pending_queue: Database<PendingKeyCodec, SerdeJson<Enqueued>>,
    /// Maps indexes to the next available update id. If NextIdKey::Global is queried, then the
    /// next global update id is returned.
    next_update_id: Database<NextIdCodec, OwnedType<BEU64>>,
    /// Contains all the performed updates meta, be they failed, aborted, or processed.
    /// The keys are built as follows:
    /// |   Uuid   |    id   |
    /// | 16-bytes | 8-bytes |
    updates: Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
    /// Indicates the current state of the update store.
    state: Arc<StateLock>,
    /// Wakes up the loop when a new event occurs.
    notification_sender: mpsc::Sender<()>,
    path: PathBuf,
}

impl UpdateStore {
    fn new(
        mut options: EnvOpenOptions,
        path: impl AsRef<Path>,
    ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
        options.max_dbs(5);

        let env = options.open(&path)?;
        let pending_queue = env.create_database(Some("pending-queue"))?;
        let next_update_id = env.create_database(Some("next-update-id"))?;
        let updates = env.create_database(Some("updates"))?;

        let state = Arc::new(StateLock::from_state(State::Idle));
        let (notification_sender, notification_receiver) = mpsc::channel(10);

        Ok((
            Self {
                env,
                pending_queue,
                next_update_id,
                updates,
                state,
                notification_sender,
                path: path.as_ref().to_owned(),
            },
            notification_receiver,
        ))
    }
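
    /// Opens the update store and spawns the background loop that processes pending updates.
    ///
    /// A minimal usage sketch (the `handle` and `path` bindings are hypothetical; any
    /// `IndexActorHandle` implementation and writable directory will do):
    ///
    /// ```ignore
    /// let mut options = EnvOpenOptions::new();
    /// options.map_size(4096 * 100);
    /// let must_exit = Arc::new(AtomicBool::new(false));
    /// let store = UpdateStore::open(options, &path, handle, must_exit)?;
    /// ```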
    pub fn open(
        options: EnvOpenOptions,
        path: impl AsRef<Path>,
        index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
        must_exit: Arc<AtomicBool>,
    ) -> anyhow::Result<Arc<Self>> {
        let (update_store, mut notification_receiver) = Self::new(options, path)?;
        let update_store = Arc::new(update_store);

        // Send a first notification to init the update loop and perform any pending updates at
        // launch. Since we just launched the update store, and we still own the receiving end of
        // the channel, this call is guaranteed to succeed.
        update_store
            .notification_sender
            .try_send(())
            .expect("Failed to init update store");

        // We need a weak reference so we can take ownership on the arc later when we
        // want to close the index.
        let update_store_weak = Arc::downgrade(&update_store);
        tokio::task::spawn(async move {
            // Block and wait for something to process.
            'outer: while notification_receiver.recv().await.is_some() {
                loop {
                    match update_store_weak.upgrade() {
                        Some(update_store) => {
                            let handler = index_handle.clone();
                            let res = tokio::task::spawn_blocking(move || {
                                update_store.process_pending_update(handler)
                            })
                            .await
                            .expect("Fatal error processing update.");

                            match res {
                                Ok(Some(_)) => (),
                                Ok(None) => break,
                                Err(e) => {
                                    error!("Fatal error while processing an update that requires the update store to shut down: {}", e);
                                    must_exit.store(true, Ordering::SeqCst);
                                    break 'outer;
                                }
                            }
                        }
                        // The ownership of the arc has been taken, we need to exit.
                        None => break 'outer,
                    }
                }
            }

            error!("Update store loop exited.");
        });

        Ok(update_store)
    }

    /// Returns the next global update id and the next update id for a given `index_uuid`.
    fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> {
        let global_id = self
            .next_update_id
            .get(txn, &NextIdKey::Global)?
            .map(U64::get)
            .unwrap_or_default();

        self.next_update_id
            .put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;

        let update_id = self.next_update_id_raw(txn, index_uuid)?;

        Ok((global_id, update_id))
    }

    /// Returns the next update id for a given `index_uuid` without
    /// incrementing the global update id. This is useful for the dumps.
    fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
        let update_id = self
            .next_update_id
            .get(txn, &NextIdKey::Index(index_uuid))?
            .map(U64::get)
            .unwrap_or_default();

        self.next_update_id.put(
            txn,
            &NextIdKey::Index(index_uuid),
            &BEU64::new(update_id + 1),
        )?;

        Ok(update_id)
    }

    /// Registers the update content in the pending store and the meta
    /// into the pending-meta store. Returns the `Enqueued` meta of the new update.
    pub fn register_update(
        &self,
        meta: UpdateMeta,
        content: Option<Uuid>,
        index_uuid: Uuid,
    ) -> heed::Result<Enqueued> {
        let mut txn = self.env.write_txn()?;

        let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
        let meta = Enqueued::new(meta, update_id, content);

        self.pending_queue
            .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;

        txn.commit()?;

        self.notification_sender
            .blocking_send(())
            .expect("Update store loop exited.");

        Ok(meta)
    }

    /// Pushes an already processed update into the UpdateStore without triggering the
    /// notification process. This is useful for the dumps.
    pub fn register_raw_updates(
        &self,
        wtxn: &mut heed::RwTxn,
        update: &UpdateStatus,
        index_uuid: Uuid,
    ) -> heed::Result<()> {
        match update {
            UpdateStatus::Enqueued(enqueued) => {
                let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
                self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
                    wtxn,
                    &(global_id, index_uuid, enqueued.id()),
                    &enqueued,
                )?;
            }
            _ => {
                let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
                self.updates
                    .put(wtxn, &(index_uuid, update.id()), &update)?;
            }
        }
        Ok(())
    }

    /// Executes the user provided function on the next pending update (the one with the lowest id).
    /// This is asynchronous as it lets the user process the update with a read-only txn and
    /// only writes the result meta to the processed-meta store *after* it has been processed.
    fn process_pending_update(&self, index_handle: impl IndexActorHandle) -> Result<Option<()>> {
        // Create a read transaction to be able to retrieve the pending updates in order.
        let rtxn = self.env.read_txn()?;
        let first_meta = self.pending_queue.first(&rtxn)?;
        drop(rtxn);

        // If there is a pending update we process it, and we only keep
        // a reader while processing it, not a writer.
        match first_meta {
            Some(((global_id, index_uuid, _), mut pending)) => {
                let content = pending.content.take();
                let processing = pending.processing();
                // Acquire the state lock and set the current state to processing.
                // txn must *always* be acquired after the state lock, or it will dead lock.
                let state = self.state.write();
                state.swap(State::Processing(index_uuid, processing.clone()));

                let result =
                    self.perform_update(content, processing, index_handle, index_uuid, global_id);

                state.swap(State::Idle);

                result
            }
            None => Ok(None),
        }
    }

    fn perform_update(
        &self,
        content: Option<Uuid>,
        processing: Processing,
        index_handle: impl IndexActorHandle,
        index_uuid: Uuid,
        global_id: u64,
    ) -> Result<Option<()>> {
        let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
        let update_id = processing.id();

        let file = match content_path {
            Some(ref path) => {
                let file = File::open(path)?;
                Some(file)
            }
            None => None,
        };

        // Process the pending update using the provided user function.
        let handle = Handle::current();
        let result =
            match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) {
                Ok(result) => result,
                Err(e) => Err(processing.fail(e.into())),
            };

        // Once the pending update has been successfully processed
        // we must remove the content from the pending and processing stores and
        // write the *new* meta to the processed-meta store and commit.
        let mut wtxn = self.env.write_txn()?;
        self.pending_queue
            .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;

        let result = match result {
            Ok(res) => res.into(),
            Err(res) => res.into(),
        };

        self.updates
            .put(&mut wtxn, &(index_uuid, update_id), &result)?;

        wtxn.commit()?;

        if let Some(ref path) = content_path {
            remove_file(&path)?;
        }

        Ok(Some(()))
    }

    /// Lists the updates for `index_uuid`.
    pub fn list(&self, index_uuid: Uuid) -> Result<Vec<UpdateStatus>> {
        let mut update_list = BTreeMap::<u64, UpdateStatus>::new();

        let txn = self.env.read_txn()?;

        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
        for entry in pendings {
            let ((_, uuid, id), pending) = entry?;
            if uuid == index_uuid {
                update_list.insert(id, pending.decode()?.into());
            }
        }

        let updates = self
            .updates
            .remap_key_type::<ByteSlice>()
            .prefix_iter(&txn, index_uuid.as_bytes())?;

        for entry in updates {
            let (_, update) = entry?;
            update_list.insert(update.id(), update);
        }

        // If the currently processing update is from this index, replace the corresponding
        // pending update with this one.
        match *self.state.read() {
            State::Processing(uuid, ref processing) if uuid == index_uuid => {
                update_list.insert(processing.id(), processing.clone().into());
            }
            _ => (),
        }

        Ok(update_list.into_iter().map(|(_, v)| v).collect())
    }

    /// Returns the meta associated with the update, or `None` if the update doesn't exist.
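    ///
    /// A minimal usage sketch (the `store`, `index_uuid` and `update_id` bindings are
    /// hypothetical):
    ///
    /// ```ignore
    /// if let Some(UpdateStatus::Processed(_)) = store.meta(index_uuid, update_id)? {
    ///     // the update has been fully processed
    /// }
    /// ```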
    pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result<Option<UpdateStatus>> {
        // Check if the update is the one currently being processed.
        match *self.state.read() {
            State::Processing(uuid, ref processing)
                if uuid == index_uuid && processing.id() == update_id =>
            {
                return Ok(Some(processing.clone().into()));
            }
            _ => (),
        }

        let txn = self.env.read_txn()?;
        // Else, check if it is in the updates database:
        let update = self.updates.get(&txn, &(index_uuid, update_id))?;

        if let Some(update) = update {
            return Ok(Some(update));
        }

        // If nothing was found yet, we resolve to iterate over the pending queue.
        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

        for entry in pendings {
            let ((_, uuid, id), pending) = entry?;
            if uuid == index_uuid && id == update_id {
                return Ok(Some(pending.decode()?.into()));
            }
        }

        // No update was found.
        Ok(None)
    }

    /// Deletes all updates for an index from the update store.
    pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
        let mut txn = self.env.write_txn()?;
        // Contains the uuids of all the content files that must be removed if the deletion succeeds.
        let mut uuids_to_remove = Vec::new();

        let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();

        while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
            if uuid == index_uuid {
                pendings.del_current()?;
                let mut pending = pending.decode()?;
                if let Some(update_uuid) = pending.content.take() {
                    uuids_to_remove.push(update_uuid);
                }
            }
        }

        drop(pendings);

        let mut updates = self
            .updates
            .remap_key_type::<ByteSlice>()
            .prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
            .lazily_decode_data();

        while updates.next().is_some() {
            updates.del_current()?;
        }

        drop(updates);

        txn.commit()?;

        uuids_to_remove
            .iter()
            .map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
            .for_each(|path| {
                let _ = remove_file(path);
            });

        // We don't care about the currently processing update, since it will be removed by itself
        // once it's done processing, and we can't abort a running update.

        Ok(())
    }

    pub fn snapshot(
        &self,
        uuids: &HashSet<Uuid>,
        path: impl AsRef<Path>,
        handle: impl IndexActorHandle + Clone,
    ) -> Result<()> {
        let state_lock = self.state.write();
        state_lock.swap(State::Snapshoting);

        // Acquire a write transaction to prevent further writes during the snapshot.
        let txn = self.env.write_txn()?;

        let update_path = path.as_ref().join("updates");
        create_dir_all(&update_path)?;

        // Create the database snapshot.
        let db_path = update_path.join("data.mdb");
        self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;

        let update_files_path = update_path.join(UPDATE_DIR);
        create_dir_all(&update_files_path)?;

        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

        for entry in pendings {
            let ((_, uuid, _), pending) = entry?;
            if uuids.contains(&uuid) {
                if let Enqueued {
                    content: Some(uuid),
                    ..
                } = pending.decode()?
                {
                    let path = update_uuid_to_file_path(&self.path, uuid);
                    // Copy the update content file under the same name in the snapshot directory.
                    copy(&path, update_files_path.join(format!("update_{}", uuid)))?;
                }
            }
        }

        let path = &path.as_ref().to_path_buf();
        let handle = &handle;
        // Perform the snapshot of each index concurrently. Only use a third of the index actor's
        // message capacity at a time, so as not to put too much pressure on the index actor.
        let mut stream = futures::stream::iter(uuids.iter())
            .map(move |uuid| handle.snapshot(*uuid, path.clone()))
            .buffer_unordered(CONCURRENT_INDEX_MSG / 3);

        Handle::current().block_on(async {
            while let Some(res) = stream.next().await {
                res?;
            }
            Ok(()) as Result<()>
        })?;

        Ok(())
    }
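
    /// Returns the total size of the update store, i.e. the size of the LMDB environment
    /// plus the size of all the pending update content files, together with the uuid of
    /// the index whose update is currently being processed, if any.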
    pub fn get_info(&self) -> Result<UpdateStoreInfo> {
        let mut size = self.env.size();
        let txn = self.env.read_txn()?;
        for entry in self.pending_queue.iter(&txn)? {
            let (_, pending) = entry?;
            if let Enqueued {
                content: Some(uuid),
                ..
            } = pending
            {
                let path = update_uuid_to_file_path(&self.path, uuid);
                size += File::open(path)?.metadata()?.len();
            }
        }
        let processing = match *self.state.read() {
            State::Processing(uuid, _) => Some(uuid),
            _ => None,
        };

        Ok(UpdateStoreInfo { size, processing })
    }
}

fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
    root.as_ref()
        .join(UPDATE_DIR)
        .join(format!("update_{}", uuid))
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::index_controller::{
        index_actor::{error::IndexActorError, MockIndexActorHandle},
        UpdateResult,
    };

    use futures::future::ok;

    #[actix_rt::test]
    async fn test_next_id() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        let index1_uuid = Uuid::new_v4();
        let index2_uuid = Uuid::new_v4();

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((0, 0), ids);

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((1, 0), ids);

        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((2, 1), ids);
    }

    #[actix_rt::test]
    async fn test_register_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        let meta = UpdateMeta::ClearDocuments;
        let uuid = Uuid::new_v4();
        let store_clone = update_store.clone();
        tokio::task::spawn_blocking(move || {
            store_clone.register_update(meta, None, uuid).unwrap();
        })
        .await
        .unwrap();

        let txn = update_store.env.read_txn().unwrap();
        assert!(update_store
            .pending_queue
            .get(&txn, &(0, uuid, 0))
            .unwrap()
            .is_some());
    }
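
    // A small sanity test sketched here as an addition: it only exercises the pure
    // `update_uuid_to_file_path` helper defined above, so it makes no assumption about
    // the actor handles or the heed environment.
    #[test]
    fn test_update_uuid_to_file_path() {
        let uuid = Uuid::new_v4();
        let root = Path::new("some_root");
        let path = update_uuid_to_file_path(root, uuid);
        assert_eq!(
            path,
            root.join(UPDATE_DIR).join(format!("update_{}", uuid))
        );
    }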

    #[actix_rt::test]
    async fn test_process_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut handle = MockIndexActorHandle::new();

        handle
            .expect_update()
            .times(2)
            .returning(|_index_uuid, processing, _file| {
                if processing.id() == 0 {
                    Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
                } else {
                    Box::pin(ok(Err(
                        processing.fail(IndexActorError::ExistingPrimaryKey.into())
                    )))
                }
            });

        let handle = Arc::new(handle);

        let mut options = EnvOpenOptions::new();
        options.map_size(4096 * 100);
        let store = UpdateStore::open(
            options,
            dir.path(),
            handle.clone(),
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        // Wait a bit for the update loop to process the initial notification.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let mut txn = store.env.write_txn().unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
        let uuid = Uuid::new_v4();

        store
            .pending_queue
            .put(&mut txn, &(0, uuid, 0), &update)
            .unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);

        store
            .pending_queue
            .put(&mut txn, &(1, uuid, 1), &update)
            .unwrap();

        txn.commit().unwrap();

        // Process the pending updates, and check that they have been moved to the updates
        // database and removed from the pending queue.
        let store_clone = store.clone();
        tokio::task::spawn_blocking(move || {
            store_clone.process_pending_update(handle.clone()).unwrap();
            store_clone.process_pending_update(handle).unwrap();
        })
        .await
        .unwrap();

        let txn = store.env.read_txn().unwrap();

        assert!(store.pending_queue.first(&txn).unwrap().is_none());

        let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();
        assert!(matches!(update, UpdateStatus::Processed(_)));

        let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();
        assert!(matches!(update, UpdateStatus::Failed(_)));
    }
}