From c27c9234394e7662771c559d43bc196c33b91a1e Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 23 Jan 2025 10:50:16 +0100 Subject: [PATCH] introduce a trait to upgrade the indexes --- .../src/scheduler/process_batch.rs | 5 +- .../src/scheduler/process_upgrade/mod.rs | 16 +++-- crates/milli/src/update/upgrade/mod.rs | 61 +++++++++++------ crates/milli/src/update/upgrade/v1_12.rs | 68 +++++++++++++------ 4 files changed, 104 insertions(+), 46 deletions(-) diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs index d54c9a171..7eda1d56f 100644 --- a/crates/index-scheduler/src/scheduler/process_batch.rs +++ b/crates/index-scheduler/src/scheduler/process_batch.rs @@ -316,7 +316,10 @@ impl IndexScheduler { Ok(vec![task]) } Batch::UpgradeDatabase { mut tasks } => { - let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(progress))); + let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { + unreachable!(); + }; + let ret = catch_unwind(AssertUnwindSafe(|| self.process_upgrade(from, progress))); match ret { Ok(Ok(())) => (), Ok(Err(e)) => return Err(Error::DatabaseUpgrade(Box::new(e))), diff --git a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs index 1195f14ba..4feebabc4 100644 --- a/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs +++ b/crates/index-scheduler/src/scheduler/process_upgrade/mod.rs @@ -4,7 +4,11 @@ use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use crate::{Error, IndexScheduler, Result}; impl IndexScheduler { - pub(super) fn process_upgrade(&self, progress: Progress) -> Result<()> { + pub(super) fn process_upgrade( + &self, + db_version: (u32, u32, u32), + progress: Progress, + ) -> Result<()> { #[cfg(test)] self.maybe_fail(crate::test_utils::FailureLocation::ProcessUpgrade)?; @@ -19,9 +23,13 @@ impl IndexScheduler { )); let index = self.index(uid)?; let mut index_wtxn = index.write_txn()?; - let regen_stats = - milli::update::upgrade::upgrade(&mut index_wtxn, &index, progress.clone()) - .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + let regen_stats = milli::update::upgrade::upgrade( + &mut index_wtxn, + &index, + db_version, + progress.clone(), + ) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; if regen_stats { let stats = crate::index_mapper::IndexStats::new(&index, &index_wtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; diff --git a/crates/milli/src/update/upgrade/mod.rs b/crates/milli/src/update/upgrade/mod.rs index 4be55e942..a62740e25 100644 --- a/crates/milli/src/update/upgrade/mod.rs +++ b/crates/milli/src/update/upgrade/mod.rs @@ -1,32 +1,39 @@ mod v1_12; use heed::RwTxn; -use v1_12::{v1_12_3_to_v1_13, v1_12_to_v1_12_3}; +use v1_12::{V1_12_3_To_Current, V1_12_To_V1_12_3}; use crate::progress::{Progress, VariableNameStep}; use crate::{Index, InternalError, Result}; +trait UpgradeIndex { + /// Returns true if the index scheduler must regenerate its cached stats + fn upgrade( + &self, + wtxn: &mut RwTxn, + index: &Index, + original: (u32, u32, u32), + progress: Progress, + ) -> Result; + fn target_version(&self) -> (u32, u32, u32); +} + /// Return true if the cached stats of the index must be regenerated -pub fn upgrade(wtxn: &mut RwTxn, index: &Index, progress: Progress) -> Result { - let from = index.get_version(wtxn)?; - let upgrade_functions = [ - ( - v1_12_to_v1_12_3 as fn(&mut RwTxn, &Index, Progress) -> Result, - "Upgrading from v1.12.(0/1/2) to v1.12.3", - ), - ( - v1_12_3_to_v1_13 as fn(&mut RwTxn, &Index, Progress) -> Result, - "Upgrading from v1.12.3+ to v1.13", - ), - ]; +pub fn upgrade( + wtxn: &mut RwTxn, + index: &Index, + db_version: (u32, u32, u32), + progress: Progress, +) -> Result { + let from = index.get_version(wtxn)?.unwrap_or(db_version); + let upgrade_functions: &[&dyn UpgradeIndex] = &[&V1_12_To_V1_12_3 {}, &V1_12_3_To_Current()]; let start = match from { - // If there was no version it means we're coming from the v1.12 - None | Some((1, 12, 0..=2)) => 0, - Some((1, 12, 3..)) => 1, + (1, 12, 0..=2) => 0, + (1, 12, 3..) => 1, // We must handle the current version in the match because in case of a failure some index may have been upgraded but not other. - Some((1, 13, _)) => return Ok(false), - Some((major, minor, patch)) => { + (1, 13, _) => return Ok(false), + (major, minor, patch) => { return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into()) } }; @@ -34,14 +41,26 @@ pub fn upgrade(wtxn: &mut RwTxn, index: &Index, progress: Progress) -> Result::new( - upgrade_msg.to_string(), + format!( + "Upgrading from v{}.{}.{} to v{}.{}.{}", + current_version.0, + current_version.1, + current_version.2, + target.0, + target.1, + target.2 + ), i as u32, upgrade_path.len() as u32, )); - regenerate_stats |= (upgrade_function)(wtxn, index, progress.clone())?; + regenerate_stats |= upgrade.upgrade(wtxn, index, from, progress.clone())?; + current_version = target; } Ok(regenerate_stats) diff --git a/crates/milli/src/update/upgrade/v1_12.rs b/crates/milli/src/update/upgrade/v1_12.rs index 082896610..e48ecfe36 100644 --- a/crates/milli/src/update/upgrade/v1_12.rs +++ b/crates/milli/src/update/upgrade/v1_12.rs @@ -1,28 +1,56 @@ use heed::RwTxn; +use crate::constants::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use crate::progress::Progress; use crate::{make_enum_progress, Index, Result}; -// The field distribution was not computed correctly in the v1.12 until the v1.12.3 -pub(super) fn v1_12_to_v1_12_3( - wtxn: &mut RwTxn, - index: &Index, - progress: Progress, -) -> Result { - make_enum_progress! { - enum FieldDistribution { - RebuildingFieldDistribution, - } - }; - progress.update_progress(FieldDistribution::RebuildingFieldDistribution); - crate::update::new::reindex::field_distribution(index, wtxn, &progress)?; - Ok(true) +use super::UpgradeIndex; + +#[allow(non_camel_case_types)] +pub(super) struct V1_12_To_V1_12_3 {} + +impl UpgradeIndex for V1_12_To_V1_12_3 { + fn upgrade( + &self, + wtxn: &mut RwTxn, + index: &Index, + _original: (u32, u32, u32), + progress: Progress, + ) -> Result { + make_enum_progress! { + enum FieldDistribution { + RebuildingFieldDistribution, + } + }; + progress.update_progress(FieldDistribution::RebuildingFieldDistribution); + crate::update::new::reindex::field_distribution(index, wtxn, &progress)?; + Ok(true) + } + + fn target_version(&self) -> (u32, u32, u32) { + (1, 12, 3) + } } -pub(super) fn v1_12_3_to_v1_13( - _wtxn: &mut RwTxn, - _index: &Index, - _progress: Progress, -) -> Result { - Ok(false) +#[allow(non_camel_case_types)] +pub(super) struct V1_12_3_To_Current(); + +impl UpgradeIndex for V1_12_3_To_Current { + fn upgrade( + &self, + _wtxn: &mut RwTxn, + _index: &Index, + _original: (u32, u32, u32), + _progress: Progress, + ) -> Result { + Ok(false) + } + + fn target_version(&self) -> (u32, u32, u32) { + ( + VERSION_MAJOR.parse().unwrap(), + VERSION_MINOR.parse().unwrap(), + VERSION_PATCH.parse().unwrap(), + ) + } }