Commit cf6ad1ae5e: Merge branch 'main' into tmp-release-v1.11.0
1071 changed files with 263 additions and 106 deletions
crates/meilisearch/src/analytics/mock_analytics.rs (new file, 109 lines)
@@ -0,0 +1,109 @@
use std::any::Any;
use std::sync::Arc;

use actix_web::HttpRequest;
use meilisearch_types::InstanceUid;
use serde_json::Value;

use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
use crate::Opt;

pub struct MockAnalytics {
    instance_uid: Option<InstanceUid>,
}

#[derive(Default)]
pub struct SearchAggregator;

#[allow(dead_code)]
impl SearchAggregator {
    pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
        Self
    }

    pub fn succeed(&mut self, _: &dyn Any) {}
}

#[derive(Default)]
pub struct SimilarAggregator;

#[allow(dead_code)]
impl SimilarAggregator {
    pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
        Self
    }

    pub fn succeed(&mut self, _: &dyn Any) {}
}

#[derive(Default)]
pub struct MultiSearchAggregator;

#[allow(dead_code)]
impl MultiSearchAggregator {
    pub fn from_federated_search(_: &dyn Any, _: &dyn Any) -> Self {
        Self
    }

    pub fn succeed(&mut self) {}
}

#[derive(Default)]
pub struct FacetSearchAggregator;

#[allow(dead_code)]
impl FacetSearchAggregator {
    pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
        Self
    }

    pub fn succeed(&mut self, _: &dyn Any) {}
}

impl MockAnalytics {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(opt: &Opt) -> Arc<dyn Analytics> {
        let instance_uid = find_user_id(&opt.db_path);
        Arc::new(Self { instance_uid })
    }
}

impl Analytics for MockAnalytics {
    fn instance_uid(&self) -> Option<&meilisearch_types::InstanceUid> {
        self.instance_uid.as_ref()
    }

    // These methods are no-ops and should be optimized out
    fn publish(&self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
    fn get_search(&self, _aggregate: super::SearchAggregator) {}
    fn post_search(&self, _aggregate: super::SearchAggregator) {}
    fn get_similar(&self, _aggregate: super::SimilarAggregator) {}
    fn post_similar(&self, _aggregate: super::SimilarAggregator) {}
    fn post_multi_search(&self, _aggregate: super::MultiSearchAggregator) {}
    fn post_facet_search(&self, _aggregate: super::FacetSearchAggregator) {}
    fn add_documents(
        &self,
        _documents_query: &UpdateDocumentsQuery,
        _index_creation: bool,
        _request: &HttpRequest,
    ) {
    }
    fn delete_documents(&self, _kind: DocumentDeletionKind, _request: &HttpRequest) {}
    fn update_documents(
        &self,
        _documents_query: &UpdateDocumentsQuery,
        _index_creation: bool,
        _request: &HttpRequest,
    ) {
    }
    fn update_documents_by_function(
        &self,
        _documents_query: &DocumentEditionByFunction,
        _index_creation: bool,
        _request: &HttpRequest,
    ) {
    }
    fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
    fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
}
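As a rough sketch of how this mock slots in when the real analytics are compiled out (the `opt` value is assumed to come from the usual CLI parsing; the wiring itself is not part of this diff):

```rust
// Routes still get a usable handle; every method on it is a no-op,
// so the calls below cost nothing and can be optimized away.
let analytics: std::sync::Arc<dyn Analytics> = MockAnalytics::new(&opt);
analytics.publish("Docs Searched".to_string(), serde_json::json!({}), None);
```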
crates/meilisearch/src/analytics/mod.rs (new file, 166 lines)
@@ -0,0 +1,166 @@
pub mod segment_analytics;

use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::Arc;

use actix_web::HttpRequest;
use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController;
use meilisearch_types::InstanceUid;
use mopa::mopafy;
use once_cell::sync::Lazy;
use platform_dirs::AppDirs;

// If the `analytics` feature is enabled, we use the real analytics.
pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;

use crate::Opt;

/// A macro used to quickly define events that don't aggregate or send anything besides an empty event with its name.
#[macro_export]
macro_rules! empty_analytics {
    ($struct_name:ident, $event_name:literal) => {
        #[derive(Default)]
        struct $struct_name {}

        impl $crate::analytics::Aggregate for $struct_name {
            fn event_name(&self) -> &'static str {
                $event_name
            }

            fn aggregate(self: Box<Self>, _other: Box<Self>) -> Box<Self> {
                self
            }

            fn into_event(self: Box<Self>) -> serde_json::Value {
                serde_json::json!({})
            }
        }
    };
}
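For instance, an event that carries no payload can then be declared in one line (the struct and event names below are invented for illustration):

```rust
// Expands to a unit-like `TasksSeenAggregate` struct whose `into_event`
// returns an empty JSON object, so only the event name reaches Segment.
empty_analytics!(TasksSeenAggregate, "Tasks Seen");
```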
/// The Meilisearch config dir:
/// `~/.config/Meilisearch` on *NIX or *BSD.
/// `~/Library/Application Support` on macOS.
/// `%APPDATA%` (= `C:\Users\%USERNAME%\AppData\Roaming`) on Windows.
static MEILISEARCH_CONFIG_PATH: Lazy<Option<PathBuf>> =
    Lazy::new(|| AppDirs::new(Some("Meilisearch"), false).map(|appdir| appdir.config_dir));

fn config_user_id_path(db_path: &Path) -> Option<PathBuf> {
    db_path
        .canonicalize()
        .ok()
        .map(|path| path.join("instance-uid").display().to_string().replace('/', "-"))
        .zip(MEILISEARCH_CONFIG_PATH.as_ref())
        .map(|(filename, config_path)| config_path.join(filename.trim_start_matches('-')))
}

/// Look for the instance-uid in the `data.ms` or in `~/.config/Meilisearch/path-to-db-instance-uid`
fn find_user_id(db_path: &Path) -> Option<InstanceUid> {
    fs::read_to_string(db_path.join("instance-uid"))
        .ok()
        .or_else(|| fs::read_to_string(config_user_id_path(db_path)?).ok())
        .and_then(|uid| InstanceUid::from_str(&uid).ok())
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentDeletionKind {
    PerDocumentId,
    ClearAll,
    PerBatch,
    PerFilter,
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentFetchKind {
    PerDocumentId { retrieve_vectors: bool },
    Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
}

/// To send an event to segment, your event must be able to aggregate itself with another event of the same type.
pub trait Aggregate: 'static + mopa::Any + Send {
    /// The name of the event that will be sent to segment.
    fn event_name(&self) -> &'static str;

    /// Will be called every time two events of the same type are received before segment flushes its buffer.
    fn aggregate(self: Box<Self>, new: Box<Self>) -> Box<Self>
    where
        Self: Sized;

    /// Converts your structure to the final event that'll be sent to segment.
    fn into_event(self: Box<Self>) -> serde_json::Value;
}

mopafy!(Aggregate);
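To make the trait concrete, here is a minimal hypothetical implementor; the type, field, and event name are invented for illustration, only the `Aggregate` trait comes from this file:

```rust
#[derive(Default)]
pub struct DocumentsDeletedAggregate {
    pub total: usize,
}

impl Aggregate for DocumentsDeletedAggregate {
    fn event_name(&self) -> &'static str {
        "Documents Deleted"
    }

    // Called when two events of this type are pending in the same flush
    // window: fold the counters so a single Segment event is sent.
    fn aggregate(mut self: Box<Self>, new: Box<Self>) -> Box<Self> {
        self.total = self.total.saturating_add(new.total);
        self
    }

    fn into_event(self: Box<Self>) -> serde_json::Value {
        serde_json::json!({ "requests": { "total_received": self.total } })
    }
}
```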
/// Helper trait to define multiple aggregates with the same content but a different name.
/// Commonly used when you must aggregate a search with POST or with GET, for example.
pub trait AggregateMethod: 'static + Default + Send {
    fn event_name() -> &'static str;
}

/// A macro used to quickly define multiple aggregate methods with their names.
/// Usage:
/// ```rust
/// use meilisearch::aggregate_methods;
///
/// aggregate_methods!(
///     SearchGET => "Documents Searched GET",
///     SearchPOST => "Documents Searched POST",
/// );
/// ```
#[macro_export]
macro_rules! aggregate_methods {
    ($method:ident => $event_name:literal) => {
        #[derive(Default)]
        pub struct $method {}

        impl $crate::analytics::AggregateMethod for $method {
            fn event_name() -> &'static str {
                $event_name
            }
        }
    };
    ($($method:ident => $event_name:literal,)+) => {
        $(
            aggregate_methods!($method => $event_name);
        )+
    };
}

#[derive(Clone)]
pub struct Analytics {
    segment: Option<Arc<SegmentAnalytics>>,
}

impl Analytics {
    pub async fn new(
        opt: &Opt,
        index_scheduler: Arc<IndexScheduler>,
        auth_controller: Arc<AuthController>,
    ) -> Self {
        if opt.no_analytics {
            Self { segment: None }
        } else {
            Self { segment: SegmentAnalytics::new(opt, index_scheduler, auth_controller).await }
        }
    }

    pub fn no_analytics() -> Self {
        Self { segment: None }
    }

    pub fn instance_uid(&self) -> Option<&InstanceUid> {
        self.segment.as_ref().map(|segment| segment.instance_uid.as_ref())
    }

    /// The method used to publish most analytics that do not need to be batched every hour.
    pub fn publish<T: Aggregate>(&self, event: T, request: &HttpRequest) {
        if let Some(ref segment) = self.segment {
            let _ = segment.sender.try_send(segment_analytics::Message::new(event, request));
        }
    }
}
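From a route handler, publishing then looks like this sketch (reusing the hypothetical `DocumentsDeletedAggregate` from the note above; the handler itself is invented):

```rust
async fn delete_document(
    analytics: actix_web::web::Data<Analytics>,
    req: actix_web::HttpRequest,
) -> &'static str {
    // Fire-and-forget: with `segment: None` this is a no-op; otherwise the
    // event is `try_send`-ed to the hourly batcher and silently dropped if
    // the bounded channel is full.
    analytics.publish(DocumentsDeletedAggregate { total: 1 }, &req);
    "ok"
}
```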
crates/meilisearch/src/analytics/segment_analytics.rs (new file, 484 lines)
@@ -0,0 +1,484 @@
use std::any::TypeId;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use actix_web::http::header::USER_AGENT;
use actix_web::HttpRequest;
use byte_unit::Byte;
use index_scheduler::IndexScheduler;
use meilisearch_auth::{AuthController, AuthFilter};
use meilisearch_types::features::RuntimeTogglableFeatures;
use meilisearch_types::InstanceUid;
use once_cell::sync::Lazy;
use segment::message::{Identify, Track, User};
use segment::{AutoBatcher, Batcher, HttpClient};
use serde::Serialize;
use serde_json::{json, Value};
use sysinfo::{Disks, System};
use time::OffsetDateTime;
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid;

use super::{config_user_id_path, Aggregate, MEILISEARCH_CONFIG_PATH};
use crate::option::{
    default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
};
use crate::routes::{create_all_stats, Stats};
use crate::Opt;

const ANALYTICS_HEADER: &str = "X-Meilisearch-Client";

/// Write the instance-uid in the `data.ms` and in `~/.config/MeiliSearch/path-to-db-instance-uid`. Ignores errors.
fn write_user_id(db_path: &Path, user_id: &InstanceUid) {
    let _ = fs::write(db_path.join("instance-uid"), user_id.to_string());
    if let Some((meilisearch_config_path, user_id_path)) =
        MEILISEARCH_CONFIG_PATH.as_ref().zip(config_user_id_path(db_path))
    {
        let _ = fs::create_dir_all(meilisearch_config_path);
        let _ = fs::write(user_id_path, user_id.to_string());
    }
}

const SEGMENT_API_KEY: &str = "P3FWhhEsJiEDCuEHpmcN9DHcK4hVfBvb";

pub fn extract_user_agents(request: &HttpRequest) -> HashSet<String> {
    request
        .headers()
        .get(ANALYTICS_HEADER)
        .or_else(|| request.headers().get(USER_AGENT))
        .and_then(|header| header.to_str().ok())
        .unwrap_or("unknown")
        .split(';')
        .map(str::trim)
        .map(ToString::to_string)
        .collect()
}
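A quick sketch of the parsing behavior, using actix-web's test helpers (the header value is an example):

```rust
use actix_web::test::TestRequest;

let req = TestRequest::default()
    .insert_header(("User-Agent", "meilisearch-js/0.34.0; curl/8.4"))
    .to_http_request();
// No `X-Meilisearch-Client` header is set, so the `User-Agent` fallback is
// used, split on ';' and trimmed into a set of two agents.
let agents = extract_user_agents(&req);
assert!(agents.contains("meilisearch-js/0.34.0"));
assert!(agents.contains("curl/8.4"));
```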
pub struct Message {
    // Since the type_id is resolved statically we cannot retrieve it from the Box.
    // Thus we have to send it in the message directly.
    type_id: TypeId,
    // Same for the aggregate function.
    #[allow(clippy::type_complexity)]
    aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
    event: Event,
}

pub struct Event {
    original: Box<dyn Aggregate>,
    timestamp: OffsetDateTime,
    user_agents: HashSet<String>,
    total: usize,
}

/// This function should always be called on the same type. If `old` and `new`
/// aren't the same type, the function does nothing and returns `None`.
fn downcast_aggregate<ConcreteType: Aggregate>(
    old: Box<dyn Aggregate>,
    new: Box<dyn Aggregate>,
) -> Option<Box<dyn Aggregate>> {
    if old.is::<ConcreteType>() && new.is::<ConcreteType>() {
        // Neither of the two following lines can fail, but just to be sure we don't crash, we still avoid unwrapping.
        let this = old.downcast::<ConcreteType>().ok()?;
        let other = new.downcast::<ConcreteType>().ok()?;
        Some(ConcreteType::aggregate(this, other))
    } else {
        None
    }
}

impl Message {
    pub fn new<T: Aggregate>(event: T, request: &HttpRequest) -> Self {
        Self {
            type_id: TypeId::of::<T>(),
            event: Event {
                original: Box::new(event),
                timestamp: OffsetDateTime::now_utc(),
                user_agents: extract_user_agents(request),
                total: 1,
            },
            aggregator_function: downcast_aggregate::<T>,
        }
    }
}

pub struct SegmentAnalytics {
    pub instance_uid: InstanceUid,
    pub user: User,
    pub sender: Sender<Message>,
}

impl SegmentAnalytics {
    #[allow(clippy::new_ret_no_self)]
    pub async fn new(
        opt: &Opt,
        index_scheduler: Arc<IndexScheduler>,
        auth_controller: Arc<AuthController>,
    ) -> Option<Arc<Self>> {
        let instance_uid = super::find_user_id(&opt.db_path);
        let first_time_run = instance_uid.is_none();
        let instance_uid = instance_uid.unwrap_or_else(Uuid::new_v4);
        write_user_id(&opt.db_path, &instance_uid);

        let client = reqwest::Client::builder().connect_timeout(Duration::from_secs(10)).build();

        // If reqwest returns an error we won't be able to send analytics.
        if client.is_err() {
            return None;
        }

        let client =
            HttpClient::new(client.unwrap(), "https://telemetry.meilisearch.com".to_string());
        let user = User::UserId { user_id: instance_uid.to_string() };
        let mut batcher = AutoBatcher::new(client, Batcher::new(None), SEGMENT_API_KEY.to_string());

        // If Meilisearch is launched for the first time:
        // 1. Send a `Launched` event associated with the user `total_launch`.
        // 2. Batch a `Launched` event with the real instance-id and send it in one hour.
        if first_time_run {
            let _ = batcher
                .push(Track {
                    user: User::UserId { user_id: "total_launch".to_string() },
                    event: "Launched".to_string(),
                    ..Default::default()
                })
                .await;
            let _ = batcher.flush().await;
            let _ = batcher
                .push(Track {
                    user: user.clone(),
                    event: "Launched".to_string(),
                    ..Default::default()
                })
                .await;
        }

        let (sender, inbox) = mpsc::channel(100); // How many analytics messages we can buffer

        let segment = Box::new(Segment {
            inbox,
            user: user.clone(),
            opt: opt.clone(),
            batcher,
            events: HashMap::new(),
        });
        tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone()));

        let this = Self { instance_uid, sender, user: user.clone() };

        Some(Arc::new(this))
    }
}

/// This structure represents the `infos` field we send in the analytics.
/// It's quite close to the `Opt` structure, except all sensitive information
/// has been simplified to a boolean.
/// It's sent as-is to Amplitude, so you should never rename a field of the
/// struct without the approval of the PM.
#[derive(Debug, Clone, Serialize)]
struct Infos {
    env: String,
    experimental_contains_filter: bool,
    experimental_vector_store: bool,
    experimental_enable_metrics: bool,
    experimental_edit_documents_by_function: bool,
    experimental_search_queue_size: usize,
    experimental_drop_search_after: usize,
    experimental_nb_searches_per_core: usize,
    experimental_logs_mode: LogMode,
    experimental_replication_parameters: bool,
    experimental_enable_logs_route: bool,
    experimental_reduce_indexing_memory_usage: bool,
    experimental_max_number_of_batched_tasks: usize,
    gpu_enabled: bool,
    db_path: bool,
    import_dump: bool,
    dump_dir: bool,
    ignore_missing_dump: bool,
    ignore_dump_if_db_exists: bool,
    import_snapshot: bool,
    schedule_snapshot: Option<u64>,
    snapshot_dir: bool,
    ignore_missing_snapshot: bool,
    ignore_snapshot_if_db_exists: bool,
    http_addr: bool,
    http_payload_size_limit: Byte,
    task_queue_webhook: bool,
    task_webhook_authorization_header: bool,
    log_level: String,
    max_indexing_memory: MaxMemory,
    max_indexing_threads: MaxThreads,
    with_configuration_file: bool,
    ssl_auth_path: bool,
    ssl_cert_path: bool,
    ssl_key_path: bool,
    ssl_ocsp_path: bool,
    ssl_require_auth: bool,
    ssl_resumption: bool,
    ssl_tickets: bool,
}

impl Infos {
    pub fn new(options: Opt, features: RuntimeTogglableFeatures) -> Self {
        // We want to destructure this whole struct by hand to be sure we don't forget
        // to add analytics when we add a field to `Opt`.
        // Thus we must not insert `..` at the end.
        let Opt {
            db_path,
            experimental_contains_filter,
            experimental_enable_metrics,
            experimental_search_queue_size,
            experimental_drop_search_after,
            experimental_nb_searches_per_core,
            experimental_logs_mode,
            experimental_replication_parameters,
            experimental_enable_logs_route,
            experimental_reduce_indexing_memory_usage,
            experimental_max_number_of_batched_tasks,
            http_addr,
            master_key: _,
            env,
            task_webhook_url,
            task_webhook_authorization_header,
            max_index_size: _,
            max_task_db_size: _,
            http_payload_size_limit,
            ssl_cert_path,
            ssl_key_path,
            ssl_auth_path,
            ssl_ocsp_path,
            ssl_require_auth,
            ssl_resumption,
            ssl_tickets,
            import_snapshot,
            ignore_missing_snapshot,
            ignore_snapshot_if_db_exists,
            snapshot_dir,
            schedule_snapshot,
            import_dump,
            ignore_missing_dump,
            ignore_dump_if_db_exists,
            dump_dir,
            log_level,
            indexer_options,
            config_file_path,
            no_analytics: _,
        } = options;

        let schedule_snapshot = match schedule_snapshot {
            ScheduleSnapshot::Disabled => None,
            ScheduleSnapshot::Enabled(interval) => Some(interval),
        };

        let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } =
            indexer_options;

        let RuntimeTogglableFeatures {
            vector_store,
            metrics,
            logs_route,
            edit_documents_by_function,
            contains_filter,
        } = features;

        // We're going to override every sensitive piece of information.
        // We consider information sensitive if it contains a path, an address, or a key.
        Self {
            env,
            experimental_contains_filter: experimental_contains_filter | contains_filter,
            experimental_vector_store: vector_store,
            experimental_edit_documents_by_function: edit_documents_by_function,
            experimental_enable_metrics: experimental_enable_metrics | metrics,
            experimental_search_queue_size,
            experimental_drop_search_after: experimental_drop_search_after.into(),
            experimental_nb_searches_per_core: experimental_nb_searches_per_core.into(),
            experimental_logs_mode,
            experimental_replication_parameters,
            experimental_enable_logs_route: experimental_enable_logs_route | logs_route,
            experimental_reduce_indexing_memory_usage,
            gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
            db_path: db_path != PathBuf::from("./data.ms"),
            import_dump: import_dump.is_some(),
            dump_dir: dump_dir != PathBuf::from("dumps/"),
            ignore_missing_dump,
            ignore_dump_if_db_exists,
            import_snapshot: import_snapshot.is_some(),
            schedule_snapshot,
            snapshot_dir: snapshot_dir != PathBuf::from("snapshots/"),
            ignore_missing_snapshot,
            ignore_snapshot_if_db_exists,
            http_addr: http_addr != default_http_addr(),
            http_payload_size_limit,
            experimental_max_number_of_batched_tasks,
            task_queue_webhook: task_webhook_url.is_some(),
            task_webhook_authorization_header: task_webhook_authorization_header.is_some(),
            log_level: log_level.to_string(),
            max_indexing_memory,
            max_indexing_threads,
            with_configuration_file: config_file_path.is_some(),
            ssl_auth_path: ssl_auth_path.is_some(),
            ssl_cert_path: ssl_cert_path.is_some(),
            ssl_key_path: ssl_key_path.is_some(),
            ssl_ocsp_path: ssl_ocsp_path.is_some(),
            ssl_require_auth,
            ssl_resumption,
            ssl_tickets,
        }
    }
}

pub struct Segment {
    inbox: Receiver<Message>,
    user: User,
    opt: Opt,
    batcher: AutoBatcher,
    events: HashMap<TypeId, Event>,
}

impl Segment {
    fn compute_traits(opt: &Opt, stats: Stats, features: RuntimeTogglableFeatures) -> Value {
        static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(Instant::now);
        static SYSTEM: Lazy<Value> = Lazy::new(|| {
            let disks = Disks::new_with_refreshed_list();
            let mut sys = System::new_all();
            sys.refresh_all();
            let kernel_version = System::kernel_version()
                .and_then(|k| k.split_once('-').map(|(k, _)| k.to_string()));
            json!({
                "distribution": System::name(),
                "kernel_version": kernel_version,
                "cores": sys.cpus().len(),
                "ram_size": sys.total_memory(),
                "disk_size": disks.iter().map(|disk| disk.total_space()).max(),
                "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
            })
        });
        let number_of_documents =
            stats.indexes.values().map(|index| index.number_of_documents).collect::<Vec<u64>>();

        json!({
            "start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / (60 * 60 * 24), // seconds in one day
            "system": *SYSTEM,
            "stats": {
                "database_size": stats.database_size,
                "indexes_number": stats.indexes.len(),
                "documents_number": number_of_documents,
            },
            "infos": Infos::new(opt.clone(), features),
        })
    }

    async fn run(
        mut self,
        index_scheduler: Arc<IndexScheduler>,
        auth_controller: Arc<AuthController>,
    ) {
        const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
        // The first batch must be sent after one hour.
        let mut interval =
            tokio::time::interval_at(tokio::time::Instant::now() + INTERVAL, INTERVAL);

        loop {
            select! {
                _ = interval.tick() => {
                    self.tick(index_scheduler.clone(), auth_controller.clone()).await;
                },
                Some(msg) = self.inbox.recv() => {
                    self.handle_msg(msg);
                }
            }
        }
    }

    fn handle_msg(&mut self, Message { type_id, aggregator_function, event }: Message) {
        let new_event = match self.events.remove(&type_id) {
            Some(old) => {
                // The function should never fail since we retrieved the corresponding TypeId in the map.
                // But in the unfortunate case it happens, we silently ignore the error.
                let Some(original) = (aggregator_function)(old.original, event.original) else {
                    return;
                };
                Event {
                    original,
                    // We always want to return the FIRST timestamp ever encountered.
                    timestamp: old.timestamp,
                    user_agents: old.user_agents.union(&event.user_agents).cloned().collect(),
                    total: old.total.saturating_add(event.total),
                }
            }
            None => event,
        };
        self.events.insert(type_id, new_event);
    }

    async fn tick(
        &mut self,
        index_scheduler: Arc<IndexScheduler>,
        auth_controller: Arc<AuthController>,
    ) {
        if let Ok(stats) = create_all_stats(
            index_scheduler.clone().into(),
            auth_controller.into(),
            &AuthFilter::default(),
        ) {
            // Replace the version number with the prototype name if any.
            let version = if let Some(prototype) = build_info::DescribeResult::from_build()
                .and_then(|describe| describe.as_prototype())
            {
                prototype
            } else {
                env!("CARGO_PKG_VERSION")
            };

            let _ = self
                .batcher
                .push(Identify {
                    context: Some(json!({
                        "app": {
                            "version": version.to_string(),
                        },
                    })),
                    user: self.user.clone(),
                    traits: Self::compute_traits(
                        &self.opt,
                        stats,
                        index_scheduler.features().runtime_features(),
                    ),
                    ..Default::default()
                })
                .await;
        }

        // We empty the list of events.
        let events = std::mem::take(&mut self.events);

        for (_, event) in events {
            let Event { original, timestamp, user_agents, total } = event;
            let name = original.event_name();
            let mut properties = original.into_event();
            if properties["user-agent"].is_null() {
                properties["user-agent"] = json!(user_agents);
            };
            if properties["requests"]["total_received"].is_null() {
                properties["requests"]["total_received"] = total.into();
            };

            let _ = self
                .batcher
                .push(Track {
                    user: self.user.clone(),
                    event: name.to_string(),
                    properties,
                    timestamp: Some(timestamp),
                    ..Default::default()
                })
                .await;
        }

        let _ = self.batcher.flush().await;
    }
}
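Taken together: `Analytics::publish` wraps a typed event in a `Message`, `handle_msg` merges it with any pending event of the same `TypeId`, and the hourly `tick` drains the map into Segment `Track` calls. A hedged sketch of the merge step, reusing the hypothetical `DocumentsDeletedAggregate` from the `mod.rs` note above:

```rust
// Two events of the same concrete type received within the same hour...
let a = DocumentsDeletedAggregate { total: 2 };
let b = DocumentsDeletedAggregate { total: 3 };
// ...are merged through the type-erased `aggregator_function` stored in the
// `Message`, which downcasts both boxes and calls `Aggregate::aggregate`.
let merged = downcast_aggregate::<DocumentsDeletedAggregate>(Box::new(a), Box::new(b))
    .expect("same concrete type, so both downcasts succeed");
assert_eq!(merged.event_name(), "Documents Deleted");
// The `Event` bookkeeping (first timestamp wins, user-agent union,
// `total_received`) happens around this call, in `Segment::handle_msg`.
```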