Upgrade supports cancelling

This commit is contained in:
Louis Dureuil 2025-04-22 09:34:37 +02:00
parent b82dda2d0d
commit 121c1ac1dd
No known key found for this signature in database
2 changed files with 16 additions and 3 deletions

View File

@ -1,5 +1,5 @@
use meilisearch_types::milli;
use meilisearch_types::milli::progress::{Progress, VariableNameStep}; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self};
use crate::{Error, IndexScheduler, Result}; use crate::{Error, IndexScheduler, Result};
@ -16,6 +16,11 @@ impl IndexScheduler {
let indexes = self.index_names()?; let indexes = self.index_names()?;
for (i, uid) in indexes.iter().enumerate() { for (i, uid) in indexes.iter().enumerate() {
let must_stop_processing = self.scheduler.must_stop_processing.clone();
if must_stop_processing.get() {
return Err(Error::AbortedTask);
}
progress.update_progress(VariableNameStep::<UpgradeIndex>::new( progress.update_progress(VariableNameStep::<UpgradeIndex>::new(
format!("Upgrading index `{uid}`"), format!("Upgrading index `{uid}`"),
i as u32, i as u32,
@ -27,6 +32,7 @@ impl IndexScheduler {
&mut index_wtxn, &mut index_wtxn,
&index, &index,
db_version, db_version,
|| must_stop_processing.get(),
progress.clone(), progress.clone(),
) )
.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

View File

@ -23,12 +23,16 @@ trait UpgradeIndex {
} }
/// Return true if the cached stats of the index must be regenerated /// Return true if the cached stats of the index must be regenerated
pub fn upgrade( pub fn upgrade<MSP>(
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
index: &Index, index: &Index,
db_version: (u32, u32, u32), db_version: (u32, u32, u32),
must_stop_processing: MSP,
progress: Progress, progress: Progress,
) -> Result<bool> { ) -> Result<bool>
where
MSP: Fn() -> bool + Sync,
{
let from = index.get_version(wtxn)?.unwrap_or(db_version); let from = index.get_version(wtxn)?.unwrap_or(db_version);
let upgrade_functions: &[&dyn UpgradeIndex] = &[ let upgrade_functions: &[&dyn UpgradeIndex] = &[
&V1_12_To_V1_12_3 {}, &V1_12_To_V1_12_3 {},
@ -56,6 +60,9 @@ pub fn upgrade(
let mut current_version = from; let mut current_version = from;
let mut regenerate_stats = false; let mut regenerate_stats = false;
for (i, upgrade) in upgrade_path.iter().enumerate() { for (i, upgrade) in upgrade_path.iter().enumerate() {
if (must_stop_processing)() {
return Err(crate::Error::InternalError(InternalError::AbortedIndexation));
}
let target = upgrade.target_version(); let target = upgrade.target_version();
progress.update_progress(VariableNameStep::<UpgradeVersion>::new( progress.update_progress(VariableNameStep::<UpgradeVersion>::new(
format!( format!(