fix the analytics

This commit is contained in:
Tamo 2024-10-17 00:38:18 +02:00
parent ea6883189e
commit 6728cfbfac
4 changed files with 81 additions and 37 deletions

7
Cargo.lock generated
View File

@ -3415,6 +3415,7 @@ dependencies = [
"meilisearch-types", "meilisearch-types",
"mimalloc", "mimalloc",
"mime", "mime",
"mopa",
"num_cpus", "num_cpus",
"obkv", "obkv",
"once_cell", "once_cell",
@ -3681,6 +3682,12 @@ dependencies = [
"syn 2.0.60", "syn 2.0.60",
] ]
[[package]]
name = "mopa"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a785740271256c230f57462d3b83e52f998433a7062fc18f96d5999474a9f915"
[[package]] [[package]]
name = "mutually_exclusive_features" name = "mutually_exclusive_features"
version = "0.0.3" version = "0.0.3"

View File

@ -104,6 +104,7 @@ tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
tracing-actix-web = "0.7.11" tracing-actix-web = "0.7.11"
build-info = { version = "1.7.0", path = "../build-info" } build-info = { version = "1.7.0", path = "../build-info" }
roaring = "0.10.2" roaring = "0.10.2"
mopa = "0.2.2"
[dev-dependencies] [dev-dependencies]
actix-rt = "2.10.0" actix-rt = "2.10.0"

View File

@ -6,9 +6,9 @@ use std::str::FromStr;
use actix_web::HttpRequest; use actix_web::HttpRequest;
use meilisearch_types::InstanceUid; use meilisearch_types::InstanceUid;
use mopa::mopafy;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use platform_dirs::AppDirs; use platform_dirs::AppDirs;
use serde::Serialize;
// if the feature analytics is enabled we use the real analytics // if the feature analytics is enabled we use the real analytics
pub type SegmentAnalytics = segment_analytics::SegmentAnalytics; pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
@ -31,11 +31,11 @@ macro_rules! empty_analytics {
$event_name $event_name
} }
fn aggregate(self, _other: Self) -> Self { fn aggregate(self: Box<Self>, _other: Box<Self>) -> Box<Self> {
self self
} }
fn into_event(self) -> impl serde::Serialize { fn into_event(self: Box<Self>) -> serde_json::Value {
serde_json::json!({}) serde_json::json!({})
} }
} }
@ -80,17 +80,33 @@ pub enum DocumentFetchKind {
Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool }, Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
} }
pub trait Aggregate: 'static { pub trait Aggregate: 'static + mopa::Any + Send {
fn event_name(&self) -> &'static str; fn event_name(&self) -> &'static str;
fn aggregate(self, other: Self) -> Self fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self>
where where
Self: Sized; Self: Sized;
fn into_event(self) -> impl Serialize fn downcast_aggregate(
this: Box<dyn Aggregate>,
other: Box<dyn Aggregate>,
) -> Option<Box<dyn Aggregate>>
where where
Self: Sized; Self: Sized,
{
if this.is::<Self>() && other.is::<Self>() {
let this = this.downcast::<Self>().ok()?;
let other = other.downcast::<Self>().ok()?;
Some(Self::aggregate(this, other))
} else {
None
} }
}
fn into_event(self: Box<Self>) -> serde_json::Value;
}
mopafy!(Aggregate);
/// Helper trait to define multiple aggregate with the same content but a different name. /// Helper trait to define multiple aggregate with the same content but a different name.
/// Commonly used when you must aggregate a search with POST or with GET for example. /// Commonly used when you must aggregate a search with POST or with GET for example.
@ -137,9 +153,9 @@ impl Analytics {
} }
/// The method used to publish most analytics that do not need to be batched every hours /// The method used to publish most analytics that do not need to be batched every hours
pub fn publish(&self, event: impl Aggregate, request: &HttpRequest) { pub fn publish<T: Aggregate>(&self, event: T, request: &HttpRequest) {
let Some(ref segment) = self.segment else { return }; let Some(ref segment) = self.segment else { return };
let user_agents = extract_user_agents(request); let user_agents = extract_user_agents(request);
let _ = segment.sender.try_send(Box::new(event)); let _ = segment.sender.try_send(segment_analytics::Message::new(event));
} }
} }

View File

@ -1,7 +1,6 @@
use std::any::{Any, TypeId}; use std::any::TypeId;
use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet}; use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
use std::fs; use std::fs;
use std::mem::take;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -72,10 +71,26 @@ pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
.collect() .collect()
} }
pub struct Message {
type_id: TypeId,
event: Box<dyn Aggregate>,
aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
}
impl Message {
pub fn new<T: Aggregate>(event: T) -> Self {
Self {
type_id: TypeId::of::<T>(),
event: Box::new(event),
aggregator_function: T::downcast_aggregate,
}
}
}
pub struct SegmentAnalytics { pub struct SegmentAnalytics {
pub instance_uid: InstanceUid, pub instance_uid: InstanceUid,
pub user: User, pub user: User,
pub sender: Sender<Box<dyn Aggregate>>, pub sender: Sender<Message>,
} }
impl SegmentAnalytics { impl SegmentAnalytics {
@ -378,7 +393,7 @@ impl From<Opt> for Infos {
} }
pub struct Segment { pub struct Segment {
inbox: Receiver<Box<dyn Aggregate>>, inbox: Receiver<Message>,
user: User, user: User,
opt: Opt, opt: Opt,
batcher: AutoBatcher, batcher: AutoBatcher,
@ -435,8 +450,13 @@ impl Segment {
}, },
msg = self.inbox.recv() => { msg = self.inbox.recv() => {
match msg { match msg {
// Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await), Some(Message { type_id, event, aggregator_function }) => {
Some(_) => todo!(), let new_event = match self.events.remove(&type_id) {
Some(old) => (aggregator_function)(old, event).unwrap(),
None => event,
};
self.events.insert(type_id, new_event);
},
None => (), None => (),
} }
} }
@ -479,9 +499,9 @@ impl Segment {
// We empty the list of events // We empty the list of events
let events = std::mem::take(&mut self.events); let events = std::mem::take(&mut self.events);
for (_, mut event) in events { for (_, event) in events {
self.batcher.push(Track { self.batcher.push(Track {
user: self.user, user: self.user.clone(),
event: event.event_name().to_string(), event: event.event_name().to_string(),
properties: event.into_event(), properties: event.into_event(),
timestamp: todo!(), timestamp: todo!(),
@ -722,11 +742,11 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
Method::event_name() Method::event_name()
} }
fn aggregate(mut self, mut other: Self) -> Self { fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
let Self { let Self {
total_received, total_received,
total_succeeded, total_succeeded,
ref mut time_spent, mut time_spent,
sort_with_geo_point, sort_with_geo_point,
sort_sum_of_criteria_terms, sort_sum_of_criteria_terms,
sort_total_number_of_criteria, sort_total_number_of_criteria,
@ -761,9 +781,9 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
total_degraded, total_degraded,
total_used_negative_operator, total_used_negative_operator,
ranking_score_threshold, ranking_score_threshold,
ref mut locales, mut locales,
marker: _, marker: _,
} = other; } = *other;
// request // request
self.total_received = self.total_received.saturating_add(total_received); self.total_received = self.total_received.saturating_add(total_received);
@ -771,7 +791,7 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
self.total_degraded = self.total_degraded.saturating_add(total_degraded); self.total_degraded = self.total_degraded.saturating_add(total_degraded);
self.total_used_negative_operator = self.total_used_negative_operator =
self.total_used_negative_operator.saturating_add(total_used_negative_operator); self.total_used_negative_operator.saturating_add(total_used_negative_operator);
self.time_spent.append(time_spent); self.time_spent.append(&mut time_spent);
// sort // sort
self.sort_with_geo_point |= sort_with_geo_point; self.sort_with_geo_point |= sort_with_geo_point;
@ -843,12 +863,12 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
self.ranking_score_threshold |= ranking_score_threshold; self.ranking_score_threshold |= ranking_score_threshold;
// locales // locales
self.locales.append(locales); self.locales.append(&mut locales);
self self
} }
fn into_event(self) -> impl Serialize { fn into_event(self: Box<Self>) -> serde_json::Value {
let Self { let Self {
total_received, total_received,
total_succeeded, total_succeeded,
@ -889,7 +909,7 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
ranking_score_threshold, ranking_score_threshold,
locales, locales,
marker: _, marker: _,
} = self; } = *self;
// we get all the values in a sorted manner // we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec(); let time_spent = time_spent.into_sorted_vec();
@ -1058,11 +1078,11 @@ impl Aggregate for MultiSearchAggregator {
} }
/// Aggregate one [MultiSearchAggregator] into another. /// Aggregate one [MultiSearchAggregator] into another.
fn aggregate(self, other: Self) -> Self { fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
// write the aggregate in a way that will cause a compilation error if a field is added. // write the aggregate in a way that will cause a compilation error if a field is added.
// get ownership of self, replacing it by a default value. // get ownership of self, replacing it by a default value.
let this = self; let this = *self;
let total_received = this.total_received.saturating_add(other.total_received); let total_received = this.total_received.saturating_add(other.total_received);
let total_succeeded = this.total_succeeded.saturating_add(other.total_succeeded); let total_succeeded = this.total_succeeded.saturating_add(other.total_succeeded);
@ -1075,7 +1095,7 @@ impl Aggregate for MultiSearchAggregator {
this.show_ranking_score_details || other.show_ranking_score_details; this.show_ranking_score_details || other.show_ranking_score_details;
let use_federation = this.use_federation || other.use_federation; let use_federation = this.use_federation || other.use_federation;
Self { Box::new(Self {
total_received, total_received,
total_succeeded, total_succeeded,
total_distinct_index_count, total_distinct_index_count,
@ -1084,10 +1104,10 @@ impl Aggregate for MultiSearchAggregator {
show_ranking_score, show_ranking_score,
show_ranking_score_details, show_ranking_score_details,
use_federation, use_federation,
} })
} }
fn into_event(self) -> impl Serialize { fn into_event(self: Box<Self>) -> serde_json::Value {
let Self { let Self {
total_received, total_received,
total_succeeded, total_succeeded,
@ -1097,7 +1117,7 @@ impl Aggregate for MultiSearchAggregator {
show_ranking_score, show_ranking_score,
show_ranking_score_details, show_ranking_score_details,
use_federation, use_federation,
} = self; } = *self;
json!({ json!({
"requests": { "requests": {
@ -1708,11 +1728,11 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
} }
/// Aggregate one [SimilarAggregator] into another. /// Aggregate one [SimilarAggregator] into another.
fn aggregate(mut self, mut other: Self) -> Self { fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
let Self { let Self {
total_received, total_received,
total_succeeded, total_succeeded,
ref mut time_spent, mut time_spent,
filter_with_geo_radius, filter_with_geo_radius,
filter_with_geo_bounding_box, filter_with_geo_bounding_box,
filter_sum_of_criteria_terms, filter_sum_of_criteria_terms,
@ -1726,12 +1746,12 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
ranking_score_threshold, ranking_score_threshold,
retrieve_vectors, retrieve_vectors,
marker: _, marker: _,
} = other; } = *other;
// request // request
self.total_received = self.total_received.saturating_add(total_received); self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded); self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.time_spent.append(time_spent); self.time_spent.append(&mut time_spent);
// filter // filter
self.filter_with_geo_radius |= filter_with_geo_radius; self.filter_with_geo_radius |= filter_with_geo_radius;
@ -1763,7 +1783,7 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
self self
} }
fn into_event(self) -> impl Serialize { fn into_event(self: Box<Self>) -> serde_json::Value {
let Self { let Self {
total_received, total_received,
total_succeeded, total_succeeded,
@ -1781,7 +1801,7 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
ranking_score_threshold, ranking_score_threshold,
retrieve_vectors, retrieve_vectors,
marker: _, marker: _,
} = self; } = *self;
// we get all the values in a sorted manner // we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec(); let time_spent = time_spent.into_sorted_vec();