Rewrite most of the analytics, especially the settings

Tamo 2024-10-16 15:43:27 +02:00
parent a0b3887709
commit 73e87c152a
12 changed files with 1381 additions and 647 deletions

View File

@ -1,109 +0,0 @@
use std::any::Any;
use std::sync::Arc;
use actix_web::HttpRequest;
use meilisearch_types::InstanceUid;
use serde_json::Value;
use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
use crate::Opt;
pub struct MockAnalytics {
instance_uid: Option<InstanceUid>,
}
#[derive(Default)]
pub struct SearchAggregator;
#[allow(dead_code)]
impl SearchAggregator {
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
Self
}
pub fn succeed(&mut self, _: &dyn Any) {}
}
#[derive(Default)]
pub struct SimilarAggregator;
#[allow(dead_code)]
impl SimilarAggregator {
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
Self
}
pub fn succeed(&mut self, _: &dyn Any) {}
}
#[derive(Default)]
pub struct MultiSearchAggregator;
#[allow(dead_code)]
impl MultiSearchAggregator {
pub fn from_federated_search(_: &dyn Any, _: &dyn Any) -> Self {
Self
}
pub fn succeed(&mut self) {}
}
#[derive(Default)]
pub struct FacetSearchAggregator;
#[allow(dead_code)]
impl FacetSearchAggregator {
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
Self
}
pub fn succeed(&mut self, _: &dyn Any) {}
}
impl MockAnalytics {
#[allow(clippy::new_ret_no_self)]
pub fn new(opt: &Opt) -> Arc<dyn Analytics> {
let instance_uid = find_user_id(&opt.db_path);
Arc::new(Self { instance_uid })
}
}
impl Analytics for MockAnalytics {
fn instance_uid(&self) -> Option<&meilisearch_types::InstanceUid> {
self.instance_uid.as_ref()
}
// These methods are no-ops and should be optimized out
fn publish(&self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
fn get_search(&self, _aggregate: super::SearchAggregator) {}
fn post_search(&self, _aggregate: super::SearchAggregator) {}
fn get_similar(&self, _aggregate: super::SimilarAggregator) {}
fn post_similar(&self, _aggregate: super::SimilarAggregator) {}
fn post_multi_search(&self, _aggregate: super::MultiSearchAggregator) {}
fn post_facet_search(&self, _aggregate: super::FacetSearchAggregator) {}
fn add_documents(
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn delete_documents(&self, _kind: DocumentDeletionKind, _request: &HttpRequest) {}
fn update_documents(
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn update_documents_by_function(
&self,
_documents_query: &DocumentEditionByFunction,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
}

View File

@ -1,45 +1,51 @@
mod mock_analytics;
#[cfg(feature = "analytics")]
mod segment_analytics;
pub mod segment_analytics;
use std::any::TypeId;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use actix_web::HttpRequest;
use meilisearch_types::InstanceUid;
pub use mock_analytics::MockAnalytics;
use once_cell::sync::Lazy;
use platform_dirs::AppDirs;
use serde_json::Value;
use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
// if the analytics feature is disabled,
// the `SegmentAnalytics` type points to the mock instead of the real analytics
#[cfg(not(feature = "analytics"))]
pub type SegmentAnalytics = mock_analytics::MockAnalytics;
#[cfg(not(feature = "analytics"))]
pub type SearchAggregator = mock_analytics::SearchAggregator;
#[cfg(not(feature = "analytics"))]
pub type SimilarAggregator = mock_analytics::SimilarAggregator;
#[cfg(not(feature = "analytics"))]
pub type MultiSearchAggregator = mock_analytics::MultiSearchAggregator;
#[cfg(not(feature = "analytics"))]
pub type FacetSearchAggregator = mock_analytics::FacetSearchAggregator;
use segment::message::User;
use serde::Serialize;
// if the analytics feature is enabled, we use the real analytics
#[cfg(feature = "analytics")]
pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
#[cfg(feature = "analytics")]
pub type SearchAggregator = segment_analytics::SearchAggregator;
#[cfg(feature = "analytics")]
pub use segment_analytics::SearchAggregator;
pub type SimilarAggregator = segment_analytics::SimilarAggregator;
#[cfg(feature = "analytics")]
pub type MultiSearchAggregator = segment_analytics::MultiSearchAggregator;
#[cfg(feature = "analytics")]
pub type FacetSearchAggregator = segment_analytics::FacetSearchAggregator;
/// A macro used to quickly define events that don't aggregate or send anything besides an empty event with their name.
#[macro_export]
macro_rules! empty_analytics {
($struct_name:ident, $event_name:literal) => {
#[derive(Default)]
struct $struct_name {}
impl $crate::analytics::Aggregate for $struct_name {
fn event_name(&self) -> &'static str {
$event_name
}
fn aggregate(self, _other: Self) -> Self
where
Self: Sized,
{
self
}
fn into_event(self) -> serde_json::Value {
serde_json::json!({})
}
}
};
}
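As a usage sketch (the route, struct, and event name below are hypothetical and not part of this commit), a handler that only needs to emit a named, payload-less event can declare its aggregate with this macro and hand it straight to `Analytics::publish`:

// Hypothetical usage of `empty_analytics!`; assumes the `Analytics` struct defined below in this module.
crate::empty_analytics!(HealthViewedAnalytics, "Health Seen");

pub async fn health(
    analytics: actix_web::web::Data<crate::analytics::Analytics>,
    req: actix_web::HttpRequest,
) -> actix_web::HttpResponse {
    // The event carries no payload: `into_event` returns `json!({})` and
    // `aggregate` keeps `self`, so only the event name matters.
    analytics.publish(HealthViewedAnalytics::default(), Some(&req));
    actix_web::HttpResponse::Ok().finish()
}

The same pattern appears later in this diff for `DumpAnalytics` and `GetExperimentalFeatureAnalytics`.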
/// The Meilisearch config dir:
/// `~/.config/Meilisearch` on *NIX or *BSD.
/// `~/Library/ApplicationSupport` on macOS.
@ -78,60 +84,73 @@ pub enum DocumentFetchKind {
Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
}
pub trait Analytics: Sync + Send {
fn instance_uid(&self) -> Option<&InstanceUid>;
pub trait Aggregate {
fn event_name(&self) -> &'static str;
fn aggregate(self, other: Self) -> Self
where
Self: Sized;
fn into_event(self) -> impl Serialize
where
Self: Sized;
}
/// Helper trait to define multiple aggregates with the same content but a different name.
/// Commonly used when you must aggregate a search made with POST or with GET, for example.
pub trait AggregateMethod {
fn event_name() -> &'static str;
}
/// A macro used to quickly define multiple aggregate methods with their names
#[macro_export]
macro_rules! aggregate_methods {
($method:ident => $event_name:literal) => {
pub enum $method {}
impl $crate::analytics::AggregateMethod for $method {
fn event_name() -> &'static str {
$event_name
}
}
};
($($method:ident => $event_name:literal,)+) => {
$(
aggregate_methods!($method => $event_name);
)+
};
}
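To make the intent of these two traits and macros concrete, here is a minimal sketch of the pattern this commit drives at (the `Demo*` names are illustrative and not part of the diff; it assumes the code lives in a module of this crate): one aggregate struct is generic over an `AggregateMethod`, so the same counting logic can be published under a GET or a POST event name.

use std::marker::PhantomData;

use crate::aggregate_methods;
use crate::analytics::{Aggregate, AggregateMethod};

aggregate_methods!(
    DemoGET => "Demo Fetched GET",
    DemoPOST => "Demo Fetched POST",
);

pub struct DemoAggregator<Method: AggregateMethod> {
    total_received: usize,
    // Zero-sized marker: the method parameter only decides the event name.
    marker: PhantomData<Method>,
}

impl<Method: AggregateMethod> DemoAggregator<Method> {
    pub fn from_request() -> Self {
        Self { total_received: 1, marker: PhantomData }
    }
}

impl<Method: AggregateMethod> Aggregate for DemoAggregator<Method> {
    fn event_name(&self) -> &'static str {
        Method::event_name()
    }

    fn aggregate(self, other: Self) -> Self {
        Self {
            total_received: self.total_received.saturating_add(other.total_received),
            marker: PhantomData,
        }
    }

    fn into_event(self) -> serde_json::Value {
        serde_json::json!({ "requests": { "total_received": self.total_received } })
    }
}

This is exactly the shape `SearchAggregator<Method>` takes further down in `segment_analytics`, with `SearchGET` and `SearchPOST` as the method markers.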
pub struct Analytics {
// TODO: TAMO: remove
inner: Option<SegmentAnalytics>,
instance_uid: Option<InstanceUid>,
user: Option<User>,
events: HashMap<TypeId, Box<dyn Aggregate>>,
}
impl Analytics {
fn no_analytics() -> Self {
Self { inner: None, events: HashMap::new(), instance_uid: None, user: None }
}
fn segment_analytics(segment: SegmentAnalytics) -> Self {
Self {
instance_uid: Some(segment.instance_uid),
user: Some(segment.user),
inner: Some(segment),
events: HashMap::new(),
}
}
pub fn instance_uid(&self) -> Option<&InstanceUid> {
self.instance_uid.as_ref()
}
/// The method used to publish most analytics that do not need to be batched every hour
fn publish(&self, event_name: String, send: Value, request: Option<&HttpRequest>);
/// This method should be called to aggregate a get search
fn get_search(&self, aggregate: SearchAggregator);
/// This method should be called to aggregate a post search
fn post_search(&self, aggregate: SearchAggregator);
/// This method should be called to aggregate a get similar request
fn get_similar(&self, aggregate: SimilarAggregator);
/// This method should be called to aggregate a post similar request
fn post_similar(&self, aggregate: SimilarAggregator);
/// This method should be called to aggregate a post array of searches
fn post_multi_search(&self, aggregate: MultiSearchAggregator);
/// This method should be called to aggregate post facet values searches
fn post_facet_search(&self, aggregate: FacetSearchAggregator);
// this method should be called to aggregate an add documents request
fn add_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
);
// this method should be called to aggregate a fetch documents request
fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
// this method should be called to aggregate a fetch documents request
fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
// this method should be called to aggregate a delete documents request
fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);
// this method should be called to batch an update documents request
fn update_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
);
// this method should be called to batch an update documents by function request
fn update_documents_by_function(
&self,
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
);
pub fn publish(&self, send: impl Aggregate, request: Option<&HttpRequest>) {
// Aggregating `send` into `events` and flushing to Segment is still a TODO at this point of the rewrite.
let Some(_segment) = self.inner.as_ref() else { return };
let _ = (send, request);
}
}

View File

@ -25,7 +25,8 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid;
use super::{
config_user_id_path, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH,
config_user_id_path, Aggregate, AggregateMethod, DocumentDeletionKind, DocumentFetchKind,
MEILISEARCH_CONFIG_PATH,
};
use crate::analytics::Analytics;
use crate::option::{
@ -40,7 +41,7 @@ use crate::search::{
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
DEFAULT_SEMANTIC_RATIO,
};
use crate::Opt;
use crate::{aggregate_methods, Opt};
const ANALYTICS_HEADER: &str = "X-Meilisearch-Client";
@ -87,9 +88,9 @@ pub enum AnalyticsMsg {
}
pub struct SegmentAnalytics {
instance_uid: InstanceUid,
pub instance_uid: InstanceUid,
sender: Sender<AnalyticsMsg>,
user: User,
pub user: User,
}
impl SegmentAnalytics {
@ -98,7 +99,7 @@ impl SegmentAnalytics {
opt: &Opt,
index_scheduler: Arc<IndexScheduler>,
auth_controller: Arc<AuthController>,
) -> Arc<dyn Analytics> {
) -> Arc<Analytics> {
let instance_uid = super::find_user_id(&opt.db_path);
let first_time_run = instance_uid.is_none();
let instance_uid = instance_uid.unwrap_or_else(Uuid::new_v4);
@ -108,7 +109,7 @@ impl SegmentAnalytics {
// if reqwest throws an error we won't be able to send analytics
if client.is_err() {
return super::MockAnalytics::new(opt);
return Arc::new(Analytics::no_analytics());
}
let client =
@ -161,10 +162,11 @@ impl SegmentAnalytics {
let this = Self { instance_uid, sender, user: user.clone() };
Arc::new(this)
Arc::new(Analytics::segment_analytics(this))
}
}
/*
impl super::Analytics for SegmentAnalytics {
fn instance_uid(&self) -> Option<&InstanceUid> {
Some(&self.instance_uid)
@ -253,6 +255,7 @@ impl super::Analytics for SegmentAnalytics {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
}
}
*/
/// This structure represents the `infos` field we send in the analytics.
/// It's quite close to the `Opt` structure except all sensitive information
@ -607,12 +610,7 @@ impl Segment {
}
#[derive(Default)]
pub struct SearchAggregator {
timestamp: Option<OffsetDateTime>,
// context
user_agents: HashSet<String>,
pub struct SearchAggregator<Method: AggregateMethod> {
// requests
total_received: usize,
total_succeeded: usize,
@ -684,9 +682,11 @@ pub struct SearchAggregator {
show_ranking_score: bool,
show_ranking_score_details: bool,
ranking_score_threshold: bool,
marker: std::marker::PhantomData<Method>,
}
impl SearchAggregator {
impl<Method: AggregateMethod> SearchAggregator<Method> {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &SearchQuery, request: &HttpRequest) -> Self {
let SearchQuery {
@ -827,12 +827,21 @@ impl SearchAggregator {
}
self.time_spent.push(*processing_time_ms as usize);
}
}
/// Aggregate one [SearchAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
aggregate_methods!(
SearchGET => "Documents Searched GET",
SearchPOST => "Documents Searched POST",
);
impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
fn event_name(&self) -> &'static str {
Method::event_name()
}
fn aggregate(mut self, mut other: Self) -> Self {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
@ -871,17 +880,9 @@ impl SearchAggregator {
total_used_negative_operator,
ranking_score_threshold,
ref mut locales,
marker: _,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// context
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
@ -961,12 +962,12 @@ impl SearchAggregator {
// locales
self.locales.append(locales);
self
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
fn into_event(self) -> Option<Track> {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
@ -1005,11 +1006,9 @@ impl SearchAggregator {
total_used_negative_operator,
ranking_score_threshold,
locales,
marker: _,
} = self;
if total_received == 0 {
None
} else {
// we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec();
// the index of the 99th percentile value
@ -1017,8 +1016,7 @@ impl SearchAggregator {
// We are only interested in the slowest value of the 99% fastest results
let time_spent = time_spent.get(percentile_99th);
let properties = json!({
"user-agent": user_agents,
json!({
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": total_succeeded,
@ -1079,17 +1077,8 @@ impl SearchAggregator {
"show_ranking_score_details": show_ranking_score_details,
"ranking_score_threshold": ranking_score_threshold,
},
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}
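A quick numeric check of the 99th-percentile selection used above (the values are invented for illustration):

// Worked example, not from this commit: 200 sorted timings in milliseconds.
let time_spent: Vec<usize> = (1..=200).collect();
// The index of the 99th percentile value: 200 * 99 / 100 = 198.
let percentile_99th = time_spent.len() * 99 / 100;
// The reported value is the slowest of the 99% fastest requests.
assert_eq!(time_spent.get(percentile_99th), Some(&199));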
#[derive(Default)]

View File

@ -120,7 +120,7 @@ pub fn create_app(
search_queue: Data<SearchQueue>,
opt: Opt,
logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
analytics: Arc<Analytics>,
enable_dashboard: bool,
) -> actix_web::App<
impl ServiceFactory<
@ -473,7 +473,7 @@ pub fn configure_data(
search_queue: Data<SearchQueue>,
opt: &Opt,
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
analytics: Arc<Analytics>,
) {
let http_payload_size_limit = opt.http_payload_size_limit.as_u64() as usize;
config

View File

@ -4,7 +4,6 @@ use index_scheduler::IndexScheduler;
use meilisearch_auth::AuthController;
use meilisearch_types::error::ResponseError;
use meilisearch_types::tasks::KindWithContent;
use serde_json::json;
use tracing::debug;
use crate::analytics::Analytics;
@ -18,14 +17,16 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))));
}
crate::empty_analytics!(DumpAnalytics, "Dump Created");
pub async fn create_dump(
index_scheduler: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<IndexScheduler>>,
auth_controller: GuardedData<ActionPolicy<{ actions::DUMPS_CREATE }>, Data<AuthController>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
analytics.publish("Dump Created".to_string(), json!({}), Some(&req));
analytics.publish(DumpAnalytics::default(), Some(&req));
let task = KindWithContent::DumpCreation {
keys: auth_controller.list_keys()?,

View File

@ -6,10 +6,11 @@ use index_scheduler::IndexScheduler;
use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::actions;
use serde::Serialize;
use serde_json::json;
use tracing::debug;
use crate::analytics::Analytics;
use crate::analytics::{Aggregate, Analytics};
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
@ -22,17 +23,19 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
);
}
crate::empty_analytics!(GetExperimentalFeatureAnalytics, "Experimental features Seen");
async fn get_features(
index_scheduler: GuardedData<
ActionPolicy<{ actions::EXPERIMENTAL_FEATURES_GET }>,
Data<IndexScheduler>,
>,
req: HttpRequest,
analytics: Data<dyn Analytics>,
analytics: Data<Analytics>,
) -> HttpResponse {
let features = index_scheduler.features();
analytics.publish("Experimental features Seen".to_string(), json!(null), Some(&req));
analytics.publish(GetExperimentalFeatureAnalytics::default(), Some(&req));
let features = features.runtime_features();
debug!(returns = ?features, "Get features");
HttpResponse::Ok().json(features)
@ -53,6 +56,38 @@ pub struct RuntimeTogglableFeatures {
pub contains_filter: Option<bool>,
}
#[derive(Serialize)]
pub struct PatchExperimentalFeatureAnalytics {
vector_store: bool,
metrics: bool,
logs_route: bool,
edit_documents_by_function: bool,
contains_filter: bool,
}
impl Aggregate for PatchExperimentalFeatureAnalytics {
fn event_name(&self) -> &'static str {
"Experimental features Updated"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
vector_store: other.vector_store,
metrics: other.metrics,
logs_route: other.logs_route,
edit_documents_by_function: other.edit_documents_by_function,
contains_filter: other.contains_filter,
}
}
fn into_event(self) -> serde_json::Value {
serde_json::to_value(self).unwrap()
}
}
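The merge above keeps `other`'s values, so batching several PATCH events reports the most recent state. A small test-style sketch of that behaviour (not part of this commit):

#[cfg(test)]
mod patch_features_analytics_tests {
    use super::*;
    use crate::analytics::Aggregate;

    #[test]
    fn aggregate_keeps_the_latest_values() {
        let older = PatchExperimentalFeatureAnalytics {
            vector_store: true,
            metrics: false,
            logs_route: false,
            edit_documents_by_function: false,
            contains_filter: false,
        };
        let newer = PatchExperimentalFeatureAnalytics {
            vector_store: false,
            metrics: true,
            logs_route: false,
            edit_documents_by_function: false,
            contains_filter: false,
        };
        // Last write wins: the published event reflects the most recent PATCH.
        let merged = older.aggregate(newer);
        assert!(!merged.vector_store);
        assert!(merged.metrics);
    }
}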
async fn patch_features(
index_scheduler: GuardedData<
ActionPolicy<{ actions::EXPERIMENTAL_FEATURES_UPDATE }>,
@ -60,7 +95,7 @@ async fn patch_features(
>,
new_features: AwebJson<RuntimeTogglableFeatures, DeserrJsonError>,
req: HttpRequest,
analytics: Data<dyn Analytics>,
analytics: Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let features = index_scheduler.features();
debug!(parameters = ?new_features, "Patch features");
@ -89,14 +124,13 @@ async fn patch_features(
} = new_features;
analytics.publish(
"Experimental features Updated".to_string(),
json!({
"vector_store": vector_store,
"metrics": metrics,
"logs_route": logs_route,
"edit_documents_by_function": edit_documents_by_function,
"contains_filter": contains_filter,
}),
PatchExperimentalFeatureAnalytics {
vector_store,
metrics,
logs_route,
edit_documents_by_function,
contains_filter,
},
Some(&req),
);
index_scheduler.put_runtime_features(new_features)?;

View File

@ -1,4 +1,6 @@
use std::collections::HashSet;
use std::io::ErrorKind;
use std::marker::PhantomData;
use actix_web::http::header::CONTENT_TYPE;
use actix_web::web::Data;
@ -23,14 +25,14 @@ use meilisearch_types::tasks::KindWithContent;
use meilisearch_types::{milli, Document, Index};
use mime::Mime;
use once_cell::sync::Lazy;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tempfile::tempfile;
use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use tracing::debug;
use crate::analytics::{Analytics, DocumentDeletionKind, DocumentFetchKind};
use crate::analytics::{Aggregate, AggregateMethod, Analytics, DocumentDeletionKind};
use crate::error::MeilisearchHttpError;
use crate::error::PayloadError::ReceivePayload;
use crate::extractors::authentication::policies::*;
@ -41,7 +43,7 @@ use crate::routes::{
get_task_id, is_dry_run, PaginationView, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT,
};
use crate::search::{parse_filter, RetrieveVectors};
use crate::Opt;
use crate::{aggregate_methods, Opt};
static ACCEPTED_CONTENT_TYPE: Lazy<Vec<String>> = Lazy::new(|| {
vec!["application/json".to_string(), "application/x-ndjson".to_string(), "text/csv".to_string()]
@ -100,12 +102,82 @@ pub struct GetDocument {
retrieve_vectors: Param<bool>,
}
#[derive(Default, Serialize)]
pub struct DocumentsFetchAggregator {
#[serde(rename = "requests.total_received")]
total_received: usize,
// a call on ../documents/:doc_id
per_document_id: bool,
// if a filter was used
per_filter: bool,
#[serde(rename = "vector.retrieve_vectors")]
retrieve_vectors: bool,
// pagination
#[serde(rename = "pagination.max_limit")]
max_limit: usize,
#[serde(rename = "pagination.max_offset")]
max_offset: usize,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentFetchKind {
PerDocumentId { retrieve_vectors: bool },
Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
}
impl DocumentsFetchAggregator {
pub fn from_query(query: &DocumentFetchKind) -> Self {
let (limit, offset, retrieve_vectors) = match query {
DocumentFetchKind::PerDocumentId { retrieve_vectors } => (1, 0, *retrieve_vectors),
DocumentFetchKind::Normal { limit, offset, retrieve_vectors, .. } => {
(*limit, *offset, *retrieve_vectors)
}
};
Self {
total_received: 1,
per_document_id: matches!(query, DocumentFetchKind::PerDocumentId { .. }),
per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
max_limit: limit,
max_offset: offset,
retrieve_vectors,
}
}
}
impl Aggregate for DocumentsFetchAggregator {
// TODO: TAMO: Should we do the same event for the GET requests
fn event_name(&self) -> &'static str {
"Documents Fetched POST"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
total_received: self.total_received.saturating_add(other.total_received),
per_document_id: self.per_document_id | other.per_document_id,
per_filter: self.per_filter | other.per_filter,
retrieve_vectors: self.retrieve_vectors | other.retrieve_vectors,
max_limit: self.max_limit.max(other.max_limit),
max_offset: self.max_offset.max(other.max_offset),
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
}
}
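By contrast, the fetch aggregator above merges additively: counters add up, booleans OR together, and pagination keeps the maximum ever seen. A test-style sketch (again, not part of this commit):

#[cfg(test)]
mod documents_fetch_aggregator_tests {
    use super::*;
    use crate::analytics::Aggregate;

    #[test]
    fn merge_adds_counters_and_keeps_maxima() {
        let by_id = DocumentsFetchAggregator {
            total_received: 1,
            per_document_id: true,
            ..Default::default()
        };
        let by_filter = DocumentsFetchAggregator {
            total_received: 1,
            per_filter: true,
            max_limit: 20,
            max_offset: 100,
            ..Default::default()
        };
        let merged = by_id.aggregate(by_filter);
        assert_eq!(merged.total_received, 2);
        assert!(merged.per_document_id && merged.per_filter);
        assert_eq!((merged.max_limit, merged.max_offset), (20, 100));
    }
}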
pub async fn get_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
document_param: web::Path<DocumentParam>,
params: AwebQueryParameter<GetDocument, DeserrQueryParamError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = document_param.into_inner();
debug!(parameters = ?params, "Get document");
@ -117,9 +189,12 @@ pub async fn get_document(
let features = index_scheduler.features();
let retrieve_vectors = RetrieveVectors::new(param_retrieve_vectors.0, features)?;
analytics.get_fetch_documents(
&DocumentFetchKind::PerDocumentId { retrieve_vectors: param_retrieve_vectors.0 },
&req,
analytics.publish(
DocumentsFetchAggregator {
retrieve_vectors: param_retrieve_vectors.0,
..Default::default()
},
Some(&req),
);
let index = index_scheduler.index(&index_uid)?;
@ -129,17 +204,57 @@ pub async fn get_document(
Ok(HttpResponse::Ok().json(document))
}
#[derive(Default, Serialize)]
pub struct DocumentsDeletionAggregator {
#[serde(rename = "requests.total_received")]
total_received: usize,
per_document_id: bool,
clear_all: bool,
per_batch: bool,
per_filter: bool,
}
impl Aggregate for DocumentsDeletionAggregator {
fn event_name(&self) -> &'static str {
"Documents Deleted"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
total_received: self.total_received.saturating_add(other.total_received),
per_document_id: self.per_document_id | other.per_document_id,
clear_all: self.clear_all | other.clear_all,
per_batch: self.per_batch | other.per_batch,
per_filter: self.per_filter | other.per_filter,
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
}
}
pub async fn delete_document(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_DELETE }>, Data<IndexScheduler>>,
path: web::Path<DocumentParam>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let DocumentParam { index_uid, document_id } = path.into_inner();
let index_uid = IndexUid::try_from(index_uid)?;
analytics.delete_documents(DocumentDeletionKind::PerDocumentId, &req);
analytics.publish(
DocumentsDeletionAggregator {
total_received: 1,
per_document_id: true,
..Default::default()
},
Some(&req),
);
let task = KindWithContent::DocumentDeletion {
index_uid: index_uid.to_string(),
@ -190,19 +305,21 @@ pub async fn documents_by_query_post(
index_uid: web::Path<String>,
body: AwebJson<BrowseQuery, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let body = body.into_inner();
debug!(parameters = ?body, "Get documents POST");
analytics.post_fetch_documents(
&DocumentFetchKind::Normal {
with_filter: body.filter.is_some(),
limit: body.limit,
offset: body.offset,
analytics.publish(
DocumentsFetchAggregator {
total_received: 1,
per_filter: body.filter.is_some(),
retrieve_vectors: body.retrieve_vectors,
max_limit: body.limit,
max_offset: body.offset,
..Default::default()
},
&req,
Some(&req),
);
documents_by_query(&index_scheduler, index_uid, body)
@ -213,7 +330,7 @@ pub async fn get_documents(
index_uid: web::Path<String>,
params: AwebQueryParameter<BrowseQueryGet, DeserrQueryParamError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?params, "Get documents GET");
@ -235,14 +352,16 @@ pub async fn get_documents(
filter,
};
analytics.get_fetch_documents(
&DocumentFetchKind::Normal {
with_filter: query.filter.is_some(),
limit: query.limit,
offset: query.offset,
analytics.publish(
DocumentsFetchAggregator {
total_received: 1,
per_filter: query.filter.is_some(),
retrieve_vectors: query.retrieve_vectors,
max_limit: query.limit,
max_offset: query.offset,
..Default::default()
},
&req,
Some(&req),
);
documents_by_query(&index_scheduler, index_uid, query)
@ -298,6 +417,42 @@ fn from_char_csv_delimiter(
}
}
aggregate_methods!(
Replaced => "Documents Added",
Updated => "Documents Updated",
);
#[derive(Default, Serialize)]
pub struct DocumentsAggregator<T: AggregateMethod> {
payload_types: HashSet<String>,
primary_key: HashSet<String>,
index_creation: bool,
#[serde(skip)]
method: PhantomData<T>,
}
impl<Method: AggregateMethod> Aggregate for DocumentsAggregator<Method> {
fn event_name(&self) -> &'static str {
Method::event_name()
}
fn aggregate(mut self, other: Self) -> Self
where
Self: Sized,
{
Self {
payload_types: self.payload_types.union(&other.payload_types).cloned().collect(),
primary_key: self.primary_key.union(&other.primary_key).cloned().collect(),
index_creation: self.index_creation | other.index_creation,
method: PhantomData,
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
}
}
pub async fn replace_documents(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ADD }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
@ -305,17 +460,33 @@ pub async fn replace_documents(
body: Payload,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
debug!(parameters = ?params, "Replace documents");
let params = params.into_inner();
analytics.add_documents(
&params,
index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
&req,
let mut content_types = HashSet::new();
let content_type = req
.headers()
.get(CONTENT_TYPE)
.and_then(|s| s.to_str().ok())
.unwrap_or("unknown")
.to_string();
content_types.insert(content_type);
let mut primary_keys = HashSet::new();
if let Some(primary_key) = params.primary_key.clone() {
primary_keys.insert(primary_key);
}
analytics.publish(
DocumentsAggregator::<Replaced> {
payload_types: content_types,
primary_key: primary_keys,
index_creation: index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
method: PhantomData,
},
Some(&req),
);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
@ -346,17 +517,33 @@ pub async fn update_documents(
body: Payload,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let params = params.into_inner();
debug!(parameters = ?params, "Update documents");
analytics.add_documents(
&params,
index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
&req,
let mut content_types = HashSet::new();
let content_type = req
.headers()
.get(CONTENT_TYPE)
.and_then(|s| s.to_str().ok())
.unwrap_or("unknown")
.to_string();
content_types.insert(content_type);
let mut primary_keys = HashSet::new();
if let Some(primary_key) = params.primary_key.clone() {
primary_keys.insert(primary_key);
}
analytics.publish(
DocumentsAggregator::<Updated> {
payload_types: content_types,
primary_key: primary_keys,
index_creation: index_scheduler.index_exists(&index_uid).map_or(true, |x| !x),
method: PhantomData,
},
Some(&req),
);
let allow_index_creation = index_scheduler.filters().allow_index_creation(&index_uid);
@ -524,12 +711,15 @@ pub async fn delete_documents_batch(
body: web::Json<Vec<Value>>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by batch");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.delete_documents(DocumentDeletionKind::PerBatch, &req);
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, per_batch: true, ..Default::default() },
Some(&req),
);
let ids = body
.iter()
@ -562,14 +752,17 @@ pub async fn delete_documents_by_filter(
body: AwebJson<DocumentDeletionByFilter, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Delete documents by filter");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner();
let filter = body.into_inner().filter;
analytics.delete_documents(DocumentDeletionKind::PerFilter, &req);
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, per_filter: true, ..Default::default() },
Some(&req),
);
// we ensure the filter is well formed before enqueuing it
crate::search::parse_filter(&filter, Code::InvalidDocumentFilter, index_scheduler.features())?
@ -599,13 +792,44 @@ pub struct DocumentEditionByFunction {
pub function: String,
}
#[derive(Default, Serialize)]
struct EditDocumentsByFunctionAggregator {
// Set to true if at least one request was filtered
filtered: bool,
// Set to true if at least one request contained a context
with_context: bool,
index_creation: bool,
}
impl Aggregate for EditDocumentsByFunctionAggregator {
fn event_name(&self) -> &'static str {
"Documents Edited By Function"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self {
filtered: self.filtered | other.filtered,
with_context: self.with_context | other.with_context,
index_creation: self.index_creation | other.index_creation,
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
}
}
pub async fn edit_documents_by_function(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
params: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?params, "Edit documents by function");
@ -617,10 +841,13 @@ pub async fn edit_documents_by_function(
let index_uid = index_uid.into_inner();
let params = params.into_inner();
analytics.update_documents_by_function(
&params,
index_scheduler.index(&index_uid).is_err(),
&req,
analytics.publish(
EditDocumentsByFunctionAggregator {
filtered: params.filter.is_some(),
with_context: params.context.is_some(),
index_creation: index_scheduler.index(&index_uid).is_err(),
},
Some(&req),
);
let DocumentEditionByFunction { filter, context, function } = params;
@ -670,10 +897,13 @@ pub async fn clear_all_documents(
index_uid: web::Path<String>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
analytics.delete_documents(DocumentDeletionKind::ClearAll, &req);
analytics.publish(
DocumentsDeletionAggregator { total_received: 1, clear_all: true, ..Default::default() },
Some(&req),
);
let task = KindWithContent::DocumentClear { index_uid: index_uid.to_string() };
let uid = get_task_id(&req, &opt)?;

View File

@ -1,3 +1,5 @@
use std::collections::{BinaryHeap, HashSet};
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use deserr::actix_web::AwebJson;
@ -10,14 +12,15 @@ use meilisearch_types::locales::Locale;
use serde_json::Value;
use tracing::debug;
use crate::analytics::{Analytics, FacetSearchAggregator};
use crate::analytics::{Aggregate, Analytics};
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::routes::indexes::search::search_kind;
use crate::search::{
add_search_rules, perform_facet_search, HybridQuery, MatchingStrategy, RankingScoreThreshold,
SearchQuery, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEARCH_OFFSET,
add_search_rules, perform_facet_search, FacetSearchResult, HybridQuery, MatchingStrategy,
RankingScoreThreshold, SearchQuery, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
DEFAULT_SEARCH_OFFSET,
};
use crate::search_queue::SearchQueue;
@ -53,13 +56,110 @@ pub struct FacetSearchQuery {
pub locales: Option<Vec<Locale>>,
}
#[derive(Default)]
pub struct FacetSearchAggregator {
// requests
total_received: usize,
total_succeeded: usize,
time_spent: BinaryHeap<usize>,
// The set of all facetNames that were used
facet_names: HashSet<String>,
// Has there been any other parameter than the facetName or facetQuery ones?
additional_search_parameters_provided: bool,
}
impl FacetSearchAggregator {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &FacetSearchQuery, request: &HttpRequest) -> Self {
let FacetSearchQuery {
facet_query: _,
facet_name,
vector,
q,
filter,
matching_strategy,
attributes_to_search_on,
hybrid,
ranking_score_threshold,
locales,
} = query;
Self {
total_received: 1,
facet_names: Some(facet_name.clone()).into_iter().collect(),
additional_search_parameters_provided: q.is_some()
|| vector.is_some()
|| filter.is_some()
|| *matching_strategy != MatchingStrategy::default()
|| attributes_to_search_on.is_some()
|| hybrid.is_some()
|| ranking_score_threshold.is_some()
|| locales.is_some(),
..Default::default()
}
}
pub fn succeed(&mut self, result: &FacetSearchResult) {
let FacetSearchResult { facet_hits: _, facet_query: _, processing_time_ms } = result;
self.total_succeeded = 1;
self.time_spent.push(*processing_time_ms as usize);
}
}
impl Aggregate for FacetSearchAggregator {
fn event_name(&self) -> &'static str {
"Facet Searched POST"
}
fn aggregate(mut self, mut other: Self) -> Self
where
Self: Sized,
{
self.time_spent.append(&mut other.time_spent);
Self {
total_received: self.total_received.saturating_add(other.total_received),
total_succeeded: self.total_succeeded.saturating_add(other.total_succeeded),
time_spent: self.time_spent,
facet_names: self.facet_names.union(&other.facet_names).cloned().collect(),
additional_search_parameters_provided: self.additional_search_parameters_provided
| other.additional_search_parameters_provided,
}
}
fn into_event(self) -> Value {
let Self {
total_received,
total_succeeded,
time_spent,
facet_names,
additional_search_parameters_provided,
} = self;
// we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec();
// the index of the 99th percentile value
let percentile_99th = time_spent.len() * 99 / 100;
// We are only interested in the slowest value of the 99% fastest results
let time_spent = time_spent.get(percentile_99th);
serde_json::json!({
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"facets": {
"total_distinct_facet_count": facet_names.len(),
"additional_search_parameters_provided": additional_search_parameters_provided,
},
})
}
}
pub async fn search(
index_scheduler: GuardedData<ActionPolicy<{ actions::SEARCH }>, Data<IndexScheduler>>,
search_queue: Data<SearchQueue>,
index_uid: web::Path<String>,
params: AwebJson<FacetSearchQuery, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -100,7 +200,7 @@ pub async fn search(
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
analytics.post_facet_search(aggregate);
analytics.publish(aggregate, Some(&req));
let search_result = search_result?;

View File

@ -1,3 +1,4 @@
use std::collections::BTreeSet;
use std::convert::Infallible;
use actix_web::web::Data;
@ -18,7 +19,7 @@ use time::OffsetDateTime;
use tracing::debug;
use super::{get_task_id, Pagination, SummarizedTaskView, PAGINATION_DEFAULT_LIMIT};
use crate::analytics::Analytics;
use crate::analytics::{Aggregate, Analytics};
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::{AuthenticationError, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
@ -123,12 +124,34 @@ pub struct IndexCreateRequest {
primary_key: Option<String>,
}
#[derive(Serialize)]
struct IndexCreatedAggregate {
primary_key: BTreeSet<String>,
}
impl Aggregate for IndexCreatedAggregate {
fn event_name(&self) -> &'static str {
"Index Created"
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
}
fn into_event(self) -> impl Serialize {
self
}
}
pub async fn create_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_CREATE }>, Data<IndexScheduler>>,
body: AwebJson<IndexCreateRequest, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Create index");
let IndexCreateRequest { primary_key, uid } = body.into_inner();
@ -136,8 +159,7 @@ pub async fn create_index(
let allow_index_creation = index_scheduler.filters().allow_index_creation(&uid);
if allow_index_creation {
analytics.publish(
"Index Created".to_string(),
json!({ "primary_key": primary_key }),
IndexCreatedAggregate { primary_key: primary_key.iter().cloned().collect() },
Some(&req),
);
@ -194,20 +216,37 @@ pub async fn get_index(
Ok(HttpResponse::Ok().json(index_view))
}
#[derive(Serialize)]
struct IndexUpdatedAggregate {
primary_key: BTreeSet<String>,
}
impl Aggregate for IndexUpdatedAggregate {
fn event_name(&self) -> &'static str {
"Index Updated"
}
fn aggregate(self, other: Self) -> Self {
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
}
fn into_event(self) -> impl Serialize {
self
}
}
pub async fn update_index(
index_scheduler: GuardedData<ActionPolicy<{ actions::INDEXES_UPDATE }>, Data<IndexScheduler>>,
index_uid: web::Path<String>,
body: AwebJson<UpdateIndexRequest, DeserrJsonError>,
req: HttpRequest,
opt: web::Data<Opt>,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Update index");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let body = body.into_inner();
analytics.publish(
"Index Updated".to_string(),
json!({ "primary_key": body.primary_key }),
IndexUpdatedAggregate { primary_key: body.primary_key.iter().cloned().collect() },
Some(&req),
);

View File

@ -13,6 +13,7 @@ use meilisearch_types::serde_cs::vec::CS;
use serde_json::Value;
use tracing::debug;
use crate::analytics::segment_analytics::{SearchGET, SearchPOST};
use crate::analytics::{Analytics, SearchAggregator};
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
@ -225,7 +226,7 @@ pub async fn search_with_url_query(
index_uid: web::Path<String>,
params: AwebQueryParameter<SearchQueryGet, DeserrQueryParamError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?params, "Search get");
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -237,7 +238,7 @@ pub async fn search_with_url_query(
add_search_rules(&mut query.filter, search_rules);
}
let mut aggregate = SearchAggregator::from_query(&query, &req);
let mut aggregate = SearchAggregator::<SearchGET>::from_query(&query, &req);
let index = index_scheduler.index(&index_uid)?;
let features = index_scheduler.features();
@ -254,7 +255,7 @@ pub async fn search_with_url_query(
if let Ok(ref search_result) = search_result {
aggregate.succeed(search_result);
}
analytics.get_search(aggregate);
analytics.publish(aggregate, Some(&req));
let search_result = search_result?;
@ -268,7 +269,7 @@ pub async fn search_with_post(
index_uid: web::Path<String>,
params: AwebJson<SearchQuery, DeserrJsonError>,
req: HttpRequest,
analytics: web::Data<dyn Analytics>,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
let index_uid = IndexUid::try_from(index_uid.into_inner())?;
@ -280,7 +281,7 @@ pub async fn search_with_post(
add_search_rules(&mut query.filter, search_rules);
}
let mut aggregate = SearchAggregator::from_query(&query, &req);
let mut aggregate = SearchAggregator::<SearchPOST>::from_query(&query, &req);
let index = index_scheduler.index(&index_uid)?;
@ -302,7 +303,7 @@ pub async fn search_with_post(
MEILISEARCH_DEGRADED_SEARCH_REQUESTS.inc();
}
}
analytics.post_search(aggregate);
analytics.publish(aggregate, Some(&req));
let search_result = search_result?;

File diff suppressed because it is too large

View File

@ -40,7 +40,7 @@ pub async fn swap_indexes(
analytics.publish(
"Indexes Swapped".to_string(),
json!({
"swap_operation_number": params.len(),
"swap_operation_number": params.len(), // Return the max ever encountered
}),
Some(&req),
);