From 637bea0370af5ab727c750eb9ab3445797322615 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 25 Mar 2025 16:52:00 +0100
Subject: [PATCH] Compute and store the database sizes

---
 Cargo.lock                                         |  2 +
 crates/index-scheduler/Cargo.toml                  |  2 +
 crates/index-scheduler/src/scheduler/mod.rs        | 34 ++++++++++--
 .../src/scheduler/process_batch.rs                 | 52 ++++++++++++++-----
 crates/meilisearch/Cargo.toml                      |  6 +--
 crates/milli/src/index.rs                          |  5 +-
 6 files changed, 77 insertions(+), 24 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 65b85cbcc..96cfcf76c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2745,6 +2745,7 @@ dependencies = [
  "bincode",
  "bumpalo",
  "bumparaw-collections",
+ "byte-unit",
  "convert_case 0.6.0",
  "crossbeam-channel",
  "csv",
@@ -2753,6 +2754,7 @@ dependencies = [
  "enum-iterator",
  "file-store",
  "flate2",
+ "indexmap",
  "insta",
  "maplit",
  "meili-snap",
diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml
index 37b3ea835..31ff5f7d0 100644
--- a/crates/index-scheduler/Cargo.toml
+++ b/crates/index-scheduler/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
 [dependencies]
 anyhow = "1.0.95"
 bincode = "1.3.3"
+byte-unit = "5.1.6"
 bumpalo = "3.16.0"
 bumparaw-collections = "0.1.4"
 convert_case = "0.6.0"
@@ -22,6 +23,7 @@ dump = { path = "../dump" }
 enum-iterator = "2.1.0"
 file-store = { path = "../file-store" }
 flate2 = "1.0.35"
+indexmap = "2.7.0"
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
 memmap2 = "0.9.5"
diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs
index 1cbfece34..fe3084034 100644
--- a/crates/index-scheduler/src/scheduler/mod.rs
+++ b/crates/index-scheduler/src/scheduler/mod.rs
@@ -24,6 +24,7 @@ use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::{Env, WithoutTls};
 use meilisearch_types::milli;
 use meilisearch_types::tasks::Status;
+use process_batch::ProcessBatchInfo;
 use rayon::current_num_threads;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
@@ -223,16 +224,16 @@ impl IndexScheduler {
         let mut stop_scheduler_forever = false;
         let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
         let mut canceled = RoaringBitmap::new();
-        let mut congestion = None;
+        let mut process_batch_info = ProcessBatchInfo::default();
 
         match res {
-            Ok((tasks, cong)) => {
+            Ok((tasks, info)) => {
                 #[cfg(test)]
                 self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
 
                 let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
                 progress.update_progress(task_progress_obj);
-                congestion = cong;
+                process_batch_info = info;
                 let mut success = 0;
                 let mut failure = 0;
                 let mut canceled_by = None;
@@ -350,6 +351,9 @@ impl IndexScheduler {
             // We must re-add the canceled task so they're part of the same batch.
             ids |= canceled;
 
+            let ProcessBatchInfo { congestion, pre_commit_databases_sizes, post_commit_databases_sizes } =
+                process_batch_info;
+
             processing_batch.stats.progress_trace =
                 progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
             processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
@@ -359,6 +363,30 @@ impl IndexScheduler {
                 congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
                 congestion_info
             });
+            processing_batch.stats.internal_database_sizes = pre_commit_databases_sizes
+                .iter()
+                .flat_map(|(dbname, pre_size)| {
+                    post_commit_databases_sizes
+                        .get(dbname)
+                        .map(|post_size| {
+                            use byte_unit::{Byte, UnitType::Binary};
+                            use std::cmp::Ordering::{Equal, Greater, Less};
+
+                            let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
+                            let diff_size = post_size.abs_diff(*pre_size) as u64;
+                            let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
+                            let sign = match post_size.cmp(pre_size) {
+                                Equal => return None,
+                                Greater => "+",
+                                Less => "-",
+                            };
+
+                            Some((dbname.to_string(), format!("{post:#.2} ({sign}{diff:#.2})").into()))
+                        })
+                        .into_iter()
+                        .flatten()
+                })
+                .collect();
 
             if let Some(congestion) = congestion {
                 tracing::debug!(
diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs
index 8f3987bf6..996b548c2 100644
--- a/crates/index-scheduler/src/scheduler/process_batch.rs
+++ b/crates/index-scheduler/src/scheduler/process_batch.rs
@@ -22,6 +22,16 @@ use crate::utils::{
 };
 use crate::{Error, IndexScheduler, Result, TaskId};
 
+#[derive(Debug, Default)]
+pub struct ProcessBatchInfo {
+    /// The write channel congestion. None when unavailable, e.g. for a settings update.
+    pub congestion: Option<ChannelCongestion>,
+    /// The sizes of the different databases before starting the indexation.
+    pub pre_commit_databases_sizes: indexmap::IndexMap<&'static str, usize>,
+    /// The sizes of the different databases after committing the indexation.
+    pub post_commit_databases_sizes: indexmap::IndexMap<&'static str, usize>,
+}
+
 impl IndexScheduler {
     /// Apply the operation associated with the given batch.
     ///
@@ -35,7 +45,7 @@ impl IndexScheduler {
         batch: Batch,
         current_batch: &mut ProcessingBatch,
         progress: Progress,
-    ) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
+    ) -> Result<(Vec<Task>, ProcessBatchInfo)> {
         #[cfg(test)]
         {
             self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@@ -76,7 +86,7 @@ impl IndexScheduler {
 
                 canceled_tasks.push(task);
 
-                Ok((canceled_tasks, None))
+                Ok((canceled_tasks, ProcessBatchInfo::default()))
             }
             Batch::TaskDeletions(mut tasks) => {
                 // 1. Retrieve the tasks that matched the query at enqueue-time.
@@ -115,14 +125,14 @@ impl IndexScheduler {
                     _ => unreachable!(),
                 }
             }
-                Ok((tasks, None))
-            }
-            Batch::SnapshotCreation(tasks) => {
-                self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
-            }
-            Batch::Dump(task) => {
-                self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
+                Ok((tasks, ProcessBatchInfo::default()))
             }
+            Batch::SnapshotCreation(tasks) => self
+                .process_snapshot(progress, tasks)
+                .map(|tasks| (tasks, ProcessBatchInfo::default())),
+            Batch::Dump(task) => self
+                .process_dump_creation(progress, task)
+                .map(|tasks| (tasks, ProcessBatchInfo::default())),
             Batch::IndexOperation { op, must_create_index } => {
                 let index_uid = op.index_uid().to_string();
                 let index = if must_create_index {
@@ -139,6 +149,7 @@ impl IndexScheduler {
                     .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
 
                 let mut index_wtxn = index.write_txn()?;
+                let pre_commit_databases_sizes = index.database_sizes(&index_wtxn)?;
                 let (tasks, congestion) =
                     self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
 
@@ -153,12 +164,14 @@ impl IndexScheduler {
                 // stats of the index. Since the tasks have already been processed and
                 // this is a non-critical operation. If it fails, we should not fail
                 // the entire batch.
+                let mut post_commit_databases_sizes = None;
                 let res = || -> Result<()> {
                     let index_rtxn = index.read_txn()?;
                     let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
                         .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
                     let mut wtxn = self.env.write_txn()?;
                     self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
+                    post_commit_databases_sizes = Some(index.database_sizes(&index_rtxn)?);
                     wtxn.commit()?;
                     Ok(())
                 }();
@@ -171,7 +184,16 @@ impl IndexScheduler {
                     ),
                 }
 
-                Ok((tasks, congestion))
+                let info = ProcessBatchInfo {
+                    congestion,
+                    // In case we fail to get the post-commit sizes, we assume
+                    // that nothing changed and use the pre-commit sizes instead.
+                    post_commit_databases_sizes: post_commit_databases_sizes
+                        .unwrap_or_else(|| pre_commit_databases_sizes.clone()),
+                    pre_commit_databases_sizes,
+                };
+
+                Ok((tasks, info))
             }
             Batch::IndexCreation { index_uid, primary_key, task } => {
                 progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@@ -239,7 +261,7 @@ impl IndexScheduler {
                     ),
                 }
 
-                Ok((vec![task], None))
+                Ok((vec![task], ProcessBatchInfo::default()))
             }
             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
                 progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@@ -273,7 +295,9 @@ impl IndexScheduler {
                     };
                 }
 
-                Ok((tasks, None))
+                // Here we could also report that all the internal database sizes go to 0,
+                // but that would mean opening the index, and that is costly.
+                Ok((tasks, ProcessBatchInfo::default()))
             }
             Batch::IndexSwap { mut task } => {
                 progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@@ -321,7 +345,7 @@ impl IndexScheduler {
                 }
                 wtxn.commit()?;
                 task.status = Status::Succeeded;
-                Ok((vec![task], None))
+                Ok((vec![task], ProcessBatchInfo::default()))
             }
             Batch::UpgradeDatabase { mut tasks } => {
                 let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@@ -351,7 +375,7 @@ impl IndexScheduler {
                     task.error = None;
                 }
 
-                Ok((tasks, None))
+                Ok((tasks, ProcessBatchInfo::default()))
             }
         }
     }
diff --git a/crates/meilisearch/Cargo.toml b/crates/meilisearch/Cargo.toml
index 428f13c10..6360cdbde 100644
--- a/crates/meilisearch/Cargo.toml
+++ b/crates/meilisearch/Cargo.toml
@@ -30,11 +30,7 @@ actix-web = { version = "4.9.0", default-features = false, features = [
 anyhow = { version = "1.0.95", features = ["backtrace"] }
 async-trait = "0.1.85"
 bstr = "1.11.3"
-byte-unit = { version = "5.1.6", default-features = false, features = [
-    "std",
-    "byte",
-    "serde",
-] }
+byte-unit = { version = "5.1.6", features = ["serde"] }
 bytes = "1.9.0"
 clap = { version = "4.5.24", features = ["derive", "env"] }
 crossbeam-channel = "0.5.14"
diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs
index e0c124859..a2d839d03 100644
--- a/crates/milli/src/index.rs
+++ b/crates/milli/src/index.rs
@@ -5,6 +5,7 @@ use std::path::Path;
 
 use heed::{types::*, DatabaseStat, WithoutTls};
 use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
+use indexmap::IndexMap;
 use roaring::RoaringBitmap;
 use rstar::RTree;
 use serde::{Deserialize, Serialize};
@@ -1770,7 +1771,7 @@ impl Index {
     }
 
     /// Returns the sizes in bytes of each of the index database at the given rtxn.
-    pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> Result<HashMap<&'static str, usize>> {
+    pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> heed::Result<IndexMap<&'static str, usize>> {
         let Self {
             env: _,
             main,
@@ -1812,7 +1813,7 @@ impl Index {
             (branch_pages + leaf_pages + overflow_pages) * page_size as usize
         }
 
-        let mut sizes = HashMap::new();
+        let mut sizes = IndexMap::new();
         sizes.insert("main", main.stat(rtxn).map(compute_size)?);
         sizes
             .insert("external_documents_ids", external_documents_ids.stat(rtxn).map(compute_size)?);
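
Note (editorial, not part of the patch): `database_sizes` approximates each database's footprint from LMDB page counts, i.e. `(branch_pages + leaf_pages + overflow_pages) * page_size`; for example, with a 4 KiB page size, 2 branch + 100 leaf + 1 overflow pages come to 103 * 4096 = 421,888 bytes (412.00 KiB). The scheduler then diffs the pre- and post-commit maps into human-readable batch stats. The standalone Rust sketch below mirrors that formatting logic; the helper name `format_size_changes` and the sample figures are invented for illustration, and it assumes the byte-unit 5.x and indexmap 2.x crates added in the Cargo.toml hunks above.

// A sketch of the size-diff formatting used in scheduler/mod.rs above.
use byte_unit::{Byte, UnitType::Binary};
use indexmap::IndexMap;
use std::cmp::Ordering::{Equal, Greater, Less};

fn format_size_changes(
    pre: &IndexMap<&'static str, usize>,
    post: &IndexMap<&'static str, usize>,
) -> IndexMap<String, String> {
    pre.iter()
        .filter_map(|(dbname, pre_size)| {
            let post_size = post.get(dbname)?;
            // Databases whose size did not change are omitted from the stats.
            let sign = match post_size.cmp(pre_size) {
                Equal => return None,
                Greater => "+",
                Less => "-",
            };
            let post_fmt = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
            let diff = Byte::from_u64(post_size.abs_diff(*pre_size) as u64)
                .get_appropriate_unit(Binary);
            Some((dbname.to_string(), format!("{post_fmt:#.2} ({sign}{diff:#.2})")))
        })
        .collect()
}

fn main() {
    // Hypothetical pre/post-commit sizes for two of the index databases.
    let pre = IndexMap::from([("main", 4_096), ("word_docids", 1_048_576)]);
    let post = IndexMap::from([("main", 4_096), ("word_docids", 2_097_152)]);
    // `main` did not move, so only `word_docids` is reported, formatted
    // along the lines of "2.00 MiB (+1.00 MiB)".
    for (db, change) in format_size_changes(&pre, &post) {
        println!("{db}: {change}");
    }
}

This also shows why the `Equal => return None` arm exists in the patch: unchanged databases are dropped entirely, so `internal_database_sizes` only lists databases whose size actually moved during the batch.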