add the version to the index-scheduler snapshots + fix a bug when opening an index scheduler for the first time

This commit is contained in:
Tamo 2025-01-23 00:39:28 +01:00
parent 7dba0eb5f5
commit a475ea7619
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
4 changed files with 23 additions and 2 deletions

View File

@ -6,6 +6,7 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn}; use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Details, Kind, Status, Task}; use meilisearch_types::tasks::{Details, Kind, Status, Task};
use meilisearch_types::versioning;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use crate::index_mapper::IndexMapper; use crate::index_mapper::IndexMapper;
@ -21,7 +22,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
cleanup_enabled: _, cleanup_enabled: _,
processing_tasks, processing_tasks,
env, env,
version: _, version,
queue, queue,
scheduler, scheduler,
@ -39,6 +40,16 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let mut snap = String::new(); let mut snap = String::new();
let indx_sched_version = version.get_version(&rtxn).unwrap();
let latest_version = (
versioning::VERSION_MAJOR.parse().unwrap(),
versioning::VERSION_MINOR.parse().unwrap(),
versioning::VERSION_PATCH.parse().unwrap(),
);
if indx_sched_version != Some(latest_version) {
snap.push_str(&format!("index scheduler running on version {indx_sched_version:?}\n"));
}
let processing = processing_tasks.read().unwrap().clone(); let processing = processing_tasks.read().unwrap().clone();
snap.push_str(&format!("### Autobatching Enabled = {}\n", scheduler.autobatching_enabled)); snap.push_str(&format!("### Autobatching Enabled = {}\n", scheduler.autobatching_enabled));
snap.push_str(&format!( snap.push_str(&format!(

View File

@ -6,6 +6,7 @@ use time::OffsetDateTime;
use tracing::info; use tracing::info;
use crate::queue::TaskQueue; use crate::queue::TaskQueue;
use crate::versioning::Versioning;
trait UpgradeIndexScheduler { trait UpgradeIndexScheduler {
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32)) fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
@ -15,6 +16,7 @@ trait UpgradeIndexScheduler {
pub fn upgrade_index_scheduler( pub fn upgrade_index_scheduler(
env: &Env, env: &Env,
versioning: &Versioning,
from: (u32, u32, u32), from: (u32, u32, u32),
to: (u32, u32, u32), to: (u32, u32, u32),
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
@ -55,6 +57,7 @@ pub fn upgrade_index_scheduler(
); );
let mut wtxn = env.write_txn()?; let mut wtxn = env.write_txn()?;
upgrade.upgrade(env, &mut wtxn, from)?; upgrade.upgrade(env, &mut wtxn, from)?;
versioning.set_version(&mut wtxn, target)?;
wtxn.commit()?; wtxn.commit()?;
current_version = target; current_version = target;
} }

View File

@ -54,8 +54,15 @@ impl Versioning {
let to = (bin_major, bin_minor, bin_patch); let to = (bin_major, bin_minor, bin_patch);
if from != to { if from != to {
upgrade_index_scheduler(env, from, to)?; upgrade_index_scheduler(env, &this, from, to)?;
} }
// Once we reach this point it means the upgrade process, if there was one is entirely finished
// we can safely say we reached the latest version of the index scheduler
let mut wtxn = env.write_txn()?;
this.set_current_version(&mut wtxn)?;
wtxn.commit()?;
Ok(this) Ok(this)
} }
} }

Binary file not shown.