Merge pull request #5457 from meilisearch/show-database-sizes-changes

Show database sizes batches
This commit is contained in:
Clément Renault 2025-03-26 10:19:40 +00:00 committed by GitHub
commit 9ce7ccfbe7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 226 additions and 48 deletions

2
Cargo.lock generated
View File

@ -2745,6 +2745,7 @@ dependencies = [
"bincode",
"bumpalo",
"bumparaw-collections",
"byte-unit",
"convert_case 0.6.0",
"crossbeam-channel",
"csv",
@ -2753,6 +2754,7 @@ dependencies = [
"enum-iterator",
"file-store",
"flate2",
"indexmap",
"insta",
"maplit",
"meili-snap",

View File

@ -326,6 +326,7 @@ pub(crate) mod test {
index_uids: maplit::btreemap! { "doggo".to_string() => 1 },
progress_trace: Default::default(),
write_channel_congestion: None,
internal_database_sizes: Default::default(),
},
enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC),

View File

@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
anyhow = "1.0.95"
bincode = "1.3.3"
byte-unit = "5.1.6"
bumpalo = "3.16.0"
bumparaw-collections = "0.1.4"
convert_case = "0.6.0"
@ -22,6 +23,7 @@ dump = { path = "../dump" }
enum-iterator = "2.1.0"
file-store = { path = "../file-store" }
flate2 = "1.0.35"
indexmap = "2.7.0"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
memmap2 = "0.9.5"

View File

@ -344,6 +344,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
let Batch { uid, details, stats, started_at, finished_at, progress: _, enqueued_at } = batch;
let stats = BatchStats {
progress_trace: Default::default(),
internal_database_sizes: Default::default(),
write_channel_congestion: None,
..stats.clone()
};

View File

@ -64,6 +64,13 @@ make_enum_progress! {
}
}
make_enum_progress! {
pub enum FinalizingIndexStep {
Committing,
ComputingStats,
}
}
make_enum_progress! {
pub enum TaskCancelationProgress {
RetrievingTasks,

View File

@ -24,6 +24,7 @@ use meilisearch_types::error::ResponseError;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::tasks::Status;
use process_batch::ProcessBatchInfo;
use rayon::current_num_threads;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
@ -223,16 +224,16 @@ impl IndexScheduler {
let mut stop_scheduler_forever = false;
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
let mut congestion = None;
let mut process_batch_info = ProcessBatchInfo::default();
match res {
Ok((tasks, cong)) => {
Ok((tasks, info)) => {
#[cfg(test)]
self.breakpoint(crate::test_utils::Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj);
congestion = cong;
process_batch_info = info;
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
@ -350,6 +351,9 @@ impl IndexScheduler {
// We must re-add the canceled task so they're part of the same batch.
ids |= canceled;
let ProcessBatchInfo { congestion, pre_commit_dabases_sizes, post_commit_dabases_sizes } =
process_batch_info;
processing_batch.stats.progress_trace =
progress.accumulated_durations().into_iter().map(|(k, v)| (k, v.into())).collect();
processing_batch.stats.write_channel_congestion = congestion.map(|congestion| {
@ -359,6 +363,30 @@ impl IndexScheduler {
congestion_info.insert("blocking_ratio".into(), congestion.congestion_ratio().into());
congestion_info
});
processing_batch.stats.internal_database_sizes = pre_commit_dabases_sizes
.iter()
.flat_map(|(dbname, pre_size)| {
post_commit_dabases_sizes
.get(dbname)
.map(|post_size| {
use byte_unit::{Byte, UnitType::Binary};
use std::cmp::Ordering::{Equal, Greater, Less};
let post = Byte::from_u64(*post_size as u64).get_appropriate_unit(Binary);
let diff_size = post_size.abs_diff(*pre_size) as u64;
let diff = Byte::from_u64(diff_size).get_appropriate_unit(Binary);
let sign = match post_size.cmp(pre_size) {
Equal => return None,
Greater => "+",
Less => "-",
};
Some((dbname.to_string(), format!("{post:#.2} ({sign}{diff:#.2})").into()))
})
.into_iter()
.flatten()
})
.collect();
if let Some(congestion) = congestion {
tracing::debug!(

View File

@ -12,7 +12,7 @@ use roaring::RoaringBitmap;
use super::create_batch::Batch;
use crate::processing::{
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress,
AtomicBatchStep, AtomicTaskStep, CreateIndexProgress, DeleteIndexProgress, FinalizingIndexStep,
InnerSwappingTwoIndexes, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
UpdateIndexProgress,
};
@ -22,6 +22,16 @@ use crate::utils::{
};
use crate::{Error, IndexScheduler, Result, TaskId};
#[derive(Debug, Default)]
pub struct ProcessBatchInfo {
/// The write channel congestion. None when unavailable: settings update.
pub congestion: Option<ChannelCongestion>,
/// The sizes of the different databases before starting the indexation.
pub pre_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
/// The sizes of the different databases after commiting the indexation.
pub post_commit_dabases_sizes: indexmap::IndexMap<&'static str, usize>,
}
impl IndexScheduler {
/// Apply the operation associated with the given batch.
///
@ -35,7 +45,7 @@ impl IndexScheduler {
batch: Batch,
current_batch: &mut ProcessingBatch,
progress: Progress,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
) -> Result<(Vec<Task>, ProcessBatchInfo)> {
#[cfg(test)]
{
self.maybe_fail(crate::test_utils::FailureLocation::InsideProcessBatch)?;
@ -76,7 +86,7 @@ impl IndexScheduler {
canceled_tasks.push(task);
Ok((canceled_tasks, None))
Ok((canceled_tasks, ProcessBatchInfo::default()))
}
Batch::TaskDeletions(mut tasks) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
@ -115,14 +125,14 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
Ok((tasks, None))
}
Batch::SnapshotCreation(tasks) => {
self.process_snapshot(progress, tasks).map(|tasks| (tasks, None))
}
Batch::Dump(task) => {
self.process_dump_creation(progress, task).map(|tasks| (tasks, None))
Ok((tasks, ProcessBatchInfo::default()))
}
Batch::SnapshotCreation(tasks) => self
.process_snapshot(progress, tasks)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::Dump(task) => self
.process_dump_creation(progress, task)
.map(|tasks| (tasks, ProcessBatchInfo::default())),
Batch::IndexOperation { op, must_create_index } => {
let index_uid = op.index_uid().to_string();
let index = if must_create_index {
@ -139,10 +149,12 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let mut index_wtxn = index.write_txn()?;
let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, progress)?;
self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;
{
progress.update_progress(FinalizingIndexStep::Committing);
let span = tracing::trace_span!(target: "indexing::scheduler", "commit");
let _entered = span.enter();
@ -153,12 +165,15 @@ impl IndexScheduler {
// stats of the index. Since the tasks have already been processed and
// this is a non-critical operation. If it fails, we should not fail
// the entire batch.
let mut post_commit_dabases_sizes = None;
let res = || -> Result<()> {
progress.update_progress(FinalizingIndexStep::ComputingStats);
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
let mut wtxn = self.env.write_txn()?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, &stats)?;
post_commit_dabases_sizes = Some(index.database_sizes(&index_rtxn)?);
wtxn.commit()?;
Ok(())
}();
@ -171,7 +186,16 @@ impl IndexScheduler {
),
}
Ok((tasks, congestion))
let info = ProcessBatchInfo {
congestion,
// In case we fail to the get post-commit sizes we decide
// that nothing changed and use the pre-commit sizes.
post_commit_dabases_sizes: post_commit_dabases_sizes
.unwrap_or_else(|| pre_commit_dabases_sizes.clone()),
pre_commit_dabases_sizes,
};
Ok((tasks, info))
}
Batch::IndexCreation { index_uid, primary_key, task } => {
progress.update_progress(CreateIndexProgress::CreatingTheIndex);
@ -239,7 +263,7 @@ impl IndexScheduler {
),
}
Ok((vec![task], None))
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
@ -273,7 +297,9 @@ impl IndexScheduler {
};
}
Ok((tasks, None))
// Here we could also show that all the internal database sizes goes to 0
// but it would mean opening the index and that's costly.
Ok((tasks, ProcessBatchInfo::default()))
}
Batch::IndexSwap { mut task } => {
progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
@ -321,7 +347,7 @@ impl IndexScheduler {
}
wtxn.commit()?;
task.status = Status::Succeeded;
Ok((vec![task], None))
Ok((vec![task], ProcessBatchInfo::default()))
}
Batch::UpgradeDatabase { mut tasks } => {
let KindWithContent::UpgradeDatabase { from } = tasks.last().unwrap().kind else {
@ -351,7 +377,7 @@ impl IndexScheduler {
task.error = None;
}
Ok((tasks, None))
Ok((tasks, ProcessBatchInfo::default()))
}
}
}

View File

@ -32,7 +32,7 @@ impl IndexScheduler {
index_wtxn: &mut RwTxn<'i>,
index: &'i Index,
operation: IndexOperation,
progress: Progress,
progress: &Progress,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now();
@ -186,7 +186,7 @@ impl IndexScheduler {
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
progress,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
);
@ -307,7 +307,7 @@ impl IndexScheduler {
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
@ -465,7 +465,7 @@ impl IndexScheduler {
&document_changes,
embedders,
&|| must_stop_processing.get(),
&progress,
progress,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
@ -520,7 +520,7 @@ impl IndexScheduler {
index_uid: index_uid.clone(),
tasks: cleared_tasks,
},
progress.clone(),
progress,
)?;
let (settings_tasks, _congestion) = self.apply_index_operation(

View File

@ -64,4 +64,6 @@ pub struct BatchStats {
pub progress_trace: serde_json::Map<String, serde_json::Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
}

View File

@ -30,11 +30,7 @@ actix-web = { version = "4.9.0", default-features = false, features = [
anyhow = { version = "1.0.95", features = ["backtrace"] }
async-trait = "0.1.85"
bstr = "1.11.3"
byte-unit = { version = "5.1.6", default-features = false, features = [
"std",
"byte",
"serde",
] }
byte-unit = { version = "5.1.6", features = ["serde"] }
bytes = "1.9.0"
clap = { version = "4.5.24", features = ["derive", "env"] }
crossbeam-channel = "0.5.14"

View File

@ -281,7 +281,8 @@ async fn test_summarized_document_addition_or_update() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r###"
{
@ -303,7 +304,8 @@ async fn test_summarized_document_addition_or_update() {
"test": 1
},
"progressTrace": "[progressTrace]",
"writeChannelCongestion": "[writeChannelCongestion]"
"writeChannelCongestion": "[writeChannelCongestion]",
"internalDatabaseSizes": "[internalDatabaseSizes]"
},
"duration": "[duration]",
"startedAt": "[date]",
@ -322,7 +324,8 @@ async fn test_summarized_document_addition_or_update() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r###"
{
@ -407,7 +410,8 @@ async fn test_summarized_delete_documents_by_batch() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r###"
{
@ -495,7 +499,8 @@ async fn test_summarized_delete_documents_by_filter() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r###"
{
@ -537,7 +542,8 @@ async fn test_summarized_delete_documents_by_filter() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r#"
{
@ -623,7 +629,8 @@ async fn test_summarized_delete_document_by_id() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r#"
{
@ -679,7 +686,8 @@ async fn test_summarized_settings_update() {
".startedAt" => "[date]",
".finishedAt" => "[date]",
".stats.progressTrace" => "[progressTrace]",
".stats.writeChannelCongestion" => "[writeChannelCongestion]"
".stats.writeChannelCongestion" => "[writeChannelCongestion]",
".stats.internalDatabaseSizes" => "[internalDatabaseSizes]"
},
@r###"
{

View File

@ -2237,6 +2237,7 @@ async fn import_dump_v6_containing_batches_and_enqueued_tasks() {
".results[0].duration" => "[date]",
".results[0].stats.progressTrace" => "[progressTrace]",
".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]",
".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]",
}), name: "batches");
let (indexes, code) = server.list_indexes(None, None).await;

View File

@ -193,26 +193,26 @@ async fn check_the_index_scheduler(server: &Server) {
// Tests all the batches query parameters
let (batches, _) = server.batches_filter("uids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_uids_equal_10");
let (batches, _) = server.batches_filter("batchUids=10").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_batchUids_equal_10");
let (batches, _) = server.batches_filter("statuses=canceled").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_statuses_equal_canceled");
// types has already been tested above to retrieve the upgrade database
let (batches, _) = server.batches_filter("canceledBy=19").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_canceledBy_equal_19");
let (batches, _) = server.batches_filter("beforeEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterEnqueuedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterEnqueuedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterStartedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterStartedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("beforeFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_beforeFinishedAt_equal_2025-01-16T16_47_41");
let (batches, _) = server.batches_filter("afterFinishedAt=2025-01-16T16:47:41Z").await;
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41");
snapshot!(json_string!(batches, { ".results[0].duration" => "[duration]", ".results[0].enqueuedAt" => "[date]", ".results[0].startedAt" => "[date]", ".results[0].finishedAt" => "[date]", ".results[0].stats.progressTrace" => "[progressTrace]", ".results[0].stats.internalDatabaseSizes" => "[internalDatabaseSizes]", ".results[0].stats.writeChannelCongestion" => "[writeChannelCongestion]" }), name: "batches_filter_afterFinishedAt_equal_2025-01-16T16_47_41");
let (stats, _) = server.stats().await;
assert_json_snapshot!(stats, {

View File

@ -3,8 +3,9 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::path::Path;
use heed::{types::*, WithoutTls};
use heed::{types::*, DatabaseStat, WithoutTls};
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use indexmap::IndexMap;
use roaring::RoaringBitmap;
use rstar::RTree;
use serde::{Deserialize, Serialize};
@ -1768,6 +1769,109 @@ impl Index {
Ok(self.word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some()
|| self.exact_word_docids.remap_data_type::<DecodeIgnore>().get(rtxn, word)?.is_some())
}
/// Returns the sizes in bytes of each of the index database at the given rtxn.
pub fn database_sizes(&self, rtxn: &RoTxn<'_>) -> heed::Result<IndexMap<&'static str, usize>> {
let Self {
env: _,
main,
external_documents_ids,
word_docids,
exact_word_docids,
word_prefix_docids,
exact_word_prefix_docids,
word_pair_proximity_docids,
word_position_docids,
word_fid_docids,
word_prefix_position_docids,
word_prefix_fid_docids,
field_id_word_count_docids,
facet_id_f64_docids,
facet_id_string_docids,
facet_id_normalized_string_strings,
facet_id_string_fst,
facet_id_exists_docids,
facet_id_is_null_docids,
facet_id_is_empty_docids,
field_id_docid_facet_f64s,
field_id_docid_facet_strings,
vector_arroy,
embedder_category_id,
documents,
} = self;
fn compute_size(stats: DatabaseStat) -> usize {
let DatabaseStat {
page_size,
depth: _,
branch_pages,
leaf_pages,
overflow_pages,
entries: _,
} = stats;
(branch_pages + leaf_pages + overflow_pages) * page_size as usize
}
let mut sizes = IndexMap::new();
sizes.insert("main", main.stat(rtxn).map(compute_size)?);
sizes
.insert("external_documents_ids", external_documents_ids.stat(rtxn).map(compute_size)?);
sizes.insert("word_docids", word_docids.stat(rtxn).map(compute_size)?);
sizes.insert("exact_word_docids", exact_word_docids.stat(rtxn).map(compute_size)?);
sizes.insert("word_prefix_docids", word_prefix_docids.stat(rtxn).map(compute_size)?);
sizes.insert(
"exact_word_prefix_docids",
exact_word_prefix_docids.stat(rtxn).map(compute_size)?,
);
sizes.insert(
"word_pair_proximity_docids",
word_pair_proximity_docids.stat(rtxn).map(compute_size)?,
);
sizes.insert("word_position_docids", word_position_docids.stat(rtxn).map(compute_size)?);
sizes.insert("word_fid_docids", word_fid_docids.stat(rtxn).map(compute_size)?);
sizes.insert(
"word_prefix_position_docids",
word_prefix_position_docids.stat(rtxn).map(compute_size)?,
);
sizes
.insert("word_prefix_fid_docids", word_prefix_fid_docids.stat(rtxn).map(compute_size)?);
sizes.insert(
"field_id_word_count_docids",
field_id_word_count_docids.stat(rtxn).map(compute_size)?,
);
sizes.insert("facet_id_f64_docids", facet_id_f64_docids.stat(rtxn).map(compute_size)?);
sizes
.insert("facet_id_string_docids", facet_id_string_docids.stat(rtxn).map(compute_size)?);
sizes.insert(
"facet_id_normalized_string_strings",
facet_id_normalized_string_strings.stat(rtxn).map(compute_size)?,
);
sizes.insert("facet_id_string_fst", facet_id_string_fst.stat(rtxn).map(compute_size)?);
sizes
.insert("facet_id_exists_docids", facet_id_exists_docids.stat(rtxn).map(compute_size)?);
sizes.insert(
"facet_id_is_null_docids",
facet_id_is_null_docids.stat(rtxn).map(compute_size)?,
);
sizes.insert(
"facet_id_is_empty_docids",
facet_id_is_empty_docids.stat(rtxn).map(compute_size)?,
);
sizes.insert(
"field_id_docid_facet_f64s",
field_id_docid_facet_f64s.stat(rtxn).map(compute_size)?,
);
sizes.insert(
"field_id_docid_facet_strings",
field_id_docid_facet_strings.stat(rtxn).map(compute_size)?,
);
sizes.insert("vector_arroy", vector_arroy.stat(rtxn).map(compute_size)?);
sizes.insert("embedder_category_id", embedder_category_id.stat(rtxn).map(compute_size)?);
sizes.insert("documents", documents.stat(rtxn).map(compute_size)?);
Ok(sizes)
}
}
#[derive(Debug, Deserialize, Serialize)]