fix the dumpless upgrade potential corruption when upgrading from the v1.12

This commit is contained in:
Tamo 2025-02-05 15:25:05 +01:00
parent 78867b6852
commit 628119e31e
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 89 additions and 47 deletions

View File

@ -33,7 +33,7 @@ mod test_utils;
pub mod upgrade; pub mod upgrade;
mod utils; mod utils;
pub mod uuid_codec; pub mod uuid_codec;
mod versioning; pub mod versioning;
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type TaskId = u32; pub type TaskId = u32;

View File

@ -1,6 +1,6 @@
use crate::{upgrade::upgrade_index_scheduler, Result}; use crate::{upgrade::upgrade_index_scheduler, Result};
use meilisearch_types::{ use meilisearch_types::{
heed::{types::Str, Database, Env, RoTxn, RwTxn}, heed::{self, types::Str, Database, Env, RoTxn, RwTxn},
milli::heed_codec::version::VersionCodec, milli::heed_codec::version::VersionCodec,
versioning, versioning,
}; };
@ -21,30 +21,38 @@ pub struct Versioning {
} }
impl Versioning { impl Versioning {
pub(crate) const fn nb_db() -> u32 { pub const fn nb_db() -> u32 {
NUMBER_OF_DATABASES NUMBER_OF_DATABASES
} }
pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>> { pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>, heed::Error> {
Ok(self.version.get(rtxn, entry_name::MAIN)?) self.version.get(rtxn, entry_name::MAIN)
} }
pub fn set_version(&self, wtxn: &mut RwTxn, version: (u32, u32, u32)) -> Result<()> { pub fn set_version(
Ok(self.version.put(wtxn, entry_name::MAIN, &version)?) &self,
wtxn: &mut RwTxn,
version: (u32, u32, u32),
) -> Result<(), heed::Error> {
self.version.put(wtxn, entry_name::MAIN, &version)
} }
pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<()> { pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<(), heed::Error> {
let major = versioning::VERSION_MAJOR.parse().unwrap(); let major = versioning::VERSION_MAJOR.parse().unwrap();
let minor = versioning::VERSION_MINOR.parse().unwrap(); let minor = versioning::VERSION_MINOR.parse().unwrap();
let patch = versioning::VERSION_PATCH.parse().unwrap(); let patch = versioning::VERSION_PATCH.parse().unwrap();
self.set_version(wtxn, (major, minor, patch)) self.set_version(wtxn, (major, minor, patch))
} }
/// Create an index scheduler and start its run loop. /// Return `Self` without checking anything about the version
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
Ok(Self { version })
}
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> { pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;
let version = env.create_database(&mut wtxn, Some(db_name::VERSION))?; let this = Self::raw_new(env, &mut wtxn)?;
let this = Self { version };
let from = match this.get_version(&wtxn)? { let from = match this.get_version(&wtxn)? {
Some(version) => version, Some(version) => version,
// fresh DB: use the db version // fresh DB: use the db version

View File

@ -2,6 +2,8 @@ use std::fs;
use std::io::{self, ErrorKind}; use std::io::{self, ErrorKind};
use std::path::Path; use std::path::Path;
use milli::heed;
/// The name of the file that contains the version of the database. /// The name of the file that contains the version of the database.
pub const VERSION_FILE_NAME: &str = "VERSION"; pub const VERSION_FILE_NAME: &str = "VERSION";
@ -9,36 +11,6 @@ pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR"); pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH"); pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn update_version_file_for_dumpless_upgrade(
db_path: &Path,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> Result<(), VersionFileError> {
let (from_major, from_minor, from_patch) = from;
let (to_major, to_minor, to_patch) = to;
if from_major > to_major
|| (from_major == to_major && from_minor > to_minor)
|| (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
{
Err(VersionFileError::DowngradeNotSupported {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else if from_major < 1 || (from_major == to_major && from_minor < 12) {
Err(VersionFileError::TooOldForAutomaticUpgrade {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else {
create_current_version_file(db_path)?;
Ok(())
}
}
/// Persists the version of the current Meilisearch binary to a VERSION file /// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> { pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH) create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
@ -112,6 +84,8 @@ pub enum VersionFileError {
DowngradeNotSupported { major: u32, minor: u32, patch: u32 }, DowngradeNotSupported { major: u32, minor: u32, patch: u32 },
#[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")] #[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")]
TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 }, TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 },
#[error("Error while modifying the database: {0}")]
ErrorWhileModifyingTheDatabase(#[from] heed::Error),
#[error(transparent)] #[error(transparent)]
IoError(#[from] std::io::Error), IoError(#[from] std::io::Error),

View File

@ -32,6 +32,7 @@ use analytics::Analytics;
use anyhow::bail; use anyhow::bail;
use error::PayloadError; use error::PayloadError;
use extractors::payload::PayloadConfig; use extractors::payload::PayloadConfig;
use index_scheduler::versioning::Versioning;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::AuthController; use meilisearch_auth::AuthController;
use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::constants::VERSION_MAJOR;
@ -40,10 +41,9 @@ use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMetho
use meilisearch_types::settings::apply_settings_to_builder; use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::KindWithContent; use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::versioning::{ use meilisearch_types::versioning::{
create_current_version_file, get_version, update_version_file_for_dumpless_upgrade, create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
VersionFileError, VERSION_MINOR, VERSION_PATCH,
}; };
use meilisearch_types::{compression, milli, VERSION_FILE_NAME}; use meilisearch_types::{compression, heed, milli, VERSION_FILE_NAME};
pub use option::Opt; pub use option::Opt;
use option::ScheduleSnapshot; use option::ScheduleSnapshot;
use search_queue::SearchQueue; use search_queue::SearchQueue;
@ -356,14 +356,19 @@ fn open_or_create_database_unchecked(
/// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch. /// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch.
/// Returns the version that was contained in the version file /// Returns the version that was contained in the version file
fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(u32, u32, u32)> { fn check_version(
opt: &Opt,
index_scheduler_opt: &IndexSchedulerOptions,
binary_version: (u32, u32, u32),
) -> anyhow::Result<(u32, u32, u32)> {
let (bin_major, bin_minor, bin_patch) = binary_version; let (bin_major, bin_minor, bin_patch) = binary_version;
let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?; let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?;
if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch { if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch {
if opt.experimental_dumpless_upgrade { if opt.experimental_dumpless_upgrade {
update_version_file_for_dumpless_upgrade( update_version_file_for_dumpless_upgrade(
&opt.db_path, opt,
index_scheduler_opt,
(db_major, db_minor, db_patch), (db_major, db_minor, db_patch),
(bin_major, bin_minor, bin_patch), (bin_major, bin_minor, bin_patch),
)?; )?;
@ -380,6 +385,57 @@ fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(
Ok((db_major, db_minor, db_patch)) Ok((db_major, db_minor, db_patch))
} }
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn update_version_file_for_dumpless_upgrade(
opt: &Opt,
index_scheduler_opt: &IndexSchedulerOptions,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> Result<(), VersionFileError> {
let (from_major, from_minor, from_patch) = from;
let (to_major, to_minor, to_patch) = to;
// Early exit in case of error
if from_major > to_major
|| (from_major == to_major && from_minor > to_minor)
|| (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
{
return Err(VersionFileError::DowngradeNotSupported {
major: from_major,
minor: from_minor,
patch: from_patch,
});
} else if from_major < 1 || (from_major == to_major && from_minor < 12) {
return Err(VersionFileError::TooOldForAutomaticUpgrade {
major: from_major,
minor: from_minor,
patch: from_patch,
});
}
// In the case of v1.12, the index-scheduler didn't store its internal version at the time.
// => We must write it immediately **in the index-scheduler** otherwise we'll update the version file
// there is a risk of DB corruption if a restart happens after writing the version file but before
// writing the version in the index-scheduler. See <https://github.com/meilisearch/meilisearch/issues/5280>
if from_major == 1 && from_minor == 12 {
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(Versioning::nb_db())
.map_size(index_scheduler_opt.task_db_size)
.open(&index_scheduler_opt.tasks_path)
}?;
let mut wtxn = env.write_txn()?;
let versioning = Versioning::raw_new(&env, &mut wtxn)?;
versioning.set_version(&mut wtxn, (from_major, from_minor, from_patch))?;
wtxn.commit()?;
// Should be instant since we're the only one using the env
env.prepare_for_closing().wait();
}
create_current_version_file(&opt.db_path)?;
Ok(())
}
/// Ensure you're in a valid state and open the IndexScheduler + AuthController for you. /// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
fn open_or_create_database( fn open_or_create_database(
opt: &Opt, opt: &Opt,
@ -387,7 +443,11 @@ fn open_or_create_database(
empty_db: bool, empty_db: bool,
binary_version: (u32, u32, u32), binary_version: (u32, u32, u32),
) -> anyhow::Result<(IndexScheduler, AuthController)> { ) -> anyhow::Result<(IndexScheduler, AuthController)> {
let version = if !empty_db { check_version(opt, binary_version)? } else { binary_version }; let version = if !empty_db {
check_version(opt, &index_scheduler_opt, binary_version)?
} else {
binary_version
};
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version) open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
} }