diff --git a/Cargo.lock b/Cargo.lock
index 65b85cbcc..96cfcf76c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2745,6 +2745,7 @@ dependencies = [
  "bincode",
  "bumpalo",
  "bumparaw-collections",
+ "byte-unit",
  "convert_case 0.6.0",
  "crossbeam-channel",
  "csv",
@@ -2753,6 +2754,7 @@ dependencies = [
  "enum-iterator",
  "file-store",
  "flate2",
+ "indexmap",
  "insta",
  "maplit",
  "meili-snap",
diff --git a/crates/dump/src/lib.rs b/crates/dump/src/lib.rs
index 4e2d6ac2f..ee63f7048 100644
--- a/crates/dump/src/lib.rs
+++ b/crates/dump/src/lib.rs
@@ -326,6 +326,7 @@ pub(crate) mod test {
                 index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
                 progress_trace: Default::default(),
                 write_channel_congestion: None,
+                internal_database_sizes: Default::default(),
             },
             enqueued_at: Some(BatchEnqueuedAt {
                 earliest: datetime!(2022-11-11 0:00 UTC),
diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml
index 37b3ea835..31ff5f7d0 100644
--- a/crates/index-scheduler/Cargo.toml
+++ b/crates/index-scheduler/Cargo.toml
@@ -13,6 +13,7 @@ license.workspace = true
 [dependencies]
 anyhow = "1.0.95"
 bincode = "1.3.3"
+byte-unit = "5.1.6"
 bumpalo = "3.16.0"
 bumparaw-collections = "0.1.4"
 convert_case = "0.6.0"
@@ -22,6 +23,7 @@ dump = { path = "../dump" }
 enum-iterator = "2.1.0"
 file-store = { path = "../file-store" }
 flate2 = "1.0.35"
+indexmap = "2.7.0"
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
 memmap2 = "0.9.5"
diff --git a/crates/index-scheduler/src/insta_snapshot.rs b/crates/index-scheduler/src/insta_snapshot.rs
index bcc295afd..949edf369 100644
--- a/crates/index-scheduler/src/insta_snapshot.rs
+++ b/crates/index-scheduler/src/insta_snapshot.rs
@@ -344,6 +344,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
     let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
     let stats = BatchStats {
         progress_trace: Default::default(),
+        internal_database_sizes: Default::default(),
         write_channel_congestion: None,
         ..stats.clone()
     };
diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs
index fed26aeb7..09ce46884 100644
--- a/crates/index-scheduler/src/processing.rs
+++ b/crates/index-scheduler/src/processing.rs
@@ -64,6 +64,13 @@ make_enum_progress! {
     }
 }

+make_enum_progress! {
+    pub enum FinalizingIndexStep {
+        Committing,
+        ComputingStats,
+    }
+}
+
 make_enum_progress! {
     pub enum TaskCancelationProgress {
         RetrievingTasks,
diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs
index 1cbfece34..fe3084034 100644
--- a/crates/index-scheduler/src/scheduler/mod.rs
+++ b/crates/index-scheduler/src/scheduler/mod.rs
@@ -24,6 +24,7 @@ use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::{Env, WithoutTls};
 use meilisearch_types::milli;
 use meilisearch_types::tasks::Status;
+use process_batch::ProcessBatchInfo;
 use rayon::current_num_threads;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
@@ -223,16 +224,16 @@ impl IndexScheduler {
         let mut stop_scheduler_forever = false;
         let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
         let mut canceled = RoaringBitmap::new();
-        let mut congestion = None;
+        let mut process_batch_info = ProcessBatchInfo::default();

         match res {
-            Ok((tasks, cong)) => {
+            Ok((tasks, info)) => {
                 #[cfg(test)]
                 self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);

                 let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
                 progress.update_progress(task_progress_obj);
-                congestion = cong;
+                process_batch_info = info;
                 let mut success = 0;
                 let mut failure = 0;
                 let mut canceled_by = None;
@@ -350,6 +351,9 @@ impl IndexScheduler {
         // We must re-add the canceled task so they're part of the same batch.
         ids |= canceled;

+        let ProcessBatchInfo { congestion, pre_commit_database_sizes, post_commit_database_sizes } =
+            process_batch_info;
+
         processing_batch.stats.progress_trace =
             progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
         processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
@@ -359,6 +363,30 @@ impl IndexScheduler {
             congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
             congestion_info
         });
+        processing_batch.stats.internal_database_sizes = pre_commit_database_sizes
+            .iter()
+            .flat_map(|(dbname, pre_size)| {
+                post_commit_database_sizes
+                    .get(dbname)
+                    .map(|post_size| {
+                        use byte_unit::{Byte, UnitType::Binary};
+                        use std::cmp::Ordering::{Equal, Greater, Less};
+
+                        let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
+                        let diff_size = post_size.abs_diff(*pre_size) as u64;
+                        let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
+                        let sign = match post_size.cmp(pre_size) {
+                            Equal => return None,
+                            Greater => "+",
+                            Less => "-",
+                        };
+
+                        Some((dbname.to_string(), format!("{post:#.2} ({sign}{diff:#.2})").into()))
+                    })
+                    .into_iter()
+                    .flatten()
+            })
+            .collect();

         if let Some(congestion) = congestion {
             tracing::debug!(
diff --git a/crates/index-scheduler/src/scheduler/process_batch.rs b/crates/index-scheduler/src/scheduler/process_batch.rs
index 8f3987bf6..42de1d137 100644
--- a/crates/index-scheduler/src/scheduler/process_batch.rs
+++ b/crates/index-scheduler/src/scheduler/process_batch.rs
@@ -12,7 +12,7 @@ use roaring::RoaringBitmap;

 use super::create_batch::Batch;
 use crate::processing::{
-    AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress,
+    AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
     InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
     UpdateIndexProgress,
 };
@@ -22,6 +22,16 @@ use crate::utils::{
 };
 use crate::{Error, IndexScheduler, Result, TaskId};

+#[derive(Debug, Default)]
+pub struct ProcessBatchInfo {
+    /// The write channel congestion. `None` when unavailable, e.g. for a settings update.
+    pub congestion: Option<ChannelCongestion>,
+    /// The sizes of the different databases before starting the indexation.
+    pub pre_commit_database_sizes: indexmap::IndexMap<&'static str, usize>,
+    /// The sizes of the different databases after committing the indexation.
+    pub post_commit_database_sizes: indexmap::IndexMap<&'static str, usize>,
+}
+
 impl IndexScheduler {
     /// Apply the operation associated with the given batch.
     ///
@@ -35,7 +45,7 @@ impl IndexScheduler {
         batch: Batch,
         current_batch: &mut ProcessingBatch,
         progress: Progress,
-    ) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
+    ) -> Result<(Vec<Task>, ProcessBatchInfo)> {
         #[cfg(test)]
         {
             self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@@ -76,7 +86,7 @@ impl IndexScheduler {

                 canceled_tasks.push(task);

-                Ok((canceled_tasks, None))
+                Ok((canceled_tasks, ProcessBatchInfo::default()))
             }
             Batch::TaskDeletions(mut tasks) => {
                 // 1. Retrieve the tasks that matched the query at enqueue-time.
@@ -115,14 +125,14 @@ impl IndexScheduler {
                         _ => unreachable!(),
                     }
                 }
-                Ok((tasks, None))
-            }
-            Batch::SnapshotCreation(tasks) => {
-                self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
-            }
-            Batch::Dump(task) => {
-                self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
+                Ok((tasks, ProcessBatchInfo::default()))
             }
+            Batch::SnapshotCreation(tasks) => self
+                .process_snapshot(progress, tasks)
+                .map(|tasks| (tasks, ProcessBatchInfo::default())),
+            Batch::Dump(task) => self
+                .process_dump_creation(progress, task)
+                .map(|tasks| (tasks, ProcessBatchInfo::default())),
             Batch::IndexOperation { op, must_create_index } => {
                 let index_uid = op.index_uid().to_string();
                 let index = if must_create_index {
@@ -139,10 +149,12 @@ impl IndexScheduler {
                     .set_currently_updating_index(Some((index_uid.clone(), index.clone())));

                 let mut index_wtxn = index.write_txn()?;
+                let pre_commit_database_sizes = index.database_sizes(&index_wtxn)?;
                 let (tasks, congestion) =
-                    self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
+                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;

                 {
+                    progress.update_progress(FinalizingIndexStep::Committing);
                     let span = tracing::trace_span!(target: "indexing::scheduler", "commit");
                     let _entered = span.enter();
@@ -153,12 +165,15 @@ impl IndexScheduler {
                 // stats of the index. Since the tasks have already been processed and
                 // this is a non-critical operation. If it fails, we should not fail
                 // the entire batch.
+                let mut post_commit_database_sizes = None;
                 let res = || -> Result<()> {
+                    progress.update_progress(FinalizingIndexStep::ComputingStats);
                     let index_rtxn = index.read_txn()?;
                     let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
                         .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
                     let mut wtxn = self.env.write_txn()?;
                     self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
+                    post_commit_database_sizes = Some(index.database_sizes(&index_rtxn)?);
                     wtxn.commit()?;
                     Ok(())
                 }();
@@ -171,7 +186,16 @@ impl IndexScheduler {
                     ),
                 }

-                Ok((tasks, congestion))
+                let info = ProcessBatchInfo {
+                    congestion,
+                    // In case we fail to get the post-commit sizes, we decide
+                    // that nothing changed and use the pre-commit sizes.
+                    post_commit_database_sizes: post_commit_database_sizes
+                        .unwrap_or_else(|| pre_commit_database_sizes.clone()),
+                    pre_commit_database_sizes,
+                };
+
+                Ok((tasks, info))
             }
             Batch::IndexCreation { index_uid, primary_key, task } => {
                 progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@@ -239,7 +263,7 @@ impl IndexScheduler {
                     ),
                 }

-                Ok((vec![task], None))
+                Ok((vec![task], ProcessBatchInfo::default()))
             }
             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
                 progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@@ -273,7 +297,9 @@ impl IndexScheduler {
                     };
                 }

-                Ok((tasks, None))
+                // Here we could also show that all the internal database sizes go to 0,
+                // but it would mean opening the index, and that's costly.
+                Ok((tasks, ProcessBatchInfo::default()))
             }
             Batch::IndexSwap { mut task } => {
                 progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@@ -321,7 +347,7 @@ impl IndexScheduler {
                 }
                 wtxn.commit()?;
                 task.status = Status::Succeeded;
-                Ok((vec![task], None))
+                Ok((vec![task], ProcessBatchInfo::default()))
             }
             Batch::UpgradeDatabase { mut tasks } => {
                 let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@@ -351,7 +377,7 @@ impl IndexScheduler {
                     task.error = None;
                 }

-                Ok((tasks, None))
+                Ok((tasks, ProcessBatchInfo::default()))
             }
         }
     }
diff --git a/crates/index-scheduler/src/scheduler/process_index_operation.rs b/crates/index-scheduler/src/scheduler/process_index_operation.rs
index 690fe2efd..9b12d61cf 100644
--- a/crates/index-scheduler/src/scheduler/process_index_operation.rs
+++ b/crates/index-scheduler/src/scheduler/process_index_operation.rs
@@ -32,7 +32,7 @@ impl IndexScheduler {
         index_wtxn: &mut RwTxn<'i>,
         index: &'i Index,
         operation: IndexOperation,
-        progress: Progress,
+        progress: &Progress,
     ) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
         let indexer_alloc = Bump::new();
         let started_processing_at = std::time::Instant::now();
@@ -186,7 +186,7 @@ impl IndexScheduler {
                             &document_changes,
                             embedders,
                             &|| must_stop_processing.get(),
-                            &progress,
+                            progress,
                         )
                         .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
                     );
@@ -307,7 +307,7 @@ impl IndexScheduler {
                             &document_changes,
                             embedders,
                             &|| must_stop_processing.get(),
-                            &progress,
+                            progress,
                         )
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
                     );
@@ -465,7 +465,7 @@ impl IndexScheduler {
                             &document_changes,
                             embedders,
                             &|| must_stop_processing.get(),
-                            &progress,
+                            progress,
                         )
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
                     );
@@ -520,7 +520,7 @@ impl IndexScheduler {
                         index_uid: index_uid.clone(),
                         tasks: cleared_tasks,
                     },
-                    progress.clone(),
+                    progress,
                 )?;

                 let (settings_tasks, _congestion) = self.apply_index_operation(
diff --git a/crates/meilisearch-types/src/batches.rs b/crates/meilisearch-types/src/batches.rs
index 904682585..c7b9d6cfa 100644
--- a/crates/meilisearch-types/src/batches.rs
+++ b/crates/meilisearch-types/src/batches.rs
@@ -64,4 +64,6 @@ pub struct BatchStats {
     pub progress_trace: serde_json::Map<String, serde_json::Value>,
     #[serde(default, skip_serializing_if = "Option::is_none")]
     pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
+    #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
+    pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
 }
diff --git a/crates/meilisearch/Cargo.toml b/crates/meilisearch/Cargo.toml
index 428f13c10..6360cdbde 100644
--- a/crates/meilisearch/Cargo.toml
+++ b/crates/meilisearch/Cargo.toml
@@ -30,11 +30,7 @@ actix-web = { version = "4.9.0", default-features = false, features = [
 anyhow = { version = "1.0.95", features = ["backtrace"] }
["backtrace"] } async-trait = "0.1.85" bstr = "1.11.3" -byte-unit = { version = "5.1.6", default-features = false, features = [ - "std", - "byte", - "serde", -] } +byte-unit = { version = "5.1.6", features = ["serde"] } bytes = "1.9.0" clap = { version = "4.5.24", features = ["derive", "env"] } crossbeam-channel = "0.5.14" diff --git a/crates/meilisearch/tests/batches/mod.rs b/crates/meilisearch/tests/batches/mod.rs index 468963631..6c2aa4aaf 100644 --- a/crates/meilisearch/tests/batches/mod.rs +++ b/crates/meilisearch/tests/batches/mod.rs @@ -281,7 +281,8 @@ async fn test_summarized_document_addition_or_update() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r###" { @@ -303,7 +304,8 @@ async fn test_summarized_document_addition_or_update() { "test": 1 }, "progressTrace": "[progressTrace]", - "writeChannelCongestion": "[writeChannelCongestion]" + "writeChannelCongestion": "[writeChannelCongestion]", + "internalDatabaseSizes": "[internalDatabaseSizes]" }, "duration": "[duration]", "startedAt": "[date]", @@ -322,7 +324,8 @@ async fn test_summarized_document_addition_or_update() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r###" { @@ -407,7 +410,8 @@ async fn test_summarized_delete_documents_by_batch() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r###" { @@ -495,7 +499,8 @@ async fn test_summarized_delete_documents_by_filter() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r###" { @@ -537,7 +542,8 @@ async fn test_summarized_delete_documents_by_filter() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r#" { @@ -623,7 +629,8 @@ async fn test_summarized_delete_document_by_id() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r#" { @@ -679,7 +686,8 @@ async fn test_summarized_settings_update() { ".startedAt" => "[date]", ".finishedAt" => "[date]", ".stats.progressTrace" => "[progressTrace]", - ".stats.writeChannelCongestion" => "[writeChannelCongestion]" + ".stats.writeChannelCongestion" => "[writeChannelCongestion]", + ".stats.internalDatabaseSizes" => "[internalDatabaseSizes]" }, @r###" { diff --git 
index ff0b027cb..addcbeeb5 100644
--- a/crates/meilisearch/tests/dumps/mod.rs
+++ b/crates/meilisearch/tests/dumps/mod.rs
@@ -2237,6 +2237,7 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
         ".results[0].duration" => "[date]",
         ".results[0].stats.progressTrace" => "[progressTrace]",
         ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]",
+        ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]",
     }), name: "batches");

     let (indexes, code) = server.list_indexes(None, None).await;
diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
index 11ba2882a..9fc4d0e5b 100644
--- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
+++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs
@@ -193,26 +193,26 @@ async fn check_the_index_scheduler(server: &Server) {

     // Tests all the batches query parameters
     let (batches, _) = server.batches_filter("uids=10").await;
-    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
+    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
     let (batches, _) = server.batches_filter("batchUids=10").await;
-    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
+    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
     let (batches, _) = server.batches_filter("statuses=canceled").await;
-    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
+    snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
"batches_filter_statuses_equal_canceled"); // types has already been tested above to retrieve the upgrade database let (batches, _) = server.batches_filter("canceledBy=19").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19"); let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", 
".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41"); let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await; - snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); + snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41"); let (stats, _) = server.stats().await; assert_json_snapshot!(stats, { diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index fcb8962d2..a2d839d03 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -3,8 +3,9 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fs::File; use std::path::Path; -use heed::{types::*, WithoutTls}; +use heed::{types::*, DatabaseStat, WithoutTls}; use heed::{CompactionOption, 
+use indexmap::IndexMap;
 use roaring::RoaringBitmap;
 use rstar::RTree;
 use serde::{Deserialize, Serialize};
@@ -1768,6 +1769,109 @@ impl Index {
         Ok(self.word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some()
             || self.exact_word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some())
     }
+
+    /// Returns the size in bytes of each index database, as seen by the given `rtxn`.
+    pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> heed::Result<IndexMap<&'static str, usize>> {
+        let Self {
+            env: _,
+            main,
+            external_documents_ids,
+            word_docids,
+            exact_word_docids,
+            word_prefix_docids,
+            exact_word_prefix_docids,
+            word_pair_proximity_docids,
+            word_position_docids,
+            word_fid_docids,
+            word_prefix_position_docids,
+            word_prefix_fid_docids,
+            field_id_word_count_docids,
+            facet_id_f64_docids,
+            facet_id_string_docids,
+            facet_id_normalized_string_strings,
+            facet_id_string_fst,
+            facet_id_exists_docids,
+            facet_id_is_null_docids,
+            facet_id_is_empty_docids,
+            field_id_docid_facet_f64s,
+            field_id_docid_facet_strings,
+            vector_arroy,
+            embedder_category_id,
+            documents,
+        } = self;
+
+        fn compute_size(stats: DatabaseStat) -> usize {
+            let DatabaseStat {
+                page_size,
+                depth: _,
+                branch_pages,
+                leaf_pages,
+                overflow_pages,
+                entries: _,
+            } = stats;
+
+            (branch_pages + leaf_pages + overflow_pages) * page_size as usize
+        }
+
+        let mut sizes = IndexMap::new();
+        sizes.insert("main", main.stat(rtxn).map(compute_size)?);
+        sizes
+            .insert("external_documents_ids", external_documents_ids.stat(rtxn).map(compute_size)?);
+        sizes.insert("word_docids", word_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert("exact_word_docids", exact_word_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert("word_prefix_docids", word_prefix_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert(
+            "exact_word_prefix_docids",
+            exact_word_prefix_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert(
+            "word_pair_proximity_docids",
+            word_pair_proximity_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert("word_position_docids", word_position_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert("word_fid_docids", word_fid_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert(
+            "word_prefix_position_docids",
+            word_prefix_position_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes
+            .insert("word_prefix_fid_docids", word_prefix_fid_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert(
+            "field_id_word_count_docids",
+            field_id_word_count_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert("facet_id_f64_docids", facet_id_f64_docids.stat(rtxn).map(compute_size)?);
+        sizes
+            .insert("facet_id_string_docids", facet_id_string_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert(
+            "facet_id_normalized_string_strings",
+            facet_id_normalized_string_strings.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert("facet_id_string_fst", facet_id_string_fst.stat(rtxn).map(compute_size)?);
+        sizes
+            .insert("facet_id_exists_docids", facet_id_exists_docids.stat(rtxn).map(compute_size)?);
+        sizes.insert(
+            "facet_id_is_null_docids",
+            facet_id_is_null_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert(
+            "facet_id_is_empty_docids",
+            facet_id_is_empty_docids.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert(
+            "field_id_docid_facet_f64s",
+            field_id_docid_facet_f64s.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert(
+            "field_id_docid_facet_strings",
+            field_id_docid_facet_strings.stat(rtxn).map(compute_size)?,
+        );
+        sizes.insert("vector_arroy", vector_arroy.stat(rtxn).map(compute_size)?);
sizes.insert("embedder_category_id", embedder_category_id.stat(rtxn).map(compute_size)?); + sizes.insert("documents", documents.stat(rtxn).map(compute_size)?); + + Ok(sizes) + } } #[derive(Debug, Deserialize, Serialize)]