Eagerly compute stats as fallback to the cache.

- Refactor all around to avoid spawning indexes more times than necessary
This commit is contained in:
Louis Dureuil 2023-02-28 15:24:31 +01:00 committed by Tamo
parent 3bbf760542
commit 076a3d371c
4 changed files with 83 additions and 21 deletions

View File

@ -847,8 +847,10 @@ impl IndexScheduler {
// this is a non-critical operation. If it fails, we should not fail // this is a non-critical operation. If it fails, we should not fail
// the entire batch. // the entire batch.
let res = || -> Result<()> { let res = || -> Result<()> {
let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?; self.index_mapper.store_stats_of(&mut wtxn, &index_uid, stats)?;
wtxn.commit()?; wtxn.commit()?;
Ok(()) Ok(())
}(); }();
@ -888,6 +890,10 @@ impl IndexScheduler {
)?; )?;
index_wtxn.commit()?; index_wtxn.commit()?;
} }
// drop rtxn before starting a new wtxn on the same db
rtxn.commit()?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
task.details = Some(Details::IndexInfo { primary_key }); task.details = Some(Details::IndexInfo { primary_key });
@ -897,7 +903,9 @@ impl IndexScheduler {
// the entire batch. // the entire batch.
let res = || -> Result<()> { let res = || -> Result<()> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?; let index_rtxn = index.read_txn()?;
let stats = crate::index_mapper::IndexStats::new(&index, &index_rtxn)?;
self.index_mapper.store_stats_of(&mut wtxn, &index_uid, stats)?;
wtxn.commit()?; wtxn.commit()?;
Ok(()) Ok(())
}(); }();

View File

@ -54,8 +54,11 @@ pub struct IndexMapper {
/// Map an index name with an index uuid currently available on disk. /// Map an index name with an index uuid currently available on disk.
pub(crate) index_mapping: Database<Str, UuidCodec>, pub(crate) index_mapping: Database<Str, UuidCodec>,
/// Map an index name with the cached stats associated to the index. /// Map an index UUID with the cached stats associated to the index.
pub(crate) index_stats: Database<Str, SerdeJson<IndexStats>>, ///
/// Using an UUID forces to use the index_mapping table to recover the index behind a name, ensuring
/// consistency wrt index swapping.
pub(crate) index_stats: Database<UuidCodec, SerdeJson<IndexStats>>,
/// Path to the folder where the LMDB environments of each index are. /// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf, base_path: PathBuf,
@ -80,15 +83,39 @@ pub enum IndexStatus {
Available(Index), Available(Index),
} }
/// The statistics that can be computed from an `Index` object.
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct IndexStats { pub struct IndexStats {
/// Number of documents in the index.
pub number_of_documents: u64, pub number_of_documents: u64,
/// Size of the index' DB, in bytes.
pub database_size: u64, pub database_size: u64,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution, pub field_distribution: FieldDistribution,
/// Creation date of the index.
pub created_at: OffsetDateTime, pub created_at: OffsetDateTime,
/// Date of the last update of the index.
pub updated_at: OffsetDateTime, pub updated_at: OffsetDateTime,
} }
impl IndexStats {
/// Compute the stats of an index
///
/// # Parameters
///
/// - rtxn: a RO transaction for the index, obtained from `Index::read_txn()`.
pub fn new(index: &Index, rtxn: &RoTxn) -> Result<Self> {
let database_size = index.on_disk_size()?;
Ok(IndexStats {
number_of_documents: index.number_of_documents(rtxn)?,
database_size,
field_distribution: index.field_distribution(rtxn)?,
created_at: index.created_at(rtxn)?,
updated_at: index.updated_at(rtxn)?,
})
}
}
impl IndexMapper { impl IndexMapper {
pub fn new( pub fn new(
env: &Env, env: &Env,
@ -149,12 +176,14 @@ impl IndexMapper {
/// Removes the index from the mapping table and the in-memory index map /// Removes the index from the mapping table and the in-memory index map
/// but keeps the associated tasks. /// but keeps the associated tasks.
pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> { pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> {
self.index_stats.delete(&mut wtxn, name)?;
let uuid = self let uuid = self
.index_mapping .index_mapping
.get(&wtxn, name)? .get(&wtxn, name)?
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?; .ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
// Not an error if the index had no stats in cache.
self.index_stats.delete(&mut wtxn, &uuid)?;
// Once we retrieved the UUID of the index we remove it from the mapping table. // Once we retrieved the UUID of the index we remove it from the mapping table.
assert!(self.index_mapping.delete(&mut wtxn, name)?); assert!(self.index_mapping.delete(&mut wtxn, name)?);
@ -375,26 +404,42 @@ impl IndexMapper {
Ok(()) Ok(())
} }
/// Return the stored stats of an index. /// The stats of an index.
///
/// If available in the cache, they are directly returned.
/// Otherwise, the `Index` is opened to compute the stats on the fly (the result is not cached).
/// The stats for an index are cached after each `Index` update.
pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> { pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> {
self.index_stats let uuid = self
.index_mapping
.get(rtxn, index_uid)? .get(rtxn, index_uid)?
.ok_or_else(|| Error::IndexNotFound(index_uid.to_string())) .ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
match self.index_stats.get(rtxn, &uuid)? {
Some(stats) => Ok(stats),
None => {
let index = self.index(rtxn, index_uid)?;
let index_rtxn = index.read_txn()?;
IndexStats::new(&index, &index_rtxn)
}
}
} }
/// Return the stats of an index and write it in the index-mapper database. /// Stores the new stats for an index.
pub fn compute_and_store_stats_of(&self, wtxn: &mut RwTxn, index_uid: &str) -> Result<()> { ///
let index = self.index(wtxn, index_uid)?; /// Expected usage is to compute the stats the index using `IndexStats::new`, the pass it to this function.
let database_size = index.on_disk_size()?; pub fn store_stats_of(
let rtxn = index.read_txn()?; &self,
let stats = IndexStats { wtxn: &mut RwTxn,
number_of_documents: index.number_of_documents(&rtxn)?, index_uid: &str,
database_size, stats: IndexStats,
field_distribution: index.field_distribution(&rtxn)?, ) -> Result<()> {
created_at: index.created_at(&rtxn)?, let uuid = self
updated_at: index.updated_at(&rtxn)?, .index_mapping
}; .get(wtxn, index_uid)?
self.index_stats.put(wtxn, index_uid, &stats)?; .ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))?;
self.index_stats.put(wtxn, &uuid, &stats)?;
Ok(()) Ok(())
} }

View File

@ -1245,9 +1245,14 @@ struct IndexBudget {
task_db_size: usize, task_db_size: usize,
} }
/// The statistics that can be computed from an `Index` object and the scheduler.
///
/// Compared with `index_mapper::IndexStats`, it adds the scheduling status.
#[derive(Debug)] #[derive(Debug)]
pub struct IndexStats { pub struct IndexStats {
/// Whether this index is currently performing indexation, according to the scheduler.
pub is_indexing: bool, pub is_indexing: bool,
/// Internal stats computed from the index.
pub inner_stats: index_mapper::IndexStats, pub inner_stats: index_mapper::IndexStats,
} }

View File

@ -220,11 +220,15 @@ pub async fn delete_index(
Ok(HttpResponse::Accepted().json(task)) Ok(HttpResponse::Accepted().json(task))
} }
/// Stats of an `Index`, as known to the `stats` route.
#[derive(Serialize, Debug)] #[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct IndexStats { pub struct IndexStats {
/// Number of documents in the index
pub number_of_documents: u64, pub number_of_documents: u64,
/// Whether the index is currently performing indexation, according to the scheduler.
pub is_indexing: bool, pub is_indexing: bool,
/// Association of every field name with the number of times it occurs in the documents.
pub field_distribution: FieldDistribution, pub field_distribution: FieldDistribution,
} }