Support rollback

This commit is contained in:
Louis Dureuil 2025-03-04 17:46:04 +01:00
parent 42fae9994d
commit a03eef6511
No known key found for this signature in database

View File

@ -3,8 +3,8 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::path::Path;
use heed::{types::*, DatabaseStat, WithoutTls};
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use heed::types::*;
use heed::{CompactionOption, Database, DatabaseStat, RoTxn, RwTxn, Unspecified, WithoutTls};
use indexmap::IndexMap;
use roaring::RoaringBitmap;
use rstar::RTree;
@ -107,6 +107,7 @@ pub mod db_name {
pub const VECTOR_ARROY: &str = "vector-arroy";
pub const DOCUMENTS: &str = "documents";
}
/// Value handed to `EnvOpenOptions::max_dbs` whenever this module opens the
/// environment, so that every code path opening the index uses the same limit.
// NOTE(review): presumably this must be at least the number of named databases
// listed in `db_name` — confirm when adding a new database.
const NUMBER_OF_DBS: u32 = 25;
#[derive(Clone)]
pub struct Index {
@ -186,7 +187,7 @@ impl Index {
) -> Result<Index> {
use db_name::*;
options.max_dbs(25);
options.max_dbs(NUMBER_OF_DBS);
let env = unsafe { options.open(path) }?;
let mut wtxn = env.write_txn()?;
@ -261,11 +262,7 @@ impl Index {
if this.get_version(&wtxn)?.is_none() && creation {
this.put_version(
&mut wtxn,
(
constants::VERSION_MAJOR,
constants::VERSION_MINOR,
constants::VERSION_PATCH,
),
(constants::VERSION_MAJOR, constants::VERSION_MINOR, constants::VERSION_PATCH),
)?;
}
wtxn.commit()?;
@ -284,6 +281,76 @@ impl Index {
Self::new_with_creation_dates(options, path, now, now, creation)
}
/// Attempts to rollback the index at `path` to the version specified by `requested_version`.
pub fn rollback<P: AsRef<Path>>(
mut options: heed::EnvOpenOptions<WithoutTls>,
path: P,
requested_version: (u32, u32, u32),
) -> Result<RollbackOutcome> {
options.max_dbs(NUMBER_OF_DBS);
// optimistically check if the index is already at the requested version.
let env = unsafe { options.open(path.as_ref()) }?;
let rtxn = env.read_txn()?;
let Some(main) = env.database_options().name(db_name::MAIN).open(&rtxn)? else {
return Err(crate::Error::InternalError(crate::InternalError::DatabaseMissingEntry {
db_name: db_name::MAIN,
key: None,
}));
};
let rollback_version =
main.remap_types::<Str, VersionCodec>().get(&rtxn, main_key::VERSION_KEY)?;
if rollback_version == Some(requested_version) {
return Ok(RollbackOutcome::NoRollback);
}
// explicitly drop the environment before reopening it.
drop(rtxn);
drop(env);
// really need to rollback then...
unsafe { options.flags(heed::EnvFlags::PREV_SNAPSHOT) };
let env = unsafe { options.open(path) }?;
let mut wtxn = env.write_txn()?;
let Some(main) = env.database_options().name(db_name::MAIN).open(&wtxn)? else {
return Err(crate::Error::InternalError(crate::InternalError::DatabaseMissingEntry {
db_name: db_name::MAIN,
key: None,
}));
};
let main = main.remap_key_type::<Str>();
let Some(rollback_version) =
main.remap_data_type::<VersionCodec>().get(&wtxn, main_key::VERSION_KEY)?
else {
return Ok(RollbackOutcome::VersionMismatch {
requested_version,
rollback_version: None,
});
};
if requested_version != rollback_version {
return Ok(RollbackOutcome::VersionMismatch {
requested_version,
rollback_version: Some(rollback_version),
});
}
// this is a bit of a trick to force a change in the index
// which is necessary to actually discard the next snapshot, replacing it with this transaction.
let now = time::OffsetDateTime::now_utc();
main.remap_data_type::<SerdeJson<OffsetDateTime>>().put(
&mut wtxn,
main_key::UPDATED_AT_KEY,
&OffsetDateTime(now),
)?;
wtxn.commit()?;
Ok(RollbackOutcome::Rollback)
}
fn set_creation_dates(
env: &heed::Env<WithoutTls>,
main: Database<Unspecified, Unspecified>,
@ -1864,6 +1931,35 @@ pub enum PrefixSearch {
Disabled,
}
/// Outcome of an attempt to roll an index back to a requested version.
///
/// Versions are `(major, minor, patch)` triples.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RollbackOutcome {
    /// The rollback was not performed because the version reachable through the
    /// previous snapshot is not the requested one.
    VersionMismatch {
        /// Version the caller asked to roll back to.
        requested_version: (u32, u32, u32),
        /// Version found in the previous snapshot, or `None` when no version
        /// entry could be read from it.
        rollback_version: Option<(u32, u32, u32)>,
    },
    /// The index was rolled back to the requested version.
    Rollback,
    /// The index already was at the requested version; nothing was changed.
    NoRollback,
}
impl RollbackOutcome {
    /// Tells whether the index ended up at the requested version.
    ///
    /// Returns `true` for [`RollbackOutcome::Rollback`] and
    /// [`RollbackOutcome::NoRollback`], `false` for
    /// [`RollbackOutcome::VersionMismatch`].
    pub fn succeeded(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether it is a success.
        match self {
            RollbackOutcome::Rollback | RollbackOutcome::NoRollback => true,
            RollbackOutcome::VersionMismatch { .. } => false,
        }
    }
}
/// Human-readable report of a rollback attempt.
///
/// Successful outcomes produce a one-line message; a version mismatch produces
/// a multi-line message with the requested version and, when known, the only
/// version a rollback could reach.
impl std::fmt::Display for RollbackOutcome {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RollbackOutcome::Rollback => f.write_str("rollback complete"),
            RollbackOutcome::NoRollback => f.write_str("no rollback necessary"),
            RollbackOutcome::VersionMismatch { requested_version, rollback_version } => {
                let (req_major, req_minor, req_patch) = requested_version;
                match rollback_version {
                    // The snapshot holds a known version: report both versions.
                    Some((major, minor, patch)) => write!(
                        f,
                        "cannot rollback to the requested version\n - note: requested version is v{}.{}.{}\n - note: only possible to rollback to v{}.{}.{}",
                        req_major, req_minor, req_patch, major, minor, patch
                    ),
                    // No version could be read from the snapshot.
                    None => write!(
                        f,
                        "cannot rollback to the requested version\n - note: requested version is v{}.{}.{}\n - note: only possible to rollback to an unknown version",
                        req_major, req_minor, req_patch
                    ),
                }
            }
        }
    }
}
/// Private newtype around `time::OffsetDateTime` used when a date has to go
/// through serde.
///
/// The inner value is (de)serialized with `time::serde::rfc3339`, and
/// `#[serde(transparent)]` makes the wrapper serialize exactly as its single field.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] time::OffsetDateTime);