From a03eef65118af2c5667e0b1338ee3e576194c95a Mon Sep 17 00:00:00 2001 From: Louis Dureuil Date: Tue, 4 Mar 2025 17:46:04 +0100 Subject: [PATCH] Support rollback --- crates/milli/src/index.rs | 112 +++++++++++++++++++++++++++++++++++--- 1 file changed, 104 insertions(+), 8 deletions(-) diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index e91d5e253..8e1cc2a0f 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -3,8 +3,8 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fs::File; use std::path::Path; -use heed::{types::*, DatabaseStat, WithoutTls}; -use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified}; +use heed::types::*; +use heed::{CompactionOption, Database, DatabaseStat, RoTxn, RwTxn, Unspecified, WithoutTls}; use indexmap::IndexMap; use roaring::RoaringBitmap; use rstar::RTree; @@ -107,6 +107,7 @@ pub mod db_name { pub const VECTOR_ARROY: &str = "vector-arroy"; pub const DOCUMENTS: &str = "documents"; } +const NUMBER_OF_DBS: u32 = 25; #[derive(Clone)] pub struct Index { @@ -186,7 +187,7 @@ impl Index { ) -> Result { use db_name::*; - options.max_dbs(25); + options.max_dbs(NUMBER_OF_DBS); let env = unsafe { options.open(path) }?; let mut wtxn = env.write_txn()?; @@ -261,11 +262,7 @@ impl Index { if this.get_version(&wtxn)?.is_none() && creation { this.put_version( &mut wtxn, - ( - constants::VERSION_MAJOR, - constants::VERSION_MINOR, - constants::VERSION_PATCH, - ), + (constants::VERSION_MAJOR, constants::VERSION_MINOR, constants::VERSION_PATCH), )?; } wtxn.commit()?; @@ -284,6 +281,76 @@ impl Index { Self::new_with_creation_dates(options, path, now, now, creation) } + /// Attempts to rollback the index at `path` to the version specified by `requested_version`. 
+ pub fn rollback<P: AsRef<Path>>( + mut options: heed::EnvOpenOptions<WithoutTls>, + path: P, + requested_version: (u32, u32, u32), + ) -> Result<RollbackOutcome> { + options.max_dbs(NUMBER_OF_DBS); + + // optimistically check if the index is already at the requested version. + let env = unsafe { options.open(path.as_ref()) }?; + let rtxn = env.read_txn()?; + let Some(main) = env.database_options().name(db_name::MAIN).open(&rtxn)? else { + return Err(crate::Error::InternalError(crate::InternalError::DatabaseMissingEntry { + db_name: db_name::MAIN, + key: None, + })); + }; + let rollback_version = + main.remap_types::<Str, VersionCodec>().get(&rtxn, main_key::VERSION_KEY)?; + if rollback_version == Some(requested_version) { + return Ok(RollbackOutcome::NoRollback); + } + + // explicitly drop the environment before reopening it. + drop(rtxn); + drop(env); + + // really need to rollback then... + unsafe { options.flags(heed::EnvFlags::PREV_SNAPSHOT) }; + let env = unsafe { options.open(path) }?; + let mut wtxn = env.write_txn()?; + let Some(main) = env.database_options().name(db_name::MAIN).open(&wtxn)? else { + return Err(crate::Error::InternalError(crate::InternalError::DatabaseMissingEntry { + db_name: db_name::MAIN, + key: None, + })); + }; + + let main = main.remap_key_type::<Str>(); + + let Some(rollback_version) = + main.remap_data_type::<VersionCodec>().get(&wtxn, main_key::VERSION_KEY)? + else { + return Ok(RollbackOutcome::VersionMismatch { + requested_version, + rollback_version: None, + }); + }; + + if requested_version != rollback_version { + return Ok(RollbackOutcome::VersionMismatch { + requested_version, + rollback_version: Some(rollback_version), + }); + } + + // this is a bit of a trick to force a change in the index + // which is necessary to actually discard the next snapshot, replacing it with this transaction.
+ let now = time::OffsetDateTime::now_utc(); + main.remap_data_type::<SerdeJson<OffsetDateTime>>().put( + &mut wtxn, + main_key::UPDATED_AT_KEY, + &OffsetDateTime(now), + )?; + + wtxn.commit()?; + + Ok(RollbackOutcome::Rollback) + } + fn set_creation_dates( env: &heed::Env<WithoutTls>, main: Database<Unspecified, Unspecified>, @@ -1864,6 +1931,35 @@ pub enum PrefixSearch { Disabled, } +#[derive(Debug)] +pub enum RollbackOutcome { + VersionMismatch { + requested_version: (u32, u32, u32), + rollback_version: Option<(u32, u32, u32)>, + }, + Rollback, + NoRollback, +} + +impl RollbackOutcome { + pub fn succeeded(&self) -> bool { + matches!(self, RollbackOutcome::Rollback | RollbackOutcome::NoRollback) + } +} + +impl std::fmt::Display for RollbackOutcome { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + RollbackOutcome::VersionMismatch { requested_version, rollback_version: Some(rollback_version) } => write!(f, "cannot rollback to the requested version\n - note: requested version is v{}.{}.{}\n - note: only possible to rollback to v{}.{}.{}", + requested_version.0, requested_version.1, requested_version.2, rollback_version.0, rollback_version.1, rollback_version.2), + RollbackOutcome::VersionMismatch { requested_version, rollback_version: None } => write!(f, "cannot rollback to the requested version\n - note: requested version is v{}.{}.{}\n - note: only possible to rollback to an unknown version", + requested_version.0, requested_version.1, requested_version.2), + RollbackOutcome::Rollback => f.write_str("rollback complete"), + RollbackOutcome::NoRollback => f.write_str("no rollback necessary"), + } + } +} + #[derive(Serialize, Deserialize)] #[serde(transparent)] struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime);