Compute and store the database sizes

This commit is contained in:
Kerollmops 2025-03-25 16:52:00 +01:00
parent fd079c6757
commit 637bea0370
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
6 changed files with 77 additions and 24 deletions

2
Cargo.lock generated
View File

@ -2745,6 +2745,7 @@ dependencies = [
"bincode", "bincode",
"bumpalo", "bumpalo",
"bumparaw-collections", "bumparaw-collections",
"byte-unit",
"convert_case 0.6.0", "convert_case 0.6.0",
"crossbeam-channel", "crossbeam-channel",
"csv", "csv",
@ -2753,6 +2754,7 @@ dependencies = [
"enum-iterator", "enum-iterator",
"file-store", "file-store",
"flate2", "flate2",
"indexmap",
"insta", "insta",
"maplit", "maplit",
"meili-snap", "meili-snap",

View File

@ -13,6 +13,7 @@ license.workspace = true
[dependencies] [dependencies]
anyhow = "1.0.95" anyhow = "1.0.95"
bincode = "1.3.3" bincode = "1.3.3"
byte-unit = "5.1.6"
bumpalo = "3.16.0" bumpalo = "3.16.0"
bumparaw-collections = "0.1.4" bumparaw-collections = "0.1.4"
convert_case = "0.6.0" convert_case = "0.6.0"
@ -22,6 +23,7 @@ dump = { path = "../dump" }
enum-iterator = "2.1.0" enum-iterator = "2.1.0"
file-store = { path = "../file-store" } file-store = { path = "../file-store" }
flate2 = "1.0.35" flate2 = "1.0.35"
indexmap = "2.7.0"
meilisearch-auth = { path = "../meilisearch-auth" } meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" } meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.5" memmap2 = "0.9.5"

View File

@ -24,6 +24,7 @@ use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli; use meilisearch_types::milli;
use meilisearch_types::tasks::Status; use meilisearch_types::tasks::Status;
use process_batch::ProcessBatchInfo;
use rayon::current_num_threads; use rayon::current_num_threads;
use rayon::iter::{IntoParallelIterator, ParallelIterator}; use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
@ -223,16 +224,16 @@ impl IndexScheduler {
let mut stop_scheduler_forever = false; let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?; let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new(); let mut canceled = RoaringBitmap::new();
let mut congestion = None; let mut process_batch_info = ProcessBatchInfo::default();
match res { match res {
Ok((tasks, cong)) => { Ok((tasks, info)) => {
#[cfg(test)] #[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded); self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32); let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj); progress.update_progress(task_progress_obj);
congestion = cong; process_batch_info = info;
let mut success = 0; let mut success = 0;
let mut failure = 0; let mut failure = 0;
let mut canceled_by = None; let mut canceled_by = None;
@ -350,6 +351,9 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch. // We must re-add the canceled task so they're part of the same batch.
ids |= canceled; ids |= canceled;
let ProcessBatchInfo { congestion, pre_commit_dabases_sizes, post_commit_dabases_sizes } =
process_batch_info;
processing_batch.stats.progress_trace = processing_batch.stats.progress_trace =
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect(); progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
processing_batch.stats.write_channel_congestion = congestion.map(|congestion| { processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
@ -359,6 +363,30 @@ impl IndexScheduler {
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into()); congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info congestion_info
}); });
processing_batch.stats.internal_database_sizes = pre_commit_dabases_sizes
.iter()
.flat_map(|(dbname, pre_size)| {
post_commit_dabases_sizes
.get(dbname)
.map(|post_size| {
use byte_unit::{Byte, UnitType::Binary};
use std::cmp::Ordering::{Equal, Greater, Less};
let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
let diff_size = post_size.abs_diff(*pre_size) as u64;
let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
let sign = match post_size.cmp(pre_size) {
Equal => return None,
Greater => "+",
Less => "-",
};
Some((dbname.to_string(), format!("{post:#.2} ({sign}{diff:#.2})").into()))
})
.into_iter()
.flatten()
})
.collect();
if let Some(congestion) = congestion { if let Some(congestion) = congestion {
tracing::debug!( tracing::debug!(

View File

@ -22,6 +22,16 @@ use crate::utils::{
}; };
use crate::{Error, IndexScheduler, Result, TaskId}; use crate::{Error, IndexScheduler, Result, TaskId};
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. None when unavailable: settings update.
pub congestion: Option<ChannelCongestion>,
/// The sizes of the different databases before starting the indexation.
pub pre_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
/// The sizes of the different databases after commiting the indexation.
pub post_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
}
impl IndexScheduler { impl IndexScheduler {
/// Apply the operation associated with the given batch. /// Apply the operation associated with the given batch.
/// ///
@ -35,7 +45,7 @@ impl IndexScheduler {
batch: Batch, batch: Batch,
current_batch: &mut ProcessingBatch, current_batch: &mut ProcessingBatch,
progress: Progress, progress: Progress,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> { ) -> Result<(Vec<Task>, ProcessBatchInfo)> {
#[cfg(test)] #[cfg(test)]
{ {
self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?; self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@ -76,7 +86,7 @@ impl IndexScheduler {
canceled_tasks.push(task); canceled_tasks.push(task);
Ok((canceled_tasks, None)) Ok((canceled_tasks, ProcessBatchInfo::default()))
} }
Batch::TaskDeletions(mut tasks) => { Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time. // 1. Retrieve the tasks that matched the query at enqueue-time.
@ -115,14 +125,14 @@ impl IndexScheduler {
_ => unreachable!(), _ => unreachable!(),
} }
} }
Ok((tasks, None)) Ok((tasks, ProcessBatchInfo::default()))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
} }
Batch::SnapshotCreation(tasks) => self
.process_snapshot(progress, tasks)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::Dump(task) => self
.process_dump_creation(progress, task)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::IndexOperation { op, must_create_index } => { Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string(); let index_uid = op.index_uid().to_string();
let index = if must_create_index { let index = if must_create_index {
@ -139,6 +149,7 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone()))); .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?; let mut index_wtxn = index.write_txn()?;
let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
let (tasks, congestion) = let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, progress)?; self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
@ -153,12 +164,14 @@ impl IndexScheduler {
// stats of the index. Since the tasks have already been processed and // stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail // this is a non-critical operation. If it fails, we should not fail
// the entire batch. // the entire batch.
let mut post_commit_dabases_sizes = None;
let res = || -> Result<()> { let res = || -> Result<()> {
let index_rtxn = index.read_txn()?; let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn) let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
post_commit_dabases_sizes = Some(index.database_sizes(&index_rtxn)?);
wtxn.commit()?; wtxn.commit()?;
Ok(()) Ok(())
}(); }();
@ -171,7 +184,16 @@ impl IndexScheduler {
), ),
} }
Ok((tasks, congestion)) let info = ProcessBatchInfo {
congestion,
// In case we fail to the get post-commit sizes we decide
// that nothing changed and use the pre-commit sizes.
post_commit_dabases_sizes: post_commit_dabases_sizes
.unwrap_or_else(|| pre_commit_dabases_sizes.clone()),
pre_commit_dabases_sizes,
};
Ok((tasks, info))
} }
Batch::IndexCreation { index_uid, primary_key, task } => { Batch::IndexCreation { index_uid, primary_key, task } => {
progress.update_progress(CreateIndexProgress::CreatingTheIndex); progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@ -239,7 +261,7 @@ impl IndexScheduler {
), ),
} }
Ok((vec![task], None)) Ok((vec![task], ProcessBatchInfo::default()))
} }
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => { Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
progress.update_progress(DeleteIndexProgress::DeletingTheIndex); progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@ -273,7 +295,9 @@ impl IndexScheduler {
}; };
} }
Ok((tasks, None)) // Here we could also show that all the internal database sizes goes to 0
// but it would mean opening the index and that's costly.
Ok((tasks, ProcessBatchInfo::default()))
} }
Batch::IndexSwap { mut task } => { Batch::IndexSwap { mut task } => {
progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap); progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@ -321,7 +345,7 @@ impl IndexScheduler {
} }
wtxn.commit()?; wtxn.commit()?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
Ok((vec![task], None)) Ok((vec![task], ProcessBatchInfo::default()))
} }
Batch::UpgradeDatabase { mut tasks } => { Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else { let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@ -351,7 +375,7 @@ impl IndexScheduler {
task.error = None; task.error = None;
} }
Ok((tasks, None)) Ok((tasks, ProcessBatchInfo::default()))
} }
} }
} }

View File

@ -30,11 +30,7 @@ actix-web = { version = "4.9.0", default-features = false, features = [
anyhow = { version = "1.0.95", features = ["backtrace"] } anyhow = { version = "1.0.95", features = ["backtrace"] }
async-trait = "0.1.85" async-trait = "0.1.85"
bstr = "1.11.3" bstr = "1.11.3"
byte-unit = { version = "5.1.6", default-features = false, features = [ byte-unit = { version = "5.1.6", features = ["serde"] }
"std",
"byte",
"serde",
] }
bytes = "1.9.0" bytes = "1.9.0"
clap = { version = "4.5.24", features = ["derive", "env"] } clap = { version = "4.5.24", features = ["derive", "env"] }
crossbeam-channel = "0.5.14" crossbeam-channel = "0.5.14"

View File

@ -5,6 +5,7 @@ use std::path::Path;
use heed::{types::*, DatabaseStat, WithoutTls}; use heed::{types::*, DatabaseStat, WithoutTls};
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified}; use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use indexmap::IndexMap;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;
use rstar::RTree; use rstar::RTree;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -1770,7 +1771,7 @@ impl Index {
} }
/// Returns the sizes in bytes of each of the index database at the given rtxn. /// Returns the sizes in bytes of each of the index database at the given rtxn.
pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> Result<HashMap<&'static str, usize>> { pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> heed::Result<IndexMap<&'static str, usize>> {
let Self { let Self {
env: _, env: _,
main, main,
@ -1812,7 +1813,7 @@ impl Index {
(branch_pages + leaf_pages + overflow_pages) * page_size as usize (branch_pages + leaf_pages + overflow_pages) * page_size as usize
} }
let mut sizes = HashMap::new(); let mut sizes = IndexMap::new();
sizes.insert("main", main.stat(rtxn).map(compute_size)?); sizes.insert("main", main.stat(rtxn).map(compute_size)?);
sizes sizes
.insert("external_documents_ids", external_documents_ids.stat(rtxn).map(compute_size)?); .insert("external_documents_ids", external_documents_ids.stat(rtxn).map(compute_size)?);