add the version in the index-scheduler

This commit is contained in:
Tamo 2025-01-23 00:25:39 +01:00
parent 27d04d7932
commit 7dba0eb5f5
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
8 changed files with 212 additions and 67 deletions

View File

@ -93,15 +93,16 @@ impl FeatureData {
NUMBER_OF_DATABASES
}
pub fn new(env: &Env, instance_features: InstanceTogglableFeatures) -> Result<Self> {
let mut wtxn = env.write_txn()?;
pub fn new(
env: &Env,
wtxn: &mut RwTxn,
instance_features: InstanceTogglableFeatures,
) -> Result<Self> {
let runtime_features_db =
env.create_database(&mut wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?;
wtxn.commit()?;
env.create_database(wtxn, Some(db_name::EXPERIMENTAL_FEATURES))?;
let txn = env.read_txn()?;
let persisted_features: RuntimeTogglableFeatures =
runtime_features_db.get(&txn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
runtime_features_db.get(wtxn, db_name::EXPERIMENTAL_FEATURES)?.unwrap_or_default();
let InstanceTogglableFeatures { metrics, logs_route, contains_filter } = instance_features;
let runtime = Arc::new(RwLock::new(RuntimeTogglableFeatures {
metrics: metrics || persisted_features.metrics,

View File

@ -21,6 +21,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
cleanup_enabled: _,
processing_tasks,
env,
version: _,
queue,
scheduler,

View File

@ -33,6 +33,7 @@ mod test_utils;
pub mod upgrade;
mod utils;
pub mod uuid_codec;
mod versioning;
pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type TaskId = u32;
@ -66,6 +67,7 @@ use queue::Queue;
use roaring::RoaringBitmap;
use scheduler::Scheduler;
use time::OffsetDateTime;
use versioning::Versioning;
use crate::index_mapper::IndexMapper;
use crate::utils::clamp_to_page_size;
@ -134,17 +136,18 @@ pub struct IndexScheduler {
/// The list of tasks currently processing
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
/// A database containing only the version of the index-scheduler
pub version: versioning::Versioning,
/// The queue containing both the tasks and the batches.
pub queue: queue::Queue,
pub scheduler: scheduler::Scheduler,
/// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper,
/// In charge of fetching and setting the status of experimental features.
features: features::FeatureData,
/// Everything related to the processing of the tasks
pub scheduler: scheduler::Scheduler,
/// Whether we should automatically cleanup the task queue or not.
pub(crate) cleanup_enabled: bool,
@ -179,6 +182,7 @@ impl IndexScheduler {
IndexScheduler {
env: self.env.clone(),
processing_tasks: self.processing_tasks.clone(),
version: self.version.clone(),
queue: self.queue.private_clone(),
scheduler: self.scheduler.private_clone(),
@ -198,13 +202,14 @@ impl IndexScheduler {
}
pub(crate) const fn nb_db() -> u32 {
Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db()
Versioning::nb_db() + Queue::nb_db() + IndexMapper::nb_db() + features::FeatureData::nb_db()
}
/// Create an index scheduler and start its run loop.
#[allow(private_interfaces)] // because test_utils is private
pub fn new(
options: IndexSchedulerOptions,
from_db_version: (u32, u32, u32),
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
) -> Result<Self> {
@ -241,9 +246,11 @@ impl IndexScheduler {
.open(&options.tasks_path)
}?;
let features = features::FeatureData::new(&env, options.instance_features)?;
// We **must** starts by upgrading the version because it'll also upgrade the required database before we can open them
let version = versioning::Versioning::new(&env, from_db_version)?;
let mut wtxn = env.write_txn()?;
let features = features::FeatureData::new(&env, &mut wtxn, options.instance_features)?;
let queue = Queue::new(&env, &mut wtxn, &options)?;
let index_mapper = IndexMapper::new(&env, &mut wtxn, &options, budget)?;
wtxn.commit()?;
@ -251,6 +258,7 @@ impl IndexScheduler {
// allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self {
processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
version,
queue,
scheduler: Scheduler::new(&options),

View File

@ -9,7 +9,7 @@ use meilisearch_types::document_formats::DocumentFormatError;
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::VERSION_FILE_NAME;
use meilisearch_types::{versioning, VERSION_FILE_NAME};
use tempfile::{NamedTempFile, TempDir};
use uuid::Uuid;
use Breakpoint::*;
@ -113,7 +113,13 @@ impl IndexScheduler {
};
configuration(&mut options);
let index_scheduler = Self::new(options, sender, planned_failures).unwrap();
let version = (
versioning::VERSION_MAJOR.parse().unwrap(),
versioning::VERSION_MINOR.parse().unwrap(),
versioning::VERSION_PATCH.parse().unwrap(),
);
let index_scheduler = Self::new(options, version, sender, planned_failures).unwrap();
// To be 100% consistent between all test we're going to start the scheduler right now
// and ensure it's in the expected starting state.
@ -406,8 +412,7 @@ impl IndexSchedulerHandle {
.recv_timeout(std::time::Duration::from_secs(1)) {
Ok((_, true)) => continue,
Ok((b, false)) => panic!("The scheduler was supposed to be down but successfully moved to the next breakpoint: {b:?}"),
Err(RecvTimeoutError::Timeout) => panic!(),
Err(RecvTimeoutError::Disconnected) => break,
Err(RecvTimeoutError::Timeout | RecvTimeoutError::Disconnected) => break,
}
}
}

View File

@ -1,25 +1,28 @@
use std::path::Path;
use anyhow::bail;
use meilisearch_types::heed;
use meilisearch_types::heed::{Env, RwTxn};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use time::OffsetDateTime;
use tracing::info;
use crate::queue::TaskQueue;
use crate::IndexSchedulerOptions;
pub fn upgrade_task_queue(
opt: &IndexSchedulerOptions,
trait UpgradeIndexScheduler {
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
-> anyhow::Result<()>;
fn target_version(&self) -> (u32, u32, u32);
}
pub fn upgrade_index_scheduler(
env: &Env,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> anyhow::Result<()> {
let current_major: u32 = VERSION_MAJOR.parse().unwrap();
let current_minor: u32 = VERSION_MINOR.parse().unwrap();
let current_patch: u32 = VERSION_PATCH.parse().unwrap();
let current_major = to.0;
let current_minor = to.1;
let current_patch = to.2;
let upgrade_functions =
[(v1_12_to_current as fn(&Path) -> anyhow::Result<()>, "Upgrading from v1.12 to v1.13")];
let upgrade_functions: &[&dyn UpgradeIndexScheduler] = &[&V1_12_ToCurrent {}];
let start = match from {
(1, 12, _) => 0,
@ -41,20 +44,23 @@ pub fn upgrade_task_queue(
}
};
let mut current_version = from;
info!("Upgrading the task queue");
for (upgrade, upgrade_name) in upgrade_functions[start..].iter() {
info!("{upgrade_name}");
(upgrade)(&opt.tasks_path)?;
for upgrade in upgrade_functions[start..].iter() {
let target = upgrade.target_version();
info!(
"Upgrading from v{}.{}.{} to v{}.{}.{}",
from.0, from.1, from.2, current_version.0, current_version.1, current_version.2
);
let mut wtxn = env.write_txn()?;
upgrade.upgrade(env, &mut wtxn, from)?;
wtxn.commit()?;
current_version = target;
}
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(TaskQueue::nb_db())
.map_size(opt.task_db_size)
.open(&opt.tasks_path)
}?;
let mut wtxn = env.write_txn()?;
let queue = TaskQueue::new(&env, &mut wtxn)?;
let queue = TaskQueue::new(env, &mut wtxn)?;
let uid = queue.next_task_id(&wtxn)?;
queue.register(
&mut wtxn,
@ -72,12 +78,28 @@ pub fn upgrade_task_queue(
},
)?;
wtxn.commit()?;
// Should be pretty much instantaneous since we're the only one reading this env
env.prepare_for_closing().wait();
Ok(())
}
/// The task queue is 100% compatible with the previous versions
fn v1_12_to_current(_path: &Path) -> anyhow::Result<()> {
Ok(())
#[allow(non_camel_case_types)]
struct V1_12_ToCurrent {}
impl UpgradeIndexScheduler for V1_12_ToCurrent {
fn upgrade(
&self,
_env: &Env,
_wtxn: &mut RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {
Ok(())
}
fn target_version(&self) -> (u32, u32, u32) {
(
VERSION_MAJOR.parse().unwrap(),
VERSION_MINOR.parse().unwrap(),
VERSION_PATCH.parse().unwrap(),
)
}
}

View File

@ -0,0 +1,61 @@
use crate::{upgrade::upgrade_index_scheduler, Result};
use meilisearch_types::{
heed::{types::Str, Database, Env, RoTxn, RwTxn},
milli::heed_codec::version::VersionCodec,
versioning,
};
/// The number of database used by queue itself
const NUMBER_OF_DATABASES: u32 = 1;
/// Database const names for the `IndexScheduler`.
mod db_name {
pub const VERSION: &str = "version";
}
mod entry_name {
pub const MAIN: &str = "main";
}
#[derive(Clone)]
pub struct Versioning {
pub version: Database<Str, VersionCodec>,
}
impl Versioning {
pub(crate) const fn nb_db() -> u32 {
NUMBER_OF_DATABASES
}
pub fn get_version(&self, rtxn: &RoTxn) -> Result<Option<(u32, u32, u32)>> {
Ok(self.version.get(rtxn, entry_name::MAIN)?)
}
pub fn set_version(&self, wtxn: &mut RwTxn, version: (u32, u32, u32)) -> Result<()> {
Ok(self.version.put(wtxn, entry_name::MAIN, &version)?)
}
pub fn set_current_version(&self, wtxn: &mut RwTxn) -> Result<()> {
let major = versioning::VERSION_MAJOR.parse().unwrap();
let minor = versioning::VERSION_MINOR.parse().unwrap();
let patch = versioning::VERSION_PATCH.parse().unwrap();
self.set_version(wtxn, (major, minor, patch))
}
/// Create an index scheduler and start its run loop.
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let version = env.create_database(&mut wtxn, Some(db_name::VERSION))?;
let this = Self { version };
let from = this.get_version(&wtxn)?.unwrap_or(db_version);
wtxn.commit()?;
let bin_major: u32 = versioning::VERSION_MAJOR.parse().unwrap();
let bin_minor: u32 = versioning::VERSION_MINOR.parse().unwrap();
let bin_patch: u32 = versioning::VERSION_PATCH.parse().unwrap();
let to = (bin_major, bin_minor, bin_patch);
if from != to {
upgrade_index_scheduler(env, from, to)?;
}
Ok(this)
}
}

View File

@ -9,6 +9,36 @@ pub static VERSION_MAJOR: &str = env!("CARGO_PKG_VERSION_MAJOR");
pub static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
pub static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn update_version_file_for_dumpless_upgrade(
db_path: &Path,
from: (u32, u32, u32),
to: (u32, u32, u32),
) -> Result<(), VersionFileError> {
let (from_major, from_minor, from_patch) = from;
let (to_major, to_minor, to_patch) = to;
if from_major > to_major
|| (from_major == to_major && from_minor > to_minor)
|| (from_major == to_major && from_minor == to_minor && from_patch > to_patch)
{
Err(VersionFileError::DowngradeNotSupported {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else if from_major < 1 || (from_major == to_major && from_minor < 12) {
Err(VersionFileError::TooOldForAutomaticUpgrade {
major: from_major,
minor: from_minor,
patch: from_patch,
})
} else {
create_current_version_file(db_path)?;
Ok(())
}
}
/// Persists the version of the current Meilisearch binary to a VERSION file
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
@ -78,6 +108,10 @@ pub enum VersionFileError {
env!("CARGO_PKG_VERSION").to_string()
)]
VersionMismatch { major: u32, minor: u32, patch: u32 },
#[error("Database version {major}.{minor}.{patch} is higher than the Meilisearch version {VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}. Downgrade is not supported")]
DowngradeNotSupported { major: u32, minor: u32, patch: u32 },
#[error("Database version {major}.{minor}.{patch} is too old for the experimental dumpless upgrade feature. Please generate a dump using the v{major}.{minor}.{patch} and import it in the v{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_PATCH}")]
TooOldForAutomaticUpgrade { major: u32, minor: u32, patch: u32 },
#[error(transparent)]
IoError(#[from] std::io::Error),

View File

@ -32,7 +32,6 @@ use analytics::Analytics;
use anyhow::bail;
use error::PayloadError;
use extractors::payload::PayloadConfig;
use index_scheduler::upgrade::upgrade_task_queue;
use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::AuthController;
use meilisearch_types::milli::constants::VERSION_MAJOR;
@ -41,7 +40,8 @@ use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMetho
use meilisearch_types::settings::apply_settings_to_builder;
use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::versioning::{
create_current_version_file, get_version, VersionFileError, VERSION_MINOR, VERSION_PATCH,
create_current_version_file, get_version, update_version_file_for_dumpless_upgrade,
VersionFileError, VERSION_MINOR, VERSION_PATCH,
};
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
pub use option::Opt;
@ -234,6 +234,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
instance_features: opt.to_instance_features(),
auto_upgrade: opt.experimental_dumpless_upgrade,
};
let bin_major: u32 = VERSION_MAJOR.parse().unwrap();
let bin_minor: u32 = VERSION_MINOR.parse().unwrap();
let bin_patch: u32 = VERSION_PATCH.parse().unwrap();
let binary_version = (bin_major, bin_minor, bin_patch);
let empty_db = is_empty_db(&opt.db_path);
let (index_scheduler, auth_controller) = if let Some(ref snapshot_path) = opt.import_snapshot {
@ -245,6 +249,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
opt,
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
)?,
Err(e) => {
std::fs::remove_dir_all(&opt.db_path)?;
@ -262,14 +267,18 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
bail!("snapshot doesn't exist at {}", snapshot_path.display())
// the snapshot and the db exist, and we can ignore the snapshot because of the ignore_snapshot_if_db_exists flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
}
} else if let Some(ref path) = opt.import_dump {
let src_path_exists = path.exists();
// the db is empty and the dump exists, import it
if empty_db && src_path_exists {
let (mut index_scheduler, mut auth_controller) =
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::RemoveDb)?;
let (mut index_scheduler, mut auth_controller) = open_or_create_database_unchecked(
opt,
index_scheduler_opt,
OnFailure::RemoveDb,
binary_version, // the db is empty
)?;
match import_dump(&opt.db_path, path, &mut index_scheduler, &mut auth_controller) {
Ok(()) => (index_scheduler, auth_controller),
Err(e) => {
@ -289,10 +298,10 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
// the dump and the db exist and we can ignore the dump because of the ignore_dump_if_db_exists flag
// or, the dump is missing but we can ignore that because of the ignore_missing_dump flag
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
}
} else {
open_or_create_database(opt, index_scheduler_opt, empty_db)?
open_or_create_database(opt, index_scheduler_opt, empty_db, binary_version)?
};
// We create a loop in a thread that registers snapshotCreation tasks
@ -322,12 +331,13 @@ fn open_or_create_database_unchecked(
opt: &Opt,
index_scheduler_opt: IndexSchedulerOptions,
on_failure: OnFailure,
version: (u32, u32, u32),
) -> anyhow::Result<(IndexScheduler, AuthController)> {
// we don't want to create anything in the data.ms yet, thus we
// wrap our two builders in a closure that'll be executed later.
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
let index_scheduler_builder =
|| -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt)?) };
|| -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt, version)?) };
match (
index_scheduler_builder(),
@ -345,25 +355,29 @@ fn open_or_create_database_unchecked(
}
/// Ensures Meilisearch version is compatible with the database, returns an error in case of version mismatch.
fn check_version_and_update_task_queue(
opt: &Opt,
index_scheduler_opt: &IndexSchedulerOptions,
) -> anyhow::Result<()> {
let (major, minor, patch) = get_version(&opt.db_path)?;
/// Returns the version that was contained in the version file
fn check_version(opt: &Opt, binary_version: (u32, u32, u32)) -> anyhow::Result<(u32, u32, u32)> {
let (bin_major, bin_minor, bin_patch) = binary_version;
let (db_major, db_minor, db_patch) = get_version(&opt.db_path)?;
let version_major: u32 = VERSION_MAJOR.parse().unwrap();
let version_minor: u32 = VERSION_MINOR.parse().unwrap();
let version_patch: u32 = VERSION_PATCH.parse().unwrap();
if major != version_major || minor != version_minor || patch > version_patch {
if db_major != bin_major || db_minor != bin_minor || db_patch > bin_patch {
if opt.experimental_dumpless_upgrade {
return upgrade_task_queue(index_scheduler_opt, (major, minor, patch));
update_version_file_for_dumpless_upgrade(
&opt.db_path,
(db_major, db_minor, db_patch),
(bin_major, bin_minor, bin_patch),
)?;
} else {
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
return Err(VersionFileError::VersionMismatch {
major: db_major,
minor: db_minor,
patch: db_patch,
}
.into());
}
}
Ok(())
Ok((db_major, db_minor, db_patch))
}
/// Ensure you're in a valid state and open the IndexScheduler + AuthController for you.
@ -371,12 +385,11 @@ fn open_or_create_database(
opt: &Opt,
index_scheduler_opt: IndexSchedulerOptions,
empty_db: bool,
binary_version: (u32, u32, u32),
) -> anyhow::Result<(IndexScheduler, AuthController)> {
if !empty_db {
check_version_and_update_task_queue(opt, &index_scheduler_opt)?;
}
let version = if !empty_db { check_version(opt, binary_version)? } else { binary_version };
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb)
open_or_create_database_unchecked(opt, index_scheduler_opt, OnFailure::KeepDb, version)
}
fn import_dump(