Mirror of https://github.com/meilisearch/MeiliSearch (synced 2024-11-23 05:14:27 +01:00)
Merge #3418

3418: Compute the size of the auth-controller, index-scheduler and all update files in the global stats r=dureuill a=irevoire

Fix https://github.com/meilisearch/meilisearch/issues/3201

Co-authored-by: Tamo <tamo@meilisearch.com>

Commit: 5beb1aab7d
```diff
@@ -1,4 +1,3 @@
-use std::collections::BTreeSet;
 use std::fs::File as StdFile;
 use std::ops::{Deref, DerefMut};
 use std::path::{Path, PathBuf};

@@ -11,10 +10,14 @@ const UPDATE_FILES_PATH: &str = "updates/updates_files";
 
 #[derive(Debug, thiserror::Error)]
 pub enum Error {
+    #[error("Could not parse file name as utf-8")]
+    CouldNotParseFileNameAsUtf8,
     #[error(transparent)]
     IoError(#[from] std::io::Error),
     #[error(transparent)]
     PersistError(#[from] tempfile::PersistError),
+    #[error(transparent)]
+    UuidError(#[from] uuid::Error),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
```
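The two new variants lean on thiserror's `#[from]` attribute, so `?` can lift a `uuid::Error` into `file_store::Error`, while the UTF-8 case is raised explicitly with `ok_or`. A minimal standalone sketch of that conversion pattern; the `parse_file_name` helper is hypothetical and not part of the diff:

```rust
use std::ffi::OsStr;
use std::str::FromStr;

use uuid::Uuid;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("Could not parse file name as utf-8")]
    CouldNotParseFileNameAsUtf8,
    #[error(transparent)]
    UuidError(#[from] uuid::Error),
}

// Hypothetical helper: `?` converts the `uuid::Error` into `Error::UuidError`
// automatically thanks to `#[from]`.
fn parse_file_name(name: &OsStr) -> Result<Uuid, Error> {
    let name = name.to_str().ok_or(Error::CouldNotParseFileNameAsUtf8)?;
    Ok(Uuid::from_str(name)?)
}

fn main() {
    println!("{:?}", parse_file_name(OsStr::new("9f8d6a3a-5d3c-4a9b-bb1e-0ae9989e6b19")));
}
```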
```diff
@@ -33,13 +36,11 @@ impl DerefMut for File {
     }
 }
 
-#[cfg_attr(test, faux::create)]
 #[derive(Clone, Debug)]
 pub struct FileStore {
     path: PathBuf,
 }
 
-#[cfg(not(test))]
 impl FileStore {
     pub fn new(path: impl AsRef<Path>) -> Result<FileStore> {
         let path = path.as_ref().to_path_buf();

@@ -48,7 +49,6 @@ impl FileStore {
     }
 }
 
-#[cfg_attr(test, faux::methods)]
 impl FileStore {
     /// Creates a new temporary update file.
     /// A call to `persist` is needed to persist the file in the database.
```
```diff
@@ -94,7 +94,17 @@ impl FileStore {
         Ok(())
     }
 
-    pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
+    /// Compute the size of all the updates contained in the file store.
+    pub fn compute_total_size(&self) -> Result<u64> {
+        let mut total = 0;
+        for uuid in self.all_uuids()? {
+            total += self.compute_size(uuid?).unwrap_or_default();
+        }
+        Ok(total)
+    }
+
+    /// Compute the size of one update
+    pub fn compute_size(&self, uuid: Uuid) -> Result<u64> {
         Ok(self.get_update(uuid)?.metadata()?.len())
     }
 
```
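`compute_total_size` sums `metadata().len()` over every update file and swallows per-file failures with `unwrap_or_default`, so a single unreadable file cannot fail the whole stats request. A rough std-only equivalent of that traversal, assuming only that the update files sit in a plain directory (the `UPDATE_FILES_PATH` constant above):

```rust
use std::io;
use std::path::Path;

// Sum the sizes of all entries in a directory, skipping entries whose
// metadata cannot be read (mirroring the `unwrap_or_default` above).
fn total_size(dir: &Path) -> io::Result<u64> {
    let mut total = 0;
    for entry in dir.read_dir()? {
        total += entry?.metadata().map(|m| m.len()).unwrap_or_default();
    }
    Ok(total)
}

fn main() -> io::Result<()> {
    let total = total_size(Path::new("updates/updates_files"))?;
    println!("update files take {total} bytes on disk");
    Ok(())
}
```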
```diff
@@ -105,17 +115,12 @@ impl FileStore {
     }
 
     /// List the Uuids of the files in the FileStore
-    ///
-    /// This function is meant to be used by tests only.
-    #[doc(hidden)]
-    pub fn __all_uuids(&self) -> BTreeSet<Uuid> {
-        let mut uuids = BTreeSet::new();
-        for entry in self.path.read_dir().unwrap() {
-            let entry = entry.unwrap();
-            let uuid = Uuid::from_str(entry.file_name().to_str().unwrap()).unwrap();
-            uuids.insert(uuid);
-        }
-        uuids
+    pub fn all_uuids(&self) -> Result<impl Iterator<Item = Result<Uuid>>> {
+        Ok(self.path.read_dir()?.map(|entry| {
+            Ok(Uuid::from_str(
+                entry?.file_name().to_str().ok_or(Error::CouldNotParseFileNameAsUtf8)?,
+            )?)
+        }))
     }
 }
 
```
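The new `all_uuids` returns a `Result<impl Iterator<Item = Result<Uuid>>>`: the outer `Result` covers opening the directory, the inner one each individual entry. Callers that need a deterministic order can collect the inner results into a `BTreeSet`, which is exactly what the snapshot helper in the next hunks does. A self-contained illustration of that consumption pattern over a plain slice of strings:

```rust
use std::collections::BTreeSet;
use std::str::FromStr;

use uuid::Uuid;

fn main() {
    // Stand-in for the iterator returned by `FileStore::all_uuids`:
    // each item may individually fail to parse.
    let entries = ["9f8d6a3a-5d3c-4a9b-bb1e-0ae9989e6b19", "not-a-uuid"];
    let uuids = entries.iter().map(|s| Uuid::from_str(s));

    // Collecting into `Result<BTreeSet<_>, _>` stops at the first error,
    // and orders the uuids deterministically otherwise.
    match uuids.collect::<Result<BTreeSet<_>, _>>() {
        Ok(set) => println!("{} valid uuids", set.len()),
        Err(e) => println!("one entry was invalid: {e}"),
    }
}
```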
```diff
@@ -1,3 +1,4 @@
+use std::collections::BTreeSet;
 use std::fmt::Write;
 
 use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};

@@ -92,7 +93,9 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
 
 pub fn snapshot_file_store(file_store: &file_store::FileStore) -> String {
     let mut snap = String::new();
-    for uuid in file_store.__all_uuids() {
+    // we store the uuid in a `BTreeSet` to keep them ordered.
+    let all_uuids = file_store.all_uuids().unwrap().collect::<Result<BTreeSet<_>, _>>().unwrap();
+    for uuid in all_uuids {
         snap.push_str(&format!("{uuid}\n"));
     }
     snap
```
```diff
@@ -452,6 +452,10 @@ impl IndexScheduler {
         &self.index_mapper.indexer_config
     }
 
+    pub fn size(&self) -> Result<u64> {
+        Ok(self.env.real_disk_size()?)
+    }
+
     /// Return the index corresponding to the name.
     ///
     /// * If the index wasn't opened before, the index will be opened.
```
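`real_disk_size` comes from the heed LMDB wrapper used by Meilisearch and reports what the environment actually occupies on disk, as opposed to the much larger memory-mapped size the environment was opened with. A std-only approximation, assuming the environment lives in a directory holding LMDB's usual `data.mdb` file (the path and layout here are assumptions for illustration, not the heed implementation):

```rust
use std::fs;
use std::io;
use std::path::Path;

// Approximate an LMDB environment's on-disk footprint by reading the
// length of its data file.
fn approximate_disk_size(env_dir: &Path) -> io::Result<u64> {
    fs::metadata(env_dir.join("data.mdb")).map(|m| m.len())
}

fn main() -> io::Result<()> {
    println!("{} bytes", approximate_disk_size(Path::new("./data.ms/tasks"))?);
    Ok(())
}
```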
```diff
@@ -898,6 +902,11 @@ impl IndexScheduler {
         Ok(self.file_store.new_update_with_uuid(uuid)?)
     }
 
+    /// The size on disk taken by all the updates files contained in the `IndexScheduler`, in bytes.
+    pub fn compute_update_file_size(&self) -> Result<u64> {
+        Ok(self.file_store.compute_total_size()?)
+    }
+
     /// Delete a file from the index scheduler.
     ///
     /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.
```
```diff
@@ -508,14 +508,21 @@ impl IndexScheduler {
         if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
             match status {
                 Status::Enqueued | Status::Processing => {
-                    assert!(
-                        self.file_store.__all_uuids().contains(&content_file),
+                    assert!(self
+                        .file_store
+                        .all_uuids()
+                        .unwrap()
+                        .any(|uuid| uuid.as_ref().unwrap() == &content_file),
                         "Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
-                        self.file_store.__all_uuids(),
+                        self.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
                     );
                 }
                 Status::Succeeded | Status::Failed | Status::Canceled => {
-                    assert!(!self.file_store.__all_uuids().contains(&content_file));
+                    assert!(self
+                        .file_store
+                        .all_uuids()
+                        .unwrap()
+                        .all(|uuid| uuid.as_ref().unwrap() != &content_file));
                 }
             }
         }
```
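The rewritten test assertions scan the uuid iterator with `Iterator::any` and `Iterator::all`; since each item is itself a `Result`, they unwrap per item before comparing, which is acceptable inside a test-only consistency check. A small self-contained illustration of the same pattern:

```rust
use std::str::FromStr;

use uuid::Uuid;

fn main() {
    let target = Uuid::from_str("9f8d6a3a-5d3c-4a9b-bb1e-0ae9989e6b19").unwrap();

    // Stand-in for `file_store.all_uuids().unwrap()`: an iterator of Results.
    let uuids: Vec<Result<Uuid, uuid::Error>> = vec![Ok(target), Ok(Uuid::nil())];

    // `any` short-circuits on the first match; a broken entry would panic,
    // surfacing the inconsistency loudly in tests.
    assert!(uuids.iter().any(|uuid| uuid.as_ref().unwrap() == &target));
    assert!(!uuids.iter().all(|uuid| uuid.as_ref().unwrap() == &target));
    println!("consistency assertions passed");
}
```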
```diff
@@ -33,6 +33,11 @@ impl AuthController {
         Ok(Self { store: Arc::new(store), master_key: master_key.clone() })
     }
 
+    /// Return the size of the `AuthController` database in bytes.
+    pub fn size(&self) -> Result<u64> {
+        self.store.size()
+    }
+
     pub fn create_key(&self, create_key: CreateApiKey) -> Result<Key> {
         match self.store.get_api_key(create_key.uid)? {
             Some(_) => Err(AuthControllerError::ApiKeyAlreadyExists(create_key.uid.to_string())),

@@ -60,6 +60,11 @@ impl HeedAuthStore {
         Ok(Self { env, keys, action_keyid_index_expiration, should_close_on_drop: true })
     }
 
+    /// Return the size in bytes of database
+    pub fn size(&self) -> Result<u64> {
+        Ok(self.env.real_disk_size()?)
+    }
+
     pub fn set_drop_on_close(&mut self, v: bool) {
         self.should_close_on_drop = v;
     }
```
```diff
@@ -343,6 +343,7 @@ impl ErrorCode for file_store::Error {
         match self {
             Self::IoError(e) => e.error_code(),
             Self::PersistError(e) => e.error_code(),
+            Self::CouldNotParseFileNameAsUtf8 | Self::UuidError(_) => Code::Internal,
         }
     }
 }
```
```diff
@@ -9,7 +9,7 @@ use actix_web::HttpRequest;
 use byte_unit::Byte;
 use http::header::CONTENT_TYPE;
 use index_scheduler::IndexScheduler;
-use meilisearch_auth::SearchRules;
+use meilisearch_auth::{AuthController, SearchRules};
 use meilisearch_types::InstanceUid;
 use once_cell::sync::Lazy;
 use regex::Regex;

@@ -82,7 +82,11 @@ pub struct SegmentAnalytics {
 }
 
 impl SegmentAnalytics {
-    pub async fn new(opt: &Opt, index_scheduler: Arc<IndexScheduler>) -> Arc<dyn Analytics> {
+    pub async fn new(
+        opt: &Opt,
+        index_scheduler: Arc<IndexScheduler>,
+        auth_controller: AuthController,
+    ) -> Arc<dyn Analytics> {
         let instance_uid = super::find_user_id(&opt.db_path);
         let first_time_run = instance_uid.is_none();
         let instance_uid = instance_uid.unwrap_or_else(|| Uuid::new_v4());

@@ -136,7 +140,7 @@ impl SegmentAnalytics {
             get_tasks_aggregator: TasksAggregator::default(),
             health_aggregator: HealthAggregator::default(),
         });
-        tokio::spawn(segment.run(index_scheduler.clone()));
+        tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone()));
 
         let this = Self { instance_uid, sender, user: user.clone() };
 

@@ -361,7 +365,7 @@ impl Segment {
         })
     }
 
-    async fn run(mut self, index_scheduler: Arc<IndexScheduler>) {
+    async fn run(mut self, index_scheduler: Arc<IndexScheduler>, auth_controller: AuthController) {
         const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
         // The first batch must be sent after one hour.
         let mut interval =

@@ -370,7 +374,7 @@ impl Segment {
         loop {
             select! {
                 _ = interval.tick() => {
-                    self.tick(index_scheduler.clone()).await;
+                    self.tick(index_scheduler.clone(), auth_controller.clone()).await;
                 },
                 msg = self.inbox.recv() => {
                     match msg {

@@ -389,8 +393,14 @@ impl Segment {
         }
     }
 
-    async fn tick(&mut self, index_scheduler: Arc<IndexScheduler>) {
-        if let Ok(stats) = create_all_stats(index_scheduler.into(), &SearchRules::default()) {
+    async fn tick(
+        &mut self,
+        index_scheduler: Arc<IndexScheduler>,
+        auth_controller: AuthController,
+    ) {
+        if let Ok(stats) =
+            create_all_stats(index_scheduler.into(), auth_controller, &SearchRules::default())
+        {
             let _ = self
                 .batcher
                 .push(Identify {
```
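`Segment::run` now carries the `AuthController` next to the scheduler and clones both on every tick; the clone is cheap because the controller's store sits behind an `Arc`. A minimal sketch of the same `select!`-driven loop shape, with placeholder types standing in for the scheduler, the controller, and the analytics inbox:

```rust
use std::sync::Arc;
use std::time::Duration;

use tokio::select;
use tokio::sync::mpsc;
use tokio::time;

#[derive(Clone)]
struct Controller; // placeholder for AuthController

async fn tick(_scheduler: Arc<()>, _controller: Controller) {
    println!("tick: would push stats to the analytics batcher here");
}

#[tokio::main]
async fn main() {
    let scheduler = Arc::new(()); // placeholder for Arc<IndexScheduler>
    let controller = Controller;
    let (tx, mut inbox) = mpsc::channel::<&'static str>(8);
    drop(tx); // no producers in this sketch, so recv() soon yields None

    let mut interval = time::interval(Duration::from_millis(10));
    loop {
        select! {
            _ = interval.tick() => {
                // Clone the cheap handles into each tick, as the diff does.
                tick(scheduler.clone(), controller.clone()).await;
            }
            msg = inbox.recv() => {
                match msg {
                    Some(m) => println!("got {m}"),
                    None => break, // channel closed: stop the loop
                }
            }
        }
    }
}
```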
```diff
@@ -57,7 +57,8 @@ async fn main() -> anyhow::Result<()> {
 
     #[cfg(all(not(debug_assertions), feature = "analytics"))]
     let analytics = if !opt.no_analytics {
-        analytics::SegmentAnalytics::new(&opt, index_scheduler.clone()).await
+        analytics::SegmentAnalytics::new(&opt, index_scheduler.clone(), auth_controller.clone())
+            .await
     } else {
        analytics::MockAnalytics::new(&opt)
     };
```
```diff
@@ -4,6 +4,7 @@ use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
 use index_scheduler::{IndexScheduler, Query};
 use log::debug;
+use meilisearch_auth::AuthController;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::settings::{Settings, Unchecked};
 use meilisearch_types::tasks::{Kind, Status, Task, TaskId};

@@ -230,13 +231,15 @@ pub struct Stats {
 
 async fn get_stats(
     index_scheduler: GuardedData<ActionPolicy<{ actions::STATS_GET }>, Data<IndexScheduler>>,
+    auth_controller: GuardedData<ActionPolicy<{ actions::STATS_GET }>, AuthController>,
     req: HttpRequest,
     analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     analytics.publish("Stats Seen".to_string(), json!({ "per_index_uid": false }), Some(&req));
     let search_rules = &index_scheduler.filters().search_rules;
 
-    let stats = create_all_stats((*index_scheduler).clone(), search_rules)?;
+    let stats =
+        create_all_stats((*index_scheduler).clone(), (*auth_controller).clone(), search_rules)?;
 
     debug!("returns: {:?}", stats);
     Ok(HttpResponse::Ok().json(stats))

@@ -244,6 +247,7 @@ async fn get_stats(
 
 pub fn create_all_stats(
     index_scheduler: Data<IndexScheduler>,
+    auth_controller: AuthController,
     search_rules: &meilisearch_auth::SearchRules,
 ) -> Result<Stats, ResponseError> {
     let mut last_task: Option<OffsetDateTime> = None;

@@ -253,6 +257,7 @@ pub fn create_all_stats(
         Query { statuses: Some(vec![Status::Processing]), limit: Some(1), ..Query::default() },
         search_rules.authorized_indexes(),
     )?;
+    // accumulate the size of each indexes
     let processing_index = processing_task.first().and_then(|task| task.index_uid());
     for (name, index) in index_scheduler.indexes()? {
         if !search_rules.is_index_authorized(&name) {

@@ -273,6 +278,11 @@ pub fn create_all_stats(
         indexes.insert(name, stats);
     }
 
+    database_size += index_scheduler.size()?;
+    database_size += auth_controller.size()?;
+    database_size += index_scheduler.compute_update_file_size()?;
+
     let stats = Stats { database_size, last_update: last_task, indexes };
     Ok(stats)
 }
```
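With those three additions, the global `database_size` returned by the stats route becomes the sum of the authorized indexes plus the index-scheduler environment, the auth environment, and the pending update files, which is what https://github.com/meilisearch/meilisearch/issues/3201 asked for. A toy sketch of the accumulation, with all sizes invented for illustration:

```rust
fn main() {
    // Per-index sizes, as accumulated by the loop over `indexes()?` above.
    let index_sizes = [4_096_u64, 1_048_576, 32_768];
    let mut database_size: u64 = index_sizes.iter().sum();

    // The three new contributions from this PR (placeholder values):
    database_size += 16_384; // index_scheduler.size()? -> env.real_disk_size()
    database_size += 8_192; // auth_controller.size()? -> auth store env size
    database_size += 65_536; // index_scheduler.compute_update_file_size()?

    println!("database_size: {database_size} bytes");
}
```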