finish the analytics in all the routes

This commit is contained in:
Tamo 2024-10-16 21:17:06 +02:00
parent fdeb47fb54
commit ea6883189e
10 changed files with 84 additions and 199 deletions

View File

@ -1,7 +1,5 @@
pub mod segment_analytics;
use std::any::TypeId;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
@ -10,7 +8,6 @@ use actix_web::HttpRequest;
use meilisearch_types::InstanceUid;
use once_cell::sync::Lazy;
use platform_dirs::AppDirs;
use segment::message::User;
use serde::Serialize;
// If the `analytics` feature is enabled, we use the real analytics.
@ -83,7 +80,7 @@ pub enum DocumentFetchKind {
Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
}
pub trait Aggregate {
pub trait Aggregate: 'static {
fn event_name(&self) -> &'static str;
fn aggregate(self, other: Self) -> Self
@ -97,7 +94,7 @@ pub trait Aggregate {
/// Helper trait to define multiple aggregates with the same content but a different name.
/// Commonly used when you must aggregate a search with POST or with GET for example.
pub trait AggregateMethod {
pub trait AggregateMethod: 'static + Default {
fn event_name() -> &'static str;
}
@ -105,7 +102,8 @@ pub trait AggregateMethod {
#[macro_export]
macro_rules! aggregate_methods {
($method:ident => $event_name:literal) => {
pub enum $method {}
#[derive(Default)]
pub struct $method {}
impl $crate::analytics::AggregateMethod for $method {
fn event_name() -> &'static str {
@ -122,35 +120,26 @@ macro_rules! aggregate_methods {
}
pub struct Analytics {
// TODO: TAMO: remove
inner: Option<SegmentAnalytics>,
instance_uid: Option<InstanceUid>,
user: Option<User>,
events: HashMap<TypeId, Box<dyn Aggregate>>,
segment: Option<SegmentAnalytics>,
}
impl Analytics {
fn no_analytics() -> Self {
Self { inner: None, events: HashMap::new(), instance_uid: None, user: None }
Self { segment: None }
}
fn segment_analytics(segment: SegmentAnalytics) -> Self {
Self {
instance_uid: Some(segment.instance_uid),
user: Some(segment.user),
inner: Some(segment),
events: HashMap::new(),
}
Self { segment: Some(segment) }
}
pub fn instance_uid(&self) -> Option<&InstanceUid> {
self.instance_uid
self.segment.as_ref().map(|segment| segment.instance_uid.as_ref())
}
/// The method used to publish most analytics that do not need to be batched every hour
pub fn publish(&self, send: impl Aggregate, request: &HttpRequest) {
let Some(segment) = self.inner else { return };
pub fn publish(&self, event: impl Aggregate, request: &HttpRequest) {
let Some(ref segment) = self.segment else { return };
let user_agents = extract_user_agents(request);
let _ = segment.sender.try_send(Box::new(event));
}
}

View File

@ -1,3 +1,4 @@
use std::any::{Any, TypeId};
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::fs;
use std::mem::take;
@ -74,6 +75,7 @@ pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
pub struct SegmentAnalytics {
pub instance_uid: InstanceUid,
pub user: User,
pub sender: Sender<Box<dyn Aggregate>>,
}
impl SegmentAnalytics {
@ -128,18 +130,7 @@ impl SegmentAnalytics {
user: user.clone(),
opt: opt.clone(),
batcher,
post_search_aggregator: SearchAggregator::default(),
post_multi_search_aggregator: MultiSearchAggregator::default(),
post_facet_search_aggregator: FacetSearchAggregator::default(),
get_search_aggregator: SearchAggregator::default(),
add_documents_aggregator: DocumentsAggregator::default(),
delete_documents_aggregator: DocumentsDeletionAggregator::default(),
update_documents_aggregator: DocumentsAggregator::default(),
edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator::default(),
get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
get_similar_aggregator: SimilarAggregator::default(),
post_similar_aggregator: SimilarAggregator::default(),
events: todo!(),
});
tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone()));
@ -387,22 +378,11 @@ impl From<Opt> for Infos {
}
pub struct Segment {
inbox: Receiver<AnalyticsMsg>,
inbox: Receiver<Box<dyn Aggregate>>,
user: User,
opt: Opt,
batcher: AutoBatcher,
get_search_aggregator: SearchAggregator,
post_search_aggregator: SearchAggregator,
post_multi_search_aggregator: MultiSearchAggregator,
post_facet_search_aggregator: FacetSearchAggregator,
add_documents_aggregator: DocumentsAggregator,
delete_documents_aggregator: DocumentsDeletionAggregator,
update_documents_aggregator: DocumentsAggregator,
edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator,
get_fetch_documents_aggregator: DocumentsFetchAggregator,
post_fetch_documents_aggregator: DocumentsFetchAggregator,
get_similar_aggregator: SimilarAggregator,
post_similar_aggregator: SimilarAggregator,
events: HashMap<TypeId, Box<dyn Aggregate>>,
}
impl Segment {
@ -455,19 +435,8 @@ impl Segment {
},
msg = self.inbox.recv() => {
match msg {
Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostMultiSearch(agreg)) => self.post_multi_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostFacetSearch(agreg)) => self.post_facet_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateEditDocumentsByFunction(agreg)) => self.edit_documents_by_function_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetSimilar(agreg)) => self.get_similar_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSimilar(agreg)) => self.post_similar_aggregator.aggregate(agreg),
// Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
Some(_) => todo!(),
None => (),
}
}
@ -507,87 +476,19 @@ impl Segment {
.await;
}
let Segment {
inbox: _,
opt: _,
batcher: _,
user,
get_search_aggregator,
post_search_aggregator,
post_multi_search_aggregator,
post_facet_search_aggregator,
add_documents_aggregator,
delete_documents_aggregator,
update_documents_aggregator,
edit_documents_by_function_aggregator,
get_fetch_documents_aggregator,
post_fetch_documents_aggregator,
get_similar_aggregator,
post_similar_aggregator,
} = self;
// We empty the list of events
let events = std::mem::take(&mut self.events);
if let Some(get_search) =
take(get_search_aggregator).into_event(user, "Documents Searched GET")
{
let _ = self.batcher.push(get_search).await;
}
if let Some(post_search) =
take(post_search_aggregator).into_event(user, "Documents Searched POST")
{
let _ = self.batcher.push(post_search).await;
}
if let Some(post_multi_search) = take(post_multi_search_aggregator)
.into_event(user, "Documents Searched by Multi-Search POST")
{
let _ = self.batcher.push(post_multi_search).await;
}
if let Some(post_facet_search) =
take(post_facet_search_aggregator).into_event(user, "Facet Searched POST")
{
let _ = self.batcher.push(post_facet_search).await;
}
if let Some(add_documents) =
take(add_documents_aggregator).into_event(user, "Documents Added")
{
let _ = self.batcher.push(add_documents).await;
}
if let Some(delete_documents) =
take(delete_documents_aggregator).into_event(user, "Documents Deleted")
{
let _ = self.batcher.push(delete_documents).await;
}
if let Some(update_documents) =
take(update_documents_aggregator).into_event(user, "Documents Updated")
{
let _ = self.batcher.push(update_documents).await;
}
if let Some(edit_documents_by_function) = take(edit_documents_by_function_aggregator)
.into_event(user, "Documents Edited By Function")
{
let _ = self.batcher.push(edit_documents_by_function).await;
}
if let Some(get_fetch_documents) =
take(get_fetch_documents_aggregator).into_event(user, "Documents Fetched GET")
{
let _ = self.batcher.push(get_fetch_documents).await;
}
if let Some(post_fetch_documents) =
take(post_fetch_documents_aggregator).into_event(user, "Documents Fetched POST")
{
let _ = self.batcher.push(post_fetch_documents).await;
for (_, mut event) in events {
self.batcher.push(Track {
user: self.user,
event: event.event_name().to_string(),
properties: event.into_event(),
timestamp: todo!(),
..Default::default()
});
}
if let Some(get_similar_documents) =
take(get_similar_aggregator).into_event(user, "Similar GET")
{
let _ = self.batcher.push(get_similar_documents).await;
}
if let Some(post_similar_documents) =
take(post_similar_aggregator).into_event(user, "Similar POST")
{
let _ = self.batcher.push(post_similar_documents).await;
}
let _ = self.batcher.flush().await;
}
}
@ -702,10 +603,8 @@ impl<Method: AggregateMethod> SearchAggregator<Method> {
} = query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(ref sort) = sort {
ret.sort_total_number_of_criteria = 1;
@ -949,7 +848,7 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
self
}
fn into_event(self) -> Option<Track> {
fn into_event(self) -> impl Serialize {
let Self {
total_received,
total_succeeded,
@ -1087,10 +986,7 @@ pub struct MultiSearchAggregator {
}
impl MultiSearchAggregator {
pub fn from_federated_search(
federated_search: &FederatedSearch,
request: &HttpRequest,
) -> Self {
pub fn from_federated_search(federated_search: &FederatedSearch) -> Self {
let use_federation = federated_search.federation.is_some();
let distinct_indexes: HashSet<_> = federated_search
@ -1162,7 +1058,7 @@ impl Aggregate for MultiSearchAggregator {
}
/// Aggregate one [MultiSearchAggregator] into another.
fn aggregate(mut self, other: Self) -> Self {
fn aggregate(self, other: Self) -> Self {
// write the aggregate in a way that will cause a compilation error if a field is added.
// get ownership of self, replacing it with a default value.
@ -1177,13 +1073,8 @@ impl Aggregate for MultiSearchAggregator {
let show_ranking_score = this.show_ranking_score || other.show_ranking_score;
let show_ranking_score_details =
this.show_ranking_score_details || other.show_ranking_score_details;
let mut user_agents = this.user_agents;
let use_federation = this.use_federation || other.use_federation;
for user_agent in other.user_agents.into_iter() {
user_agents.insert(user_agent);
}
Self {
total_received,
total_succeeded,
@ -1748,7 +1639,7 @@ pub struct SimilarAggregator<Method: AggregateMethod> {
impl<Method: AggregateMethod> SimilarAggregator<Method> {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &SimilarQuery, request: &HttpRequest) -> Self {
pub fn from_query(query: &SimilarQuery) -> Self {
let SimilarQuery {
id: _,
embedder: _,
@ -1763,10 +1654,8 @@ impl<Method: AggregateMethod> SimilarAggregator<Method> {
} = query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(ref filter) = filter {
static RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());

View File

@ -7,7 +7,6 @@ use meilisearch_types::deserr::DeserrJsonError;
use meilisearch_types::error::ResponseError;
use meilisearch_types::keys::actions;
use serde::Serialize;
use serde_json::json;
use tracing::debug;
use crate::analytics::{Aggregate, Analytics};

View File

@ -32,7 +32,7 @@ use tokio::fs::File;
use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};
use tracing::debug;
use crate::analytics::{Aggregate, AggregateMethod, Analytics, DocumentDeletionKind};
use crate::analytics::{Aggregate, AggregateMethod, Analytics};
use crate::error::MeilisearchHttpError;
use crate::error::PayloadError::ReceivePayload;
use crate::extractors::authentication::policies::*;
@ -102,8 +102,13 @@ pub struct GetDocument {
retrieve_vectors: Param<bool>,
}
aggregate_methods!(
DocumentsGET => "Documents Fetched GET",
DocumentsPOST => "Documents Fetched POST",
);
#[derive(Default, Serialize)]
pub struct DocumentsFetchAggregator {
pub struct DocumentsFetchAggregator<Method: AggregateMethod> {
#[serde(rename = "requests.total_received")]
total_received: usize,
@ -120,6 +125,8 @@ pub struct DocumentsFetchAggregator {
max_limit: usize,
#[serde(rename = "pagination.max_offset")]
max_offset: usize,
marker: std::marker::PhantomData<Method>,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
@ -128,7 +135,7 @@ pub enum DocumentFetchKind {
Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
}
impl DocumentsFetchAggregator {
impl<Method: AggregateMethod> DocumentsFetchAggregator<Method> {
pub fn from_query(query: &DocumentFetchKind) -> Self {
let (limit, offset, retrieve_vectors) = match query {
DocumentFetchKind::PerDocumentId { retrieve_vectors } => (1, 0, *retrieve_vectors),
@ -136,6 +143,7 @@ impl DocumentsFetchAggregator {
(*limit, *offset, *retrieve_vectors)
}
};
Self {
total_received: 1,
per_document_id: matches!(query, DocumentFetchKind::PerDocumentId { .. }),
@ -143,20 +151,18 @@ impl DocumentsFetchAggregator {
max_limit: limit,
max_offset: offset,
retrieve_vectors,
marker: PhantomData,
}
}
}
impl Aggregate for DocumentsFetchAggregator {
// TODO: TAMO: Should we do the same event for the GET requests
impl<Method: AggregateMethod> Aggregate for DocumentsFetchAggregator<Method> {
fn event_name(&self) -> &'static str {
"Documents Fetched POST"
Method::event_name()
}
fn aggregate(self, other: Self) -> Self
where
Self: Sized,
{
fn aggregate(self, other: Self) -> Self {
Self {
total_received: self.total_received.saturating_add(other.total_received),
per_document_id: self.per_document_id | other.per_document_id,
@ -164,11 +170,12 @@ impl Aggregate for DocumentsFetchAggregator {
retrieve_vectors: self.retrieve_vectors | other.retrieve_vectors,
max_limit: self.max_limit.max(other.max_limit),
max_offset: self.max_offset.max(other.max_offset),
marker: PhantomData,
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
fn into_event(self) -> impl Serialize {
self
}
}
@ -190,7 +197,7 @@ pub async fn get_document(
let retrieve_vectors = RetrieveVectors::new(param_retrieve_vectors.0, features)?;
analytics.publish(
DocumentsFetchAggregator {
DocumentsFetchAggregator::<DocumentsGET> {
retrieve_vectors: param_retrieve_vectors.0,
..Default::default()
},
@ -232,8 +239,8 @@ impl Aggregate for DocumentsDeletionAggregator {
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
fn into_event(self) -> impl Serialize {
self
}
}
@ -311,7 +318,7 @@ pub async fn documents_by_query_post(
debug!(parameters = ?body, "Get documents POST");
analytics.publish(
DocumentsFetchAggregator {
DocumentsFetchAggregator::<DocumentsPOST> {
total_received: 1,
per_filter: body.filter.is_some(),
retrieve_vectors: body.retrieve_vectors,
@ -353,7 +360,7 @@ pub async fn get_documents(
};
analytics.publish(
DocumentsFetchAggregator {
DocumentsFetchAggregator::<DocumentsGET> {
total_received: 1,
per_filter: query.filter.is_some(),
retrieve_vectors: query.retrieve_vectors,
@ -436,20 +443,17 @@ impl<Method: AggregateMethod> Aggregate for DocumentsAggregator<Method> {
Method::event_name()
}
fn aggregate(mut self, other: Self) -> Self
where
Self: Sized,
{
fn aggregate(self, other: Self) -> Self {
Self {
payload_types: self.payload_types.union(&other.payload_types).collect(),
primary_key: self.primary_key.union(&other.primary_key).collect(),
payload_types: self.payload_types.union(&other.payload_types).cloned().collect(),
primary_key: self.primary_key.union(&other.primary_key).cloned().collect(),
index_creation: self.index_creation | other.index_creation,
method: PhantomData,
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
fn into_event(self) -> impl Serialize {
self
}
}
@ -818,8 +822,8 @@ impl Aggregate for EditDocumentsByFunctionAggregator {
}
}
fn into_event(self) -> Value {
serde_json::to_value(self).unwrap()
fn into_event(self) -> impl Serialize {
self
}
}

View File

@ -9,6 +9,7 @@ use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::locales::Locale;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;
@ -72,7 +73,7 @@ pub struct FacetSearchAggregator {
impl FacetSearchAggregator {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &FacetSearchQuery, request: &HttpRequest) -> Self {
pub fn from_query(query: &FacetSearchQuery) -> Self {
let FacetSearchQuery {
facet_query: _,
facet_name,
@ -113,23 +114,22 @@ impl Aggregate for FacetSearchAggregator {
"Facet Searched POST"
}
fn aggregate(mut self, other: Self) -> Self
where
Self: Sized,
{
self.time_spent.insert(other.time_spent);
fn aggregate(mut self, other: Self) -> Self {
for time in other.time_spent {
self.time_spent.push(time);
}
Self {
total_received: self.total_received.saturating_add(other.total_received),
total_succeeded: self.total_succeeded.saturating_add(other.total_succeeded),
time_spent: self.time_spent,
facet_names: self.facet_names.union(&other.facet_names).collect(),
facet_names: self.facet_names.union(&other.facet_names).cloned().collect(),
additional_search_parameters_provided: self.additional_search_parameters_provided
| other.additional_search_parameters_provided,
}
}
fn into_event(self) -> Value {
fn into_event(self) -> impl Serialize {
let Self {
total_received,
total_succeeded,
@ -137,6 +137,12 @@ impl Aggregate for FacetSearchAggregator {
facet_names,
additional_search_parameters_provided,
} = self;
// the index of the 99th percentile value
let percentile_99th = 0.99 * (total_succeeded as f64 - 1.) + 1.;
// we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec();
// We are only interested in the slowest value among the fastest 99% of results
let time_spent = time_spent.get(percentile_99th as usize);
serde_json::json!({
"requests": {
@ -166,7 +172,7 @@ pub async fn search(
let query = params.into_inner();
debug!(parameters = ?query, "Facet search");
let mut aggregate = FacetSearchAggregator::from_query(&query, &req);
let mut aggregate = FacetSearchAggregator::from_query(&query);
let facet_query = query.facet_query.clone();
let facet_name = query.facet_name.clone();

View File

@ -14,7 +14,6 @@ use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::milli::{self, FieldDistribution, Index};
use meilisearch_types::tasks::KindWithContent;
use serde::Serialize;
use serde_json::json;
use time::OffsetDateTime;
use tracing::debug;
@ -138,7 +137,7 @@ impl Aggregate for IndexCreatedAggregate {
where
Self: Sized,
{
Self { primary_key: self.primary_key.union(&other.primary_key).collect() }
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
}
fn into_event(self) -> impl Serialize {
@ -227,7 +226,7 @@ impl Aggregate for IndexUpdatedAggregate {
}
fn aggregate(self, other: Self) -> Self {
Self { primary_key: self.primary_key.union(&other.primary_key).collect() }
Self { primary_key: self.primary_key.union(&other.primary_key).cloned().collect() }
}
fn into_event(self) -> impl Serialize {

View File

@ -41,7 +41,7 @@ pub async fn similar_get(
let query = params.0.try_into()?;
let mut aggregate = SimilarAggregator::<SimilarGET>::from_query(&query, &req);
let mut aggregate = SimilarAggregator::<SimilarGET>::from_query(&query);
debug!(parameters = ?query, "Similar get");
@ -70,7 +70,7 @@ pub async fn similar_post(
let query = params.into_inner();
debug!(parameters = ?query, "Similar post");
let mut aggregate = SimilarAggregator::<SimilarPOST>::from_query(&query, &req);
let mut aggregate = SimilarAggregator::<SimilarPOST>::from_query(&query);
let similar = similar(index_scheduler, index_uid, query).await;

View File

@ -43,7 +43,7 @@ pub async fn multi_search_with_post(
let federated_search = params.into_inner();
let mut multi_aggregate = MultiSearchAggregator::from_federated_search(&federated_search, &req);
let mut multi_aggregate = MultiSearchAggregator::from_federated_search(&federated_search);
let FederatedSearch { mut queries, federation } = federated_search;

View File

@ -9,7 +9,6 @@ use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::tasks::{IndexSwap, KindWithContent};
use serde::Serialize;
use serde_json::json;
use super::{get_task_id, is_dry_run, SummarizedTaskView};
use crate::analytics::{Aggregate, Analytics};

View File

@ -180,7 +180,7 @@ struct TaskFilterAnalytics<Method: AggregateMethod> {
marker: std::marker::PhantomData<Method>,
}
impl<Method: AggregateMethod> Aggregate for TaskFilterAnalytics<Method> {
impl<Method: AggregateMethod + 'static> Aggregate for TaskFilterAnalytics<Method> {
fn event_name(&self) -> &'static str {
Method::event_name()
}