Mirror of https://github.com/meilisearch/MeiliSearch
Add cache on the indexes stats
parent c0ede6d152
commit fd5c48941a
@@ -828,20 +828,36 @@ impl IndexScheduler {
                 Ok(vec![task])
             }
             Batch::IndexOperation { op, must_create_index } => {
-                let index_uid = op.index_uid();
+                let index_uid = op.index_uid().to_string();
                 let index = if must_create_index {
                     // create the index if it doesn't already exist
                     let wtxn = self.env.write_txn()?;
-                    self.index_mapper.create_index(wtxn, index_uid, None)?
+                    self.index_mapper.create_index(wtxn, &index_uid, None)?
                 } else {
                     let rtxn = self.env.read_txn()?;
-                    self.index_mapper.index(&rtxn, index_uid)?
+                    self.index_mapper.index(&rtxn, &index_uid)?
                 };

                 let mut index_wtxn = index.write_txn()?;
                 let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
                 index_wtxn.commit()?;

+                // if the update processed successfully, we're going to store the new
+                // stats of the index. Since the tasks have already been processed and
+                // this is a non-critical operation. If it fails, we should not fail
+                // the entire batch.
+                let res = || -> Result<()> {
+                    let mut wtxn = self.env.write_txn()?;
+                    self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?;
+                    wtxn.commit()?;
+                    Ok(())
+                }();
+
+                match res {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not write the stats of the index {}", e),
+                }
+
                 Ok(tasks)
             }
             Batch::IndexCreation { index_uid, primary_key, task } => {
@@ -875,6 +891,22 @@ impl IndexScheduler {
                 task.status = Status::Succeeded;
                 task.details = Some(Details::IndexInfo { primary_key });

+                // if the update processed successfully, we're going to store the new
+                // stats of the index. Since the tasks have already been processed and
+                // this is a non-critical operation. If it fails, we should not fail
+                // the entire batch.
+                let res = || -> Result<()> {
+                    let mut wtxn = self.env.write_txn()?;
+                    self.index_mapper.compute_and_store_stats_of(&mut wtxn, &index_uid)?;
+                    wtxn.commit()?;
+                    Ok(())
+                }();
+
+                match res {
+                    Ok(_) => (),
+                    Err(e) => error!("Could not write the stats of the index {}", e),
+                }
+
                 Ok(vec![task])
             }
             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
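Both hunks above wrap the stats refresh in an immediately-invoked closure so that a failure in this bookkeeping step only produces a log line instead of failing a batch that has already been committed. A minimal, self-contained sketch of that pattern follows; the names `refresh_stats_cache` and `process` are illustrative, not from the commit:

```rust
// Stand-in for `compute_and_store_stats_of` + `wtxn.commit()`.
fn refresh_stats_cache(index_uid: &str) -> Result<(), String> {
    if index_uid.is_empty() {
        return Err("unknown index".to_string());
    }
    Ok(())
}

fn process(index_uid: &str) -> Result<Vec<u32>, String> {
    let tasks = vec![1, 2, 3]; // pretend these tasks were applied and committed

    // Non-critical: compute and persist fresh stats, but never propagate the error.
    let res = || -> Result<(), String> {
        refresh_stats_cache(index_uid)?;
        Ok(())
    }();
    if let Err(e) = res {
        eprintln!("Could not write the stats of the index {e}");
    }

    Ok(tasks)
}

fn main() {
    // Even when the cache refresh fails, the batch result is still Ok; only a log line is emitted.
    assert!(process("").is_ok());
    assert_eq!(process("movies").unwrap(), vec![1, 2, 3]);
}
```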
@@ -4,10 +4,11 @@ use std::time::Duration;
 use std::{fs, thread};

 use log::error;
-use meilisearch_types::heed::types::Str;
+use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
 use meilisearch_types::milli::update::IndexerConfig;
-use meilisearch_types::milli::Index;
+use meilisearch_types::milli::{FieldDistribution, Index};
+use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use uuid::Uuid;

@@ -19,6 +20,7 @@ use crate::{Error, Result};
 mod index_map;

 const INDEX_MAPPING: &str = "index-mapping";
+const INDEX_STATS: &str = "index-stats";

 /// Structure managing meilisearch's indexes.
 ///
@@ -52,6 +54,8 @@ pub struct IndexMapper {

     /// Map an index name with an index uuid currently available on disk.
     pub(crate) index_mapping: Database<Str, UuidCodec>,
+    /// Map an index name with the cached stats associated to the index.
+    pub(crate) index_stats: Database<Str, SerdeJson<IndexStats>>,

     /// Path to the folder where the LMDB environments of each index are.
     base_path: PathBuf,
@@ -76,6 +80,15 @@ pub enum IndexStatus {
     Available(Index),
 }

+#[derive(Serialize, Deserialize, Debug)]
+pub struct IndexStats {
+    pub number_of_documents: u64,
+    pub database_size: u64,
+    pub field_distribution: FieldDistribution,
+    pub created_at: OffsetDateTime,
+    pub updated_at: OffsetDateTime,
+}
+
 impl IndexMapper {
     pub fn new(
         env: &Env,
@@ -88,6 +101,7 @@ impl IndexMapper {
         Ok(Self {
             index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
             index_mapping: env.create_database(Some(INDEX_MAPPING))?,
+            index_stats: env.create_database(Some(INDEX_STATS))?,
             base_path,
             index_base_map_size,
             index_growth_amount,
@@ -135,6 +149,7 @@ impl IndexMapper {
     /// Removes the index from the mapping table and the in-memory index map
     /// but keeps the associated tasks.
     pub fn delete_index(&self, mut wtxn: RwTxn, name: &str) -> Result<()> {
+        self.index_stats.delete(&mut wtxn, name)?;
         let uuid = self
             .index_mapping
             .get(&wtxn, name)?
@@ -360,6 +375,29 @@ impl IndexMapper {
         Ok(())
     }

+    /// Return the stored stats of an index.
+    pub fn stats_of(&self, rtxn: &RoTxn, index_uid: &str) -> Result<IndexStats> {
+        self.index_stats
+            .get(rtxn, index_uid)?
+            .ok_or_else(|| Error::IndexNotFound(index_uid.to_string()))
+    }
+
+    /// Return the stats of an index and write it in the index-mapper database.
+    pub fn compute_and_store_stats_of(&self, wtxn: &mut RwTxn, index_uid: &str) -> Result<()> {
+        let index = self.index(wtxn, index_uid)?;
+        let database_size = index.on_disk_size()?;
+        let rtxn = index.read_txn()?;
+        let stats = IndexStats {
+            number_of_documents: index.number_of_documents(&rtxn)?,
+            database_size,
+            field_distribution: index.field_distribution(&rtxn)?,
+            created_at: index.created_at(&rtxn)?,
+            updated_at: index.updated_at(&rtxn)?,
+        };
+        self.index_stats.put(wtxn, index_uid, &stats)?;
+        Ok(())
+    }
+
     pub fn index_exists(&self, rtxn: &RoTxn, name: &str) -> Result<bool> {
         Ok(self.index_mapping.get(rtxn, name)?.is_some())
     }
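The `IndexMapper` hunks introduce a second LMDB database, keyed by index name and encoded with `SerdeJson<IndexStats>`, together with a read path (`stats_of`) and a refresh path (`compute_and_store_stats_of`). The toy sketch below shows the same read/refresh split, with a `HashMap` standing in for the persistent heed database; everything in it is illustrative, not the commit's code:

```rust
use std::collections::HashMap;

// Illustrative stand-in for the cached stats record added in this commit.
#[derive(Clone, Debug)]
struct IndexStats {
    number_of_documents: u64,
    database_size: u64,
}

// A toy IndexMapper that models the new `index_stats` database with a HashMap.
#[derive(Default)]
struct IndexMapper {
    index_stats: HashMap<String, IndexStats>,
}

impl IndexMapper {
    // Mirrors `stats_of`: a pure read of whatever was cached last.
    fn stats_of(&self, index_uid: &str) -> Result<IndexStats, String> {
        self.index_stats
            .get(index_uid)
            .cloned()
            .ok_or_else(|| format!("index `{index_uid}` not found"))
    }

    // Mirrors `compute_and_store_stats_of`: recompute from the live index
    // (faked here with plain numbers) and overwrite the cached entry.
    fn compute_and_store_stats_of(&mut self, index_uid: &str, docs: u64, size: u64) {
        let stats = IndexStats { number_of_documents: docs, database_size: size };
        self.index_stats.insert(index_uid.to_string(), stats);
    }
}

fn main() {
    let mut mapper = IndexMapper::default();
    assert!(mapper.stats_of("movies").is_err()); // nothing cached yet

    mapper.compute_and_store_stats_of("movies", 1_000, 4 << 20);
    let stats = mapper.stats_of("movies").unwrap();
    println!("{} docs, {} bytes on disk", stats.number_of_documents, stats.database_size);
}
```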
@@ -44,10 +44,9 @@ use file_store::FileStore;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
 use meilisearch_types::heed::{self, Database, Env, RoTxn};
-use meilisearch_types::milli;
 use meilisearch_types::milli::documents::DocumentsBatchBuilder;
 use meilisearch_types::milli::update::IndexerConfig;
-use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
+use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
@@ -566,7 +565,7 @@ impl IndexScheduler {
     }

     /// Return the name of all indexes without opening them.
-    pub fn index_names(self) -> Result<Vec<String>> {
+    pub fn index_names(&self) -> Result<Vec<String>> {
         let rtxn = self.env.read_txn()?;
         self.index_mapper.index_names(&rtxn)
     }
@@ -1186,6 +1185,14 @@ impl IndexScheduler {
         Ok(TickOutcome::TickAgain(processed_tasks))
     }

+    pub fn index_stats(&self, index_uid: &str) -> Result<IndexStats> {
+        let is_indexing = self.is_index_processing(index_uid)?;
+        let rtxn = self.read_txn()?;
+        let index_stats = self.index_mapper.stats_of(&rtxn, index_uid)?;
+
+        Ok(IndexStats { is_indexing, inner_stats: index_stats })
+    }
+
     pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
         match task.content_uuid() {
             Some(content_file) => self.delete_update_file(content_file),
@@ -1238,6 +1245,12 @@ struct IndexBudget {
     task_db_size: usize,
 }

+#[derive(Debug)]
+pub struct IndexStats {
+    pub is_indexing: bool,
+    pub inner_stats: index_mapper::IndexStats,
+}
+
 #[cfg(test)]
 mod tests {
     use std::io::{BufWriter, Seek, Write};
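At the scheduler level, `index_stats` pairs a live `is_index_processing` check with the cached `index_mapper::IndexStats`, so a caller can tell that the cached numbers may lag while a batch is still running. A small sketch of that consistency model, with simplified stand-in types rather than the real scheduler:

```rust
// Cached counters written after the previous batch finished.
struct CachedStats {
    number_of_documents: u64,
}

// Minimal view of the scheduler: one cached record plus the uid of the index
// currently being processed, if any.
struct SchedulerView {
    cached: CachedStats,
    currently_processing: Option<String>,
}

impl SchedulerView {
    // Returns (is_indexing, cached document count), like the new `index_stats`.
    fn index_stats(&self, index_uid: &str) -> (bool, u64) {
        let is_indexing = self.currently_processing.as_deref() == Some(index_uid);
        (is_indexing, self.cached.number_of_documents)
    }
}

fn main() {
    let view = SchedulerView {
        cached: CachedStats { number_of_documents: 1_000 },
        currently_processing: Some("movies".to_string()),
    };
    let (is_indexing, docs) = view.index_stats("movies");
    // 1_000 documents were visible after the last finished batch; more may be coming.
    assert!(is_indexing);
    assert_eq!(docs, 1_000);
}
```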
@@ -220,6 +220,24 @@ pub async fn delete_index(
     Ok(HttpResponse::Accepted().json(task))
 }

+#[derive(Serialize, Debug)]
+#[serde(rename_all = "camelCase")]
+pub struct IndexStats {
+    pub number_of_documents: u64,
+    pub is_indexing: bool,
+    pub field_distribution: FieldDistribution,
+}
+
+impl From<index_scheduler::IndexStats> for IndexStats {
+    fn from(stats: index_scheduler::IndexStats) -> Self {
+        IndexStats {
+            number_of_documents: stats.inner_stats.number_of_documents,
+            is_indexing: stats.is_indexing,
+            field_distribution: stats.inner_stats.field_distribution,
+        }
+    }
+}
+
 pub async fn get_index_stats(
     index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
@@ -229,33 +247,8 @@ pub async fn get_index_stats(
     let index_uid = IndexUid::try_from(index_uid.into_inner())?;
     analytics.publish("Stats Seen".to_string(), json!({ "per_index_uid": true }), Some(&req));

-    let stats = IndexStats::new((*index_scheduler).clone(), index_uid.into_inner())?;
+    let stats = IndexStats::from(index_scheduler.index_stats(&index_uid)?);

     debug!("returns: {:?}", stats);
     Ok(HttpResponse::Ok().json(stats))
 }
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "camelCase")]
-pub struct IndexStats {
-    pub number_of_documents: u64,
-    pub is_indexing: bool,
-    pub field_distribution: FieldDistribution,
-}
-
-impl IndexStats {
-    pub fn new(
-        index_scheduler: Data<IndexScheduler>,
-        index_uid: String,
-    ) -> Result<Self, ResponseError> {
-        // we check if there is currently a task processing associated with this index.
-        let is_processing = index_scheduler.is_index_processing(&index_uid)?;
-        let index = index_scheduler.index(&index_uid)?;
-        let rtxn = index.read_txn()?;
-        Ok(IndexStats {
-            number_of_documents: index.number_of_documents(&rtxn)?,
-            is_indexing: is_processing,
-            field_distribution: index.field_distribution(&rtxn)?,
-        })
-    }
-}
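The HTTP-facing `IndexStats` is now a plain projection of the scheduler's stats, serialized in camelCase for the per-index stats route. The reduced sketch below (assuming the `serde` and `serde_json` crates, with made-up field values and the `field_distribution` field left out) shows what the rename attribute does to the JSON payload:

```rust
use serde::Serialize;

// Reduced copy of the HTTP-facing struct from the hunk above.
#[derive(Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct IndexStats {
    pub number_of_documents: u64,
    pub is_indexing: bool,
}

fn main() {
    let stats = IndexStats { number_of_documents: 2, is_indexing: false };
    // Prints: {"numberOfDocuments":2,"isIndexing":false}
    println!("{}", serde_json::to_string(&stats).unwrap());
}
```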
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;

 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
-use index_scheduler::{IndexScheduler, Query};
+use index_scheduler::IndexScheduler;
 use log::debug;
 use meilisearch_auth::AuthController;
 use meilisearch_types::error::ResponseError;
@@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize};
 use serde_json::json;
 use time::OffsetDateTime;

-use self::indexes::IndexStats;
 use crate::analytics::Analytics;
 use crate::extractors::authentication::policies::*;
 use crate::extractors::authentication::GuardedData;
@@ -234,7 +233,7 @@ pub struct Stats {
     pub database_size: u64,
     #[serde(serialize_with = "time::serde::rfc3339::option::serialize")]
     pub last_update: Option<OffsetDateTime>,
-    pub indexes: BTreeMap<String, IndexStats>,
+    pub indexes: BTreeMap<String, indexes::IndexStats>,
 }

 async fn get_stats(
@@ -260,32 +259,19 @@ pub fn create_all_stats(
     let mut last_task: Option<OffsetDateTime> = None;
     let mut indexes = BTreeMap::new();
     let mut database_size = 0;
-    let processing_task = index_scheduler.get_tasks_from_authorized_indexes(
-        Query { statuses: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
-        filters,
-    )?;
     // accumulate the size of each indexes
-    let processing_index = processing_task.first().and_then(|task| task.index_uid());
-    index_scheduler.try_for_each_index(|name, index| {
-        if !filters.is_index_authorized(name) {
-            return Ok(());
-        }
-
-        database_size += index.on_disk_size()?;
-
-        let rtxn = index.read_txn()?;
-        let stats = IndexStats {
-            number_of_documents: index.number_of_documents(&rtxn)?,
-            is_indexing: processing_index.map_or(false, |index_name| name == index_name),
-            field_distribution: index.field_distribution(&rtxn)?,
-        };
-
-        let updated_at = index.updated_at(&rtxn)?;
-        last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
-
-        indexes.insert(name.to_string(), stats);
-        Ok(())
-    })?;
+    for index_uid in index_scheduler.index_names()? {
+        if !filters.is_index_authorized(&index_uid) {
+            continue;
+        }
+
+        let stats = index_scheduler.index_stats(&index_uid)?;
+        last_task = last_task.map_or(Some(stats.inner_stats.updated_at), |last| {
+            Some(last.max(stats.inner_stats.updated_at))
+        });
+        indexes.insert(index_uid.to_string(), stats.into());
+    }

     database_size += index_scheduler.size()?;
     database_size += auth_controller.size()?;
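`create_all_stats` now iterates index names and reads the cached stats instead of opening every index, and `last_update` becomes the fold of each index's cached `updated_at`. A minimal equivalent of that fold, with plain integers standing in for `OffsetDateTime`:

```rust
fn main() {
    // Cached `updated_at` values of the authorized indexes (illustrative data).
    let updated_ats = [3_u64, 9, 5];

    // Same fold as `last_task.map_or(...)` in the hunk above: keep the maximum,
    // starting from None when no index has been seen yet.
    let mut last_update: Option<u64> = None;
    for updated_at in updated_ats {
        last_update = last_update.map_or(Some(updated_at), |last| Some(last.max(updated_at)));
    }
    assert_eq!(last_update, Some(9));
}
```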