2021-05-25 16:33:09 +02:00
|
|
|
mod codec;
|
2021-05-31 16:03:39 +02:00
|
|
|
pub mod dump;
|
2021-05-25 16:33:09 +02:00
|
|
|
|
2021-04-12 16:59:16 +02:00
|
|
|
use std::fs::{copy, create_dir_all, remove_file, File};
|
2021-04-22 10:14:29 +02:00
|
|
|
use std::path::Path;
|
2021-06-09 16:19:45 +02:00
|
|
|
use std::sync::atomic::{AtomicBool, Ordering};
|
2021-03-11 20:58:51 +01:00
|
|
|
use std::sync::Arc;
|
2021-05-31 16:03:39 +02:00
|
|
|
use std::{
|
|
|
|
collections::{BTreeMap, HashSet},
|
|
|
|
path::PathBuf,
|
2021-06-30 17:29:22 +02:00
|
|
|
time::Duration,
|
2021-05-31 16:03:39 +02:00
|
|
|
};
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
use arc_swap::ArcSwap;
|
2021-05-10 20:20:36 +02:00
|
|
|
use futures::StreamExt;
|
2021-04-22 10:14:29 +02:00
|
|
|
use heed::types::{ByteSlice, OwnedType, SerdeJson};
|
|
|
|
use heed::zerocopy::U64;
|
2021-05-25 16:33:09 +02:00
|
|
|
use heed::{CompactionOption, Database, Env, EnvOpenOptions};
|
2021-05-05 18:03:21 +02:00
|
|
|
use log::error;
|
2021-04-22 10:14:29 +02:00
|
|
|
use parking_lot::{Mutex, MutexGuard};
|
|
|
|
use tokio::runtime::Handle;
|
2021-03-04 17:25:02 +01:00
|
|
|
use tokio::sync::mpsc;
|
2021-06-30 17:29:22 +02:00
|
|
|
use tokio::sync::mpsc::error::TrySendError;
|
|
|
|
use tokio::time::timeout;
|
2021-02-27 10:19:05 +01:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
use codec::*;
|
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
use super::error::Result;
|
2021-06-15 17:39:07 +02:00
|
|
|
use super::UpdateMeta;
|
2021-06-14 21:26:35 +02:00
|
|
|
use crate::helpers::EnvSizer;
|
2021-06-15 17:39:07 +02:00
|
|
|
use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-27 12:16:24 +02:00
|
|
|
// Big-endian u64 key type: keys encoded this way sort in ascending numeric
// order in LMDB, which keeps the update queues ordered by id.
#[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>;

// Directory (under the update store root path) holding the update content files.
const UPDATE_DIR: &str = "update_files";
|
2021-05-29 00:08:17 +02:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
/// Summary information about the update store, as returned by `get_info`.
pub struct UpdateStoreInfo {
    /// Size of the update store in bytes.
    pub size: u64,
    /// Uuid of the currently processing update if it exists
    pub processing: Option<Uuid>,
}
|
2021-04-12 16:59:16 +02:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
/// A data structure that allows concurrent reads AND exactly one writer.
pub struct StateLock {
    // Serializes writers: `write()` holds this mutex for the guard's lifetime.
    lock: Mutex<()>,
    // Current state; readers load it lock-free through the ArcSwap.
    data: ArcSwap<State>,
}
|
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
/// Write guard over a `StateLock`: while it is alive no other writer can swap
/// the state, while readers are never blocked.
pub struct StateLockGuard<'a> {
    // Held only to exclude other writers; the unit value carries no data.
    _lock: MutexGuard<'a, ()>,
    state: &'a StateLock,
}

impl StateLockGuard<'_> {
    /// Replaces the current state, returning the previous one.
    pub fn swap(&self, state: State) -> Arc<State> {
        self.state.data.swap(Arc::new(state))
    }
}
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
impl StateLock {
|
|
|
|
fn from_state(state: State) -> Self {
|
|
|
|
let lock = Mutex::new(());
|
|
|
|
let data = ArcSwap::from(Arc::new(state));
|
2021-04-27 17:51:12 +02:00
|
|
|
Self { lock, data }
|
2021-04-22 10:14:29 +02:00
|
|
|
}
|
2021-04-12 16:59:16 +02:00
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
pub fn read(&self) -> Arc<State> {
|
2021-04-22 10:14:29 +02:00
|
|
|
self.data.load().clone()
|
|
|
|
}
|
2021-04-12 16:59:16 +02:00
|
|
|
|
2021-05-25 16:33:09 +02:00
|
|
|
pub fn write(&self) -> StateLockGuard {
|
2021-04-22 10:14:29 +02:00
|
|
|
let _lock = self.lock.lock();
|
|
|
|
let state = &self;
|
2021-04-27 17:51:12 +02:00
|
|
|
StateLockGuard { _lock, state }
|
2021-04-12 16:59:16 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-04-27 17:51:12 +02:00
|
|
|
/// The possible states of the update-processing loop.
#[allow(clippy::large_enum_variant)]
pub enum State {
    /// No update is currently being processed.
    Idle,
    /// The update for the index with this uuid is being processed.
    Processing(Uuid, Processing),
    /// A snapshot is being created. (Historical spelling kept: renaming the
    /// variant would break matches elsewhere in the crate.)
    Snapshoting,
    /// A dump is being created.
    Dumping,
}
|
2021-04-12 16:59:16 +02:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
/// Persistent store of updates backed by an LMDB environment, plus the state
/// and notification channel driving the background processing loop.
#[derive(Clone)]
pub struct UpdateStore {
    // LMDB environment holding the three databases below.
    pub env: Env,
    /// A queue containing the updates to process, ordered by arrival.
    /// The key are built as follow:
    /// | global_update_id | index_uuid | update_id |
    /// | 8-bytes | 16-bytes | 8-bytes |
    pending_queue: Database<PendingKeyCodec, SerdeJson<Enqueued>>,
    /// Map indexes to the next available update id. If NextIdKey::Global is queried, then the next
    /// global update id is returned
    next_update_id: Database<NextIdCodec, OwnedType<BEU64>>,
    /// Contains all the performed updates meta, be they failed, aborted, or processed.
    /// The keys are built as follow:
    /// | Uuid | id |
    /// | 16-bytes | 8-bytes |
    updates: Database<UpdateKeyCodec, SerdeJson<UpdateStatus>>,
    /// Indicates the current state of the update store,
    state: Arc<StateLock>,
    /// Wake up the loop when a new event occurs.
    notification_sender: mpsc::Sender<()>,
    // Root path of the store; update content files live under `<path>/update_files`.
    path: PathBuf,
}
|
|
|
|
|
|
|
|
impl UpdateStore {
|
2021-05-26 22:52:06 +02:00
|
|
|
    /// Creates the LMDB environment and databases at `path` and returns the
    /// store together with the receiver half of its notification channel.
    fn new(
        mut options: EnvOpenOptions,
        path: impl AsRef<Path>,
    ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
        // Three named databases are created below; keep some headroom.
        options.max_dbs(5);

        let env = options.open(&path)?;
        let pending_queue = env.create_database(Some("pending-queue"))?;
        let next_update_id = env.create_database(Some("next-update-id"))?;
        let updates = env.create_database(Some("updates"))?;

        let state = Arc::new(StateLock::from_state(State::Idle));

        // Capacity 1 is enough: one pending notification already guarantees
        // the loop will re-check the queue, so extra sends can be dropped.
        let (notification_sender, notification_receiver) = mpsc::channel(1);

        Ok((
            Self {
                env,
                pending_queue,
                next_update_id,
                updates,
                state,
                notification_sender,
                path: path.as_ref().to_owned(),
            },
            notification_receiver,
        ))
    }
|
|
|
|
|
|
|
|
    /// Opens the update store at `path` and spawns the background task that
    /// drains the pending queue.
    ///
    /// The task wakes up on notifications (or after a 10 minute timeout as a
    /// fallback) and processes updates until the queue is empty. It exits once
    /// every strong reference to the returned `Arc` is dropped, and sets
    /// `must_exit` if processing hits a fatal error.
    pub fn open(
        options: EnvOpenOptions,
        path: impl AsRef<Path>,
        index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
        must_exit: Arc<AtomicBool>,
    ) -> anyhow::Result<Arc<Self>> {
        let (update_store, mut notification_receiver) = Self::new(options, path)?;
        let update_store = Arc::new(update_store);

        // Send a first notification to trigger the process.
        if let Err(TrySendError::Closed(())) = update_store.notification_sender.try_send(()) {
            panic!("Failed to init update store");
        }

        // We need a weak reference so we can take ownership on the arc later when we
        // want to close the index.
        let duration = Duration::from_secs(10 * 60); // 10 minutes
        let update_store_weak = Arc::downgrade(&update_store);
        tokio::task::spawn(async move {
            // Block and wait for something to process with a timeout. The timeout
            // function returns a Result and we must just unlock the loop on Result.
            'outer: while timeout(duration, notification_receiver.recv())
                .await
                .map_or(true, |o| o.is_some())
            {
                loop {
                    match update_store_weak.upgrade() {
                        Some(update_store) => {
                            let handler = index_handle.clone();
                            // Processing does blocking LMDB work, so it is
                            // moved off the async executor.
                            let res = tokio::task::spawn_blocking(move || {
                                update_store.process_pending_update(handler)
                            })
                            .await
                            .expect("Fatal error processing update.");
                            match res {
                                // One update processed; look for the next one.
                                Ok(Some(_)) => (),
                                // Queue empty; wait for the next notification.
                                Ok(None) => break,
                                Err(e) => {
                                    error!("Fatal error while processing an update that requires the update store to shutdown: {}", e);
                                    must_exit.store(true, Ordering::SeqCst);
                                    break 'outer;
                                }
                            }
                        }
                        // the ownership on the arc has been taken, we need to exit.
                        None => break 'outer,
                    }
                }
            }

            error!("Update store loop exited.");
        });

        Ok(update_store)
    }
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
/// Returns the next global update id and the next update id for a given `index_uuid`.
|
|
|
|
fn next_update_id(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<(u64, u64)> {
|
|
|
|
let global_id = self
|
|
|
|
.next_update_id
|
|
|
|
.get(txn, &NextIdKey::Global)?
|
|
|
|
.map(U64::get)
|
|
|
|
.unwrap_or_default();
|
2021-05-06 18:44:16 +02:00
|
|
|
|
|
|
|
self.next_update_id
|
|
|
|
.put(txn, &NextIdKey::Global, &BEU64::new(global_id + 1))?;
|
|
|
|
|
|
|
|
let update_id = self.next_update_id_raw(txn, index_uuid)?;
|
|
|
|
|
|
|
|
Ok((global_id, update_id))
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the next next update id for a given `index_uuid` without
|
|
|
|
/// incrementing the global update id. This is useful for the dumps.
|
|
|
|
fn next_update_id_raw(&self, txn: &mut heed::RwTxn, index_uuid: Uuid) -> heed::Result<u64> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let update_id = self
|
|
|
|
.next_update_id
|
|
|
|
.get(txn, &NextIdKey::Index(index_uuid))?
|
|
|
|
.map(U64::get)
|
|
|
|
.unwrap_or_default();
|
|
|
|
|
|
|
|
self.next_update_id.put(
|
|
|
|
txn,
|
|
|
|
&NextIdKey::Index(index_uuid),
|
|
|
|
&BEU64::new(update_id + 1),
|
|
|
|
)?;
|
|
|
|
|
2021-05-06 18:44:16 +02:00
|
|
|
Ok(update_id)
|
2021-02-27 10:19:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
    /// Registers the update content in the pending store and the meta
    /// into the pending-meta store. Returns the new unique update id.
    pub fn register_update(
        &self,
        meta: UpdateMeta,
        content: Option<Uuid>,
        index_uuid: Uuid,
    ) -> heed::Result<Enqueued> {
        let mut txn = self.env.write_txn()?;

        let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
        let meta = Enqueued::new(meta, update_id, content);

        self.pending_queue
            .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;

        txn.commit()?;

        // A full channel is fine (a notification is already pending); only a
        // closed channel — a dead processing loop — is fatal.
        if let Err(TrySendError::Closed(())) = self.notification_sender.try_send(()) {
            panic!("Update store loop exited");
        }

        Ok(meta)
    }
|
2021-04-12 16:59:16 +02:00
|
|
|
|
2021-05-06 18:44:16 +02:00
|
|
|
    /// Push already processed update in the UpdateStore without triggering the notification
    /// process. This is useful for the dumps.
    pub fn register_raw_updates(
        &self,
        wtxn: &mut heed::RwTxn,
        update: &UpdateStatus,
        index_uuid: Uuid,
    ) -> heed::Result<()> {
        match update {
            // Still-enqueued updates go back into the pending queue with a
            // fresh global id so they are processed in order.
            UpdateStatus::Enqueued(enqueued) => {
                let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
                self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
                    wtxn,
                    &(global_id, index_uuid, enqueued.id()),
                    &enqueued,
                )?;
            }
            // Terminal updates (processed/failed/aborted) land directly in the
            // updates database; only the per-index counter is bumped.
            _ => {
                let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
                self.updates
                    .put(wtxn, &(index_uuid, update.id()), &update)?;
            }
        }
        Ok(())
    }
|
|
|
|
|
2021-02-27 10:19:05 +01:00
|
|
|
    /// Executes the user provided function on the next pending update (the one with the lowest id).
    /// This is asynchronous as it let the user process the update with a read-only txn and
    /// only writing the result meta to the processed-meta store *after* it has been processed.
    ///
    /// Returns `Ok(None)` when the pending queue is empty.
    fn process_pending_update(&self, index_handle: impl IndexActorHandle) -> Result<Option<()>> {
        // Create a read transaction to be able to retrieve the pending update in order.
        let rtxn = self.env.read_txn()?;
        let first_meta = self.pending_queue.first(&rtxn)?;
        drop(rtxn);

        // If there is a pending update we process and only keep
        // a reader while processing it, not a writer.
        match first_meta {
            Some(((global_id, index_uuid, _), mut pending)) => {
                let content = pending.content.take();
                let processing = pending.processing();
                // Acquire the state lock and set the current state to processing.
                // txn must *always* be acquired after state lock, or it will dead lock.
                let state = self.state.write();
                state.swap(State::Processing(index_uuid, processing.clone()));

                let result =
                    self.perform_update(content, processing, index_handle, index_uuid, global_id);

                // Back to Idle whether the update succeeded or failed.
                state.swap(State::Idle);

                result
            }
            None => Ok(None),
        }
    }
|
|
|
|
|
2021-06-09 16:19:45 +02:00
|
|
|
    /// Applies a single pending update through `index_handle`, then moves it
    /// from the pending queue to the updates database and removes its content
    /// file, committing the database changes in one write transaction.
    fn perform_update(
        &self,
        content: Option<Uuid>,
        processing: Processing,
        index_handle: impl IndexActorHandle,
        index_uuid: Uuid,
        global_id: u64,
    ) -> Result<Option<()>> {
        let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid));
        let update_id = processing.id();

        let file = match content_path {
            Some(ref path) => {
                let file = File::open(path)?;
                Some(file)
            }
            None => None,
        };

        // Process the pending update using the provided user function.
        let handle = Handle::current();
        let result =
            match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) {
                Ok(result) => result,
                // An actor-level error becomes a failed update rather than
                // being propagated: the loop must keep draining the queue.
                Err(e) => Err(processing.fail(e.into())),
            };

        // Once the pending update have been successfully processed
        // we must remove the content from the pending and processing stores and
        // write the *new* meta to the processed-meta store and commit.
        let mut wtxn = self.env.write_txn()?;
        self.pending_queue
            .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;

        // Both arms convert into an UpdateStatus (Processed or Failed).
        let result = match result {
            Ok(res) => res.into(),
            Err(res) => res.into(),
        };

        self.updates
            .put(&mut wtxn, &(index_uuid, update_id), &result)?;

        wtxn.commit()?;

        // The content file is no longer needed once the result is committed.
        if let Some(ref path) = content_path {
            remove_file(&path)?;
        }

        Ok(Some(()))
    }
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
/// List the updates for `index_uuid`.
|
2021-06-14 21:26:35 +02:00
|
|
|
pub fn list(&self, index_uuid: Uuid) -> Result<Vec<UpdateStatus>> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let mut update_list = BTreeMap::<u64, UpdateStatus>::new();
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
let txn = self.env.read_txn()?;
|
|
|
|
|
|
|
|
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
|
|
|
|
for entry in pendings {
|
|
|
|
let ((_, uuid, id), pending) = entry?;
|
2021-04-12 16:59:16 +02:00
|
|
|
if uuid == index_uuid {
|
2021-04-22 10:14:29 +02:00
|
|
|
update_list.insert(id, pending.decode()?.into());
|
2021-04-12 16:59:16 +02:00
|
|
|
}
|
2021-03-11 20:58:51 +01:00
|
|
|
}
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-06-09 17:10:10 +02:00
|
|
|
let updates = self
|
|
|
|
.updates
|
|
|
|
.remap_key_type::<ByteSlice>()
|
|
|
|
.prefix_iter(&txn, index_uuid.as_bytes())?;
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
for entry in updates {
|
|
|
|
let (_, update) = entry?;
|
|
|
|
update_list.insert(update.id(), update);
|
|
|
|
}
|
2021-03-11 20:58:51 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
// If the currently processing update is from this index, replace the corresponding pending update with this one.
|
|
|
|
match *self.state.read() {
|
|
|
|
State::Processing(uuid, ref processing) if uuid == index_uuid => {
|
|
|
|
update_list.insert(processing.id(), processing.clone().into());
|
|
|
|
}
|
|
|
|
_ => (),
|
|
|
|
}
|
2021-03-11 20:58:51 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
Ok(update_list.into_iter().map(|(_, v)| v).collect())
|
2021-02-27 10:19:05 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
    /// Returns the update associated meta or `None` if the update doesn't exist.
    pub fn meta(&self, index_uuid: Uuid, update_id: u64) -> heed::Result<Option<UpdateStatus>> {
        // Check if the update is the one currently processing
        match *self.state.read() {
            State::Processing(uuid, ref processing)
                if uuid == index_uuid && processing.id() == update_id =>
            {
                return Ok(Some(processing.clone().into()));
            }
            _ => (),
        }

        let txn = self.env.read_txn()?;
        // Else, check if it is in the updates database:
        let update = self.updates.get(&txn, &(index_uuid, update_id))?;

        if let Some(update) = update {
            return Ok(Some(update));
        }

        // If nothing was found yet, we resolve to iterate over the pending queue.
        let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();

        for entry in pendings {
            let ((_, uuid, id), pending) = entry?;
            if uuid == index_uuid && id == update_id {
                // Decode lazily so non-matching entries cost nothing.
                return Ok(Some(pending.decode()?.into()));
            }
        }

        // No update was found.
        Ok(None)
    }
|
|
|
|
|
2021-06-16 14:52:06 +02:00
|
|
|
/// Delete all updates for an index from the update store. If the currently processing update
|
|
|
|
/// is for `index_uuid`, the call will block until the update is terminated.
|
2021-06-14 21:26:35 +02:00
|
|
|
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let mut txn = self.env.write_txn()?;
|
|
|
|
// Contains all the content file paths that we need to be removed if the deletion was successful.
|
2021-05-29 00:08:17 +02:00
|
|
|
let mut uuids_to_remove = Vec::new();
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
while let Some(Ok(((_, uuid, _), pending))) = pendings.next() {
|
|
|
|
if uuid == index_uuid {
|
2021-06-29 10:25:47 +02:00
|
|
|
unsafe {
|
|
|
|
pendings.del_current()?;
|
|
|
|
}
|
2021-04-22 10:14:29 +02:00
|
|
|
let mut pending = pending.decode()?;
|
2021-05-29 00:08:17 +02:00
|
|
|
if let Some(update_uuid) = pending.content.take() {
|
|
|
|
uuids_to_remove.push(update_uuid);
|
2021-04-22 10:14:29 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
drop(pendings);
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
let mut updates = self
|
|
|
|
.updates
|
2021-06-09 17:10:10 +02:00
|
|
|
.remap_key_type::<ByteSlice>()
|
2021-04-22 10:14:29 +02:00
|
|
|
.prefix_iter_mut(&mut txn, index_uuid.as_bytes())?
|
|
|
|
.lazily_decode_data();
|
2021-02-27 10:19:05 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
while let Some(_) = updates.next() {
|
2021-06-29 10:25:47 +02:00
|
|
|
unsafe {
|
|
|
|
updates.del_current()?;
|
|
|
|
}
|
2021-02-27 10:19:05 +01:00
|
|
|
}
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
drop(updates);
|
2021-03-20 17:24:08 +01:00
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
txn.commit()?;
|
2021-04-13 17:14:02 +02:00
|
|
|
|
2021-06-16 14:52:06 +02:00
|
|
|
// If the currently processing update is from our index, we wait until it is
|
|
|
|
// finished before returning. This ensure that no write to the index occurs after we delete it.
|
|
|
|
if let State::Processing(uuid, _) = *self.state.read() {
|
|
|
|
if uuid == index_uuid {
|
|
|
|
// wait for a write lock, do nothing with it.
|
|
|
|
self.state.write();
|
|
|
|
}
|
|
|
|
}
|
2021-04-13 17:14:02 +02:00
|
|
|
|
2021-07-05 09:43:48 +02:00
|
|
|
// Finally, remove any outstanding update files. This must be done after waiting for the
|
|
|
|
// last update to ensure that the update files are not deleted before the update needs
|
|
|
|
// them.
|
|
|
|
uuids_to_remove
|
|
|
|
.iter()
|
|
|
|
.map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
|
|
|
|
.for_each(|path| {
|
|
|
|
let _ = remove_file(path);
|
|
|
|
});
|
|
|
|
|
2021-04-13 17:14:02 +02:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2021-05-11 12:18:10 +02:00
|
|
|
pub fn snapshot(
|
|
|
|
&self,
|
|
|
|
uuids: &HashSet<Uuid>,
|
|
|
|
path: impl AsRef<Path>,
|
|
|
|
handle: impl IndexActorHandle + Clone,
|
2021-06-14 21:26:35 +02:00
|
|
|
) -> Result<()> {
|
2021-04-22 10:14:29 +02:00
|
|
|
let state_lock = self.state.write();
|
|
|
|
state_lock.swap(State::Snapshoting);
|
|
|
|
|
|
|
|
let txn = self.env.write_txn()?;
|
|
|
|
|
2021-03-20 17:24:08 +01:00
|
|
|
let update_path = path.as_ref().join("updates");
|
|
|
|
create_dir_all(&update_path)?;
|
|
|
|
|
|
|
|
// acquire write lock to prevent further writes during snapshot
|
2021-04-14 17:53:12 +02:00
|
|
|
create_dir_all(&update_path)?;
|
|
|
|
let db_path = update_path.join("data.mdb");
|
2021-03-20 17:24:08 +01:00
|
|
|
|
|
|
|
// create db snapshot
|
2021-04-22 10:14:29 +02:00
|
|
|
self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;
|
2021-03-20 17:24:08 +01:00
|
|
|
|
2021-05-29 00:08:17 +02:00
|
|
|
let update_files_path = update_path.join(UPDATE_DIR);
|
2021-03-20 17:24:08 +01:00
|
|
|
create_dir_all(&update_files_path)?;
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
|
|
|
|
|
|
|
|
for entry in pendings {
|
|
|
|
let ((_, uuid, _), pending) = entry?;
|
|
|
|
if uuids.contains(&uuid) {
|
2021-05-31 16:03:39 +02:00
|
|
|
if let Enqueued {
|
|
|
|
content: Some(uuid),
|
|
|
|
..
|
|
|
|
} = pending.decode()?
|
|
|
|
{
|
2021-05-29 00:08:17 +02:00
|
|
|
let path = update_uuid_to_file_path(&self.path, uuid);
|
|
|
|
copy(path, &update_files_path)?;
|
2021-04-22 10:14:29 +02:00
|
|
|
}
|
|
|
|
}
|
2021-03-20 17:24:08 +01:00
|
|
|
}
|
|
|
|
|
2021-05-11 12:18:10 +02:00
|
|
|
let path = &path.as_ref().to_path_buf();
|
|
|
|
let handle = &handle;
|
|
|
|
// Perform the snapshot of each index concurently. Only a third of the capabilities of
|
|
|
|
// the index actor at a time not to put too much pressure on the index actor
|
|
|
|
let mut stream = futures::stream::iter(uuids.iter())
|
|
|
|
.map(move |uuid| handle.snapshot(*uuid, path.clone()))
|
|
|
|
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
|
|
|
|
|
|
|
|
Handle::current().block_on(async {
|
|
|
|
while let Some(res) = stream.next().await {
|
|
|
|
res?;
|
|
|
|
}
|
2021-06-14 21:26:35 +02:00
|
|
|
Ok(()) as Result<()>
|
2021-05-11 12:18:10 +02:00
|
|
|
})?;
|
|
|
|
|
2021-03-20 17:24:08 +01:00
|
|
|
Ok(())
|
|
|
|
}
|
2021-04-09 14:41:24 +02:00
|
|
|
|
2021-06-14 21:26:35 +02:00
|
|
|
pub fn get_info(&self) -> Result<UpdateStoreInfo> {
|
2021-04-28 16:43:49 +02:00
|
|
|
let mut size = self.env.size();
|
|
|
|
let txn = self.env.read_txn()?;
|
2021-04-22 10:14:29 +02:00
|
|
|
for entry in self.pending_queue.iter(&txn)? {
|
|
|
|
let (_, pending) = entry?;
|
2021-05-31 16:03:39 +02:00
|
|
|
if let Enqueued {
|
|
|
|
content: Some(uuid),
|
|
|
|
..
|
|
|
|
} = pending
|
|
|
|
{
|
2021-05-29 00:08:17 +02:00
|
|
|
let path = update_uuid_to_file_path(&self.path, uuid);
|
2021-04-22 10:14:29 +02:00
|
|
|
size += File::open(path)?.metadata()?.len();
|
2021-04-09 14:41:24 +02:00
|
|
|
}
|
|
|
|
}
|
2021-04-22 10:14:29 +02:00
|
|
|
let processing = match *self.state.read() {
|
|
|
|
State::Processing(uuid, _) => Some(uuid),
|
|
|
|
_ => None,
|
|
|
|
};
|
|
|
|
|
|
|
|
Ok(UpdateStoreInfo { size, processing })
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-05-29 00:08:17 +02:00
|
|
|
fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
|
2021-05-31 16:03:39 +02:00
|
|
|
root.as_ref()
|
|
|
|
.join(UPDATE_DIR)
|
|
|
|
.join(format!("update_{}", uuid))
|
2021-05-29 00:08:17 +02:00
|
|
|
}
|
|
|
|
|
2021-04-22 10:14:29 +02:00
|
|
|
#[cfg(test)]
mod test {
    use super::*;
    use crate::index_controller::{
        index_actor::{error::IndexActorError, MockIndexActorHandle},
        UpdateResult,
    };

    use futures::future::ok;

    // The global counter must advance on every registration, while each index
    // keeps an independent per-index counter.
    #[actix_rt::test]
    async fn test_next_id() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        let index1_uuid = Uuid::new_v4();
        let index2_uuid = Uuid::new_v4();

        // First registration for index1: both counters start at 0.
        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((0, 0), ids);

        // index2 gets a fresh per-index counter; the global one advanced.
        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index2_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((1, 0), ids);

        // Back to index1: its per-index counter resumes at 1.
        let mut txn = update_store.env.write_txn().unwrap();
        let ids = update_store.next_update_id(&mut txn, index1_uuid).unwrap();
        txn.commit().unwrap();
        assert_eq!((2, 1), ids);
    }

    #[actix_rt::test]
    async fn test_register_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut options = EnvOpenOptions::new();
        let handle = Arc::new(MockIndexActorHandle::new());
        options.map_size(4096 * 100);
        let update_store = UpdateStore::open(
            options,
            dir.path(),
            handle,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        let meta = UpdateMeta::ClearDocuments;
        let uuid = Uuid::new_v4();
        let store_clone = update_store.clone();
        // register_update performs blocking LMDB writes, hence spawn_blocking.
        tokio::task::spawn_blocking(move || {
            store_clone.register_update(meta, None, uuid).unwrap();
        })
        .await
        .unwrap();

        // The update must be queued under (global_id=0, uuid, update_id=0).
        let txn = update_store.env.read_txn().unwrap();
        assert!(update_store
            .pending_queue
            .get(&txn, &(0, uuid, 0))
            .unwrap()
            .is_some());
    }

    #[actix_rt::test]
    async fn test_process_update() {
        let dir = tempfile::tempdir_in(".").unwrap();
        let mut handle = MockIndexActorHandle::new();

        // The first update succeeds; the second fails at the index-actor level.
        handle
            .expect_update()
            .times(2)
            .returning(|_index_uuid, processing, _file| {
                if processing.id() == 0 {
                    Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
                } else {
                    Box::pin(ok(Err(
                        processing.fail(IndexActorError::ExistingPrimaryKey.into())
                    )))
                }
            });

        let handle = Arc::new(handle);

        let mut options = EnvOpenOptions::new();
        options.map_size(4096 * 100);
        let store = UpdateStore::open(
            options,
            dir.path(),
            handle.clone(),
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();

        // wait a bit for the event loop exit.
        tokio::time::sleep(std::time::Duration::from_millis(50)).await;

        let mut txn = store.env.write_txn().unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 0, None);
        let uuid = Uuid::new_v4();

        store
            .pending_queue
            .put(&mut txn, &(0, uuid, 0), &update)
            .unwrap();

        let update = Enqueued::new(UpdateMeta::ClearDocuments, 1, None);

        store
            .pending_queue
            .put(&mut txn, &(1, uuid, 1), &update)
            .unwrap();

        txn.commit().unwrap();

        // Process the pending, and check that it has been moved to the update databases, and
        // removed from the pending database.
        let store_clone = store.clone();
        tokio::task::spawn_blocking(move || {
            store_clone.process_pending_update(handle.clone()).unwrap();
            store_clone.process_pending_update(handle).unwrap();
        })
        .await
        .unwrap();

        let txn = store.env.read_txn().unwrap();

        assert!(store.pending_queue.first(&txn).unwrap().is_none());
        let update = store.updates.get(&txn, &(uuid, 0)).unwrap().unwrap();

        assert!(matches!(update, UpdateStatus::Processed(_)));
        let update = store.updates.get(&txn, &(uuid, 1)).unwrap().unwrap();

        assert!(matches!(update, UpdateStatus::Failed(_)));
    }
}
|