meilisearch/MeiliSearch (mirror of https://github.com/meilisearch/MeiliSearch)

write stats after rebuilding facet distribution

commit 1d153c1867
parent 5fde2a3ee1
@@ -10,11 +10,15 @@ use anyhow::Context;
 use file_store::FileStore;
 use indexmap::IndexMap;
 use meilisearch_types::milli::documents::DocumentsBatchReader;
-use milli::heed::types::Str;
-use milli::heed::{Database, EnvOpenOptions};
+use milli::heed::types::{SerdeJson, Str};
+use milli::heed::{Database, EnvOpenOptions, RoTxn, RwTxn};
 use milli::progress::Step;
+use milli::{FieldDistribution, Index};
+use serde::Serialize;
 use serde_json::value::RawValue;
 use tempfile::NamedTempFile;
+use time::OffsetDateTime;
+use uuid::Uuid;
 
 use crate::try_opening_database;
 use crate::uuid_codec::UuidCodec;
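The import changes line up with the rest of the diff: `SerdeJson` and `RwTxn` support the writes to the new `index-stats` database, `Uuid` keys it, and `Serialize`, `FieldDistribution`, `Index`, `OffsetDateTime`, and `RoTxn` are used by the `IndexStats` struct and its constructor added at the end of the file.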
@@ -100,20 +104,30 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
     let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
         .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
 
-    let sched_rtxn = env.read_txn()?;
+    let mut sched_wtxn = env.write_txn()?;
 
     let index_mapping: Database<Str, UuidCodec> =
-        try_opening_database(&env, &sched_rtxn, "index-mapping")?;
+        try_opening_database(&env, &sched_wtxn, "index-mapping")?;
+    let stats_db: Database<UuidCodec, SerdeJson<IndexStats>> =
+        try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
+            format!("While trying to open {:?}", index_scheduler_path.display())
+        })?;
 
     let index_count =
-        index_mapping.len(&sched_rtxn).context("while reading the number of indexes")?;
+        index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
 
+    // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
+    // 1. immutably for the iteration
+    // 2. mutably for updating index stats
+    let indexes: Vec<_> = index_mapping
+        .iter(&sched_wtxn)?
+        .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
+        .collect();
+
     let progress = milli::progress::Progress::default();
     let finished = AtomicBool::new(false);
 
     std::thread::scope(|scope| {
-        let indexes = index_mapping.iter(&sched_rtxn)?;
-
         let display_progress = std::thread::Builder::new()
             .name("display_progress".into())
             .spawn_scoped(scope, || {
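The FIXME block marks a borrow-checker workaround rather than a logic change: `index_mapping.iter(&sched_wtxn)` would keep an immutable borrow of the write transaction alive for the whole loop, while writing the stats later needs `&mut sched_wtxn`. Collecting the `(uid, uuid)` pairs into an owned `Vec` up front ends the first borrow. A minimal sketch of the same pattern, using a plain `HashMap` as a hypothetical stand-in for the heed databases (none of these names come from the diff):

use std::collections::HashMap;

// Hypothetical stand-in for the two databases reached through `sched_wtxn`.
struct Scheduler {
    index_mapping: HashMap<String, u32>, // uid -> uuid
    index_stats: HashMap<u32, String>,   // uuid -> stats payload
}

fn rebuild(sched: &mut Scheduler) {
    // This version would not compile: the iterator holds an immutable borrow
    // of `sched` across the loop body, where `insert` needs a mutable one.
    //
    // for (uid, uuid) in sched.index_mapping.iter() {
    //     sched.index_stats.insert(*uuid, format!("stats of {uid}"));
    // }

    // The diff's workaround: pre-populate an owned Vec of (uid, uuid) pairs,
    // which ends the immutable borrow before the mutating loop starts.
    let indexes: Vec<(String, u32)> =
        sched.index_mapping.iter().map(|(uid, uuid)| (uid.clone(), *uuid)).collect();
    for (uid, uuid) in indexes {
        sched.index_stats.insert(uuid, format!("stats of {uid}"));
    }
}

fn main() {
    let mut sched = Scheduler {
        index_mapping: HashMap::from([("movies".to_string(), 7)]),
        index_stats: HashMap::new(),
    };
    rebuild(&mut sched);
    assert_eq!(sched.index_stats[&7], "stats of movies");
}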
@@ -128,10 +142,10 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
             })
             .unwrap();
 
-        for (index_index, result) in indexes.enumerate() {
+        for (index_index, result) in indexes.into_iter().enumerate() {
             let (uid, uuid) = result?;
             progress.update_progress(VariableNameStep::new(
-                uid,
+                &uid,
                 index_index as u32,
                 index_count as u32,
             ));
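Both small changes here fall out of `indexes` now being an owned `Vec`: it needs an explicit `into_iter()`, and each `uid` is now an owned `String` rather than a `&str` borrowed from the database, so it is passed to `VariableNameStep::new` by reference to keep it available for the stats code later in the loop body.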
@@ -155,10 +169,14 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
             milli::update::new::reindex::field_distribution(&index, &mut index_txn, &progress)
                 .context("while rebuilding field distribution")?;
 
+            let stats = IndexStats::new(&index, &index_txn)
+                .with_context(|| format!("computing stats for index `{uid}`"))?;
+            store_stats_of(stats_db, uuid, &mut sched_wtxn, &uid, &stats)?;
+
             index_txn.commit().context("while committing the write txn for the updated index")?;
         }
 
-        sched_rtxn.commit().context("while committing the write txn for the index-scheduler")?;
+        sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
 
         finished.store(true, std::sync::atomic::Ordering::Relaxed);
 
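The transaction layering is worth spelling out: stats are computed under each index's own `index_txn` and written into the scheduler env through `sched_wtxn`, which is committed only once after the loop. As far as this hunk shows, if any index fails partway through, the scheduler's stats stay untouched, while indexes whose `index_txn` already committed keep their rebuilt field distribution.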
@@ -203,3 +221,60 @@ impl Step for VariableNameStep {
         self.total
     }
 }
+
+pub fn store_stats_of(
+    stats_db: Database<UuidCodec, SerdeJson<IndexStats>>,
+    index_uuid: Uuid,
+    sched_wtxn: &mut RwTxn,
+    index_uid: &str,
+    stats: &IndexStats,
+) -> anyhow::Result<()> {
+    stats_db
+        .put(sched_wtxn, &index_uuid, stats)
+        .with_context(|| format!("storing stats for index `{index_uid}`"))?;
+    Ok(())
+}
+
+/// The statistics that can be computed from an `Index` object.
+#[derive(Serialize, Debug)]
+pub struct IndexStats {
+    /// Number of documents in the index.
+    pub number_of_documents: u64,
+    /// Size taken up by the index' DB, in bytes.
+    ///
+    /// This includes the size taken by both the used and free pages of the DB, and as the free pages
+    /// are not returned to the disk after a deletion, this number is typically larger than
+    /// `used_database_size` that only includes the size of the used pages.
+    pub database_size: u64,
+    /// Size taken by the used pages of the index' DB, in bytes.
+    ///
+    /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
+    /// this value is typically smaller than `database_size`.
+    pub used_database_size: u64,
+    /// Association of every field name with the number of times it occurs in the documents.
+    pub field_distribution: FieldDistribution,
+    /// Creation date of the index.
+    #[serde(with = "time::serde::rfc3339")]
+    pub created_at: OffsetDateTime,
+    /// Date of the last update of the index.
+    #[serde(with = "time::serde::rfc3339")]
+    pub updated_at: OffsetDateTime,
+}
+
+impl IndexStats {
+    /// Compute the stats of an index
+    ///
+    /// # Parameters
+    ///
+    /// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
+    pub fn new(index: &Index, rtxn: &RoTxn) -> milli::Result<Self> {
+        Ok(IndexStats {
+            number_of_documents: index.number_of_documents(rtxn)?,
+            database_size: index.on_disk_size()?,
+            used_database_size: index.used_size()?,
+            field_distribution: index.field_distribution(rtxn)?,
+            created_at: index.created_at(rtxn)?,
+            updated_at: index.updated_at(rtxn)?,
+        })
+    }
+}
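As a small aside on the serde attributes: `#[serde(with = "time::serde::rfc3339")]` makes the two timestamps serialize as RFC 3339 strings. A minimal standalone illustration, not from this commit, assuming the `time` crate with its `serde` and `formatting` features enabled:

use serde::Serialize;
use time::OffsetDateTime;

#[derive(Serialize)]
struct Timestamps {
    // Same attribute as on `created_at`/`updated_at` in the diff.
    #[serde(with = "time::serde::rfc3339")]
    created_at: OffsetDateTime,
}

fn main() {
    let t = Timestamps { created_at: OffsetDateTime::UNIX_EPOCH };
    // Prints something like: {"created_at":"1970-01-01T00:00:00Z"}
    println!("{}", serde_json::to_string(&t).unwrap());
}

One detail worth noting: `IndexStats` only derives `Serialize`, which is all that `put` needs through heed's `SerdeJson` codec; reading the value back through the same codec would additionally require `Deserialize`.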