diff --git a/Cargo.lock b/Cargo.lock
index c85a59952..733470384 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3415,6 +3415,7 @@ dependencies = [
  "meilisearch-types",
  "mimalloc",
  "mime",
+ "mopa",
  "num_cpus",
  "obkv",
  "once_cell",
@@ -3681,6 +3682,12 @@ dependencies = [
  "syn 2.0.60",
 ]

+[[package]]
+name = "mopa"
+version = "0.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a785740271256c230f57462d3b83e52f998433a7062fc18f96d5999474a9f915"
+
 [[package]]
 name = "mutually_exclusive_features"
 version = "0.0.3"
diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml
index 6c2fb4060..322b333ac 100644
--- a/meilisearch/Cargo.toml
+++ b/meilisearch/Cargo.toml
@@ -104,6 +104,7 @@ tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
 tracing-actix-web = "0.7.11"
 build-info = { version = "1.7.0", path = "../build-info" }
 roaring = "0.10.2"
+mopa = "0.2.2"

 [dev-dependencies]
 actix-rt = "2.10.0"
diff --git a/meilisearch/src/analytics/mod.rs b/meilisearch/src/analytics/mod.rs
index ab6fd9993..8a0a68bad 100644
--- a/meilisearch/src/analytics/mod.rs
+++ b/meilisearch/src/analytics/mod.rs
@@ -6,9 +6,9 @@ use std::str::FromStr;

 use actix_web::HttpRequest;
 use meilisearch_types::InstanceUid;
+use mopa::mopafy;
 use once_cell::sync::Lazy;
 use platform_dirs::AppDirs;
-use serde::Serialize;

 // if the feature analytics is enabled we use the real analytics
 pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
@@ -31,11 +31,11 @@ macro_rules! empty_analytics {
                 $event_name
             }

-            fn aggregate(self, _other: Self) -> Self {
+            fn aggregate(self: Box<Self>, _other: Box<Self>) -> Box<Self> {
                 self
             }

-            fn into_event(self) -> impl serde::Serialize {
+            fn into_event(self: Box<Self>) -> serde_json::Value {
                 serde_json::json!({})
             }
         }
@@ -80,18 +80,34 @@ pub enum DocumentFetchKind {
     Normal { with_filter: bool, limit: usize, offset: usize, retrieve_vectors: bool },
 }

-pub trait Aggregate: 'static {
+pub trait Aggregate: 'static + mopa::Any + Send {
     fn event_name(&self) -> &'static str;

-    fn aggregate(self, other: Self) -> Self
+    fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self>
     where
         Self: Sized;

-    fn into_event(self) -> impl Serialize
+    fn downcast_aggregate(
+        this: Box<dyn Aggregate>,
+        other: Box<dyn Aggregate>,
+    ) -> Option<Box<dyn Aggregate>>
     where
-        Self: Sized;
+        Self: Sized,
+    {
+        if this.is::<Self>() && other.is::<Self>() {
+            let this = this.downcast::<Self>().ok()?;
+            let other = other.downcast::<Self>().ok()?;
+            Some(Self::aggregate(this, other))
+        } else {
+            None
+        }
+    }
+
+    fn into_event(self: Box<Self>) -> serde_json::Value;
 }

+mopafy!(Aggregate);
+
 /// Helper trait to define multiple aggregate with the same content but a different name.
 /// Commonly used when you must aggregate a search with POST or with GET for example.
 pub trait AggregateMethod: 'static + Default {
@@ -137,9 +153,9 @@ impl Analytics {
     }

     /// The method used to publish most analytics that do not need to be batched every hours
-    pub fn publish(&self, event: impl Aggregate, request: &HttpRequest) {
+    pub fn publish<T: Aggregate>(&self, event: T, request: &HttpRequest) {
         let Some(ref segment) = self.segment else { return };
         let user_agents = extract_user_agents(request);
-        let _ = segment.sender.try_send(Box::new(event));
+        let _ = segment.sender.try_send(segment_analytics::Message::new(event));
     }
 }
diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs
index 601fefa1e..1a1bb9226 100644
--- a/meilisearch/src/analytics/segment_analytics.rs
+++ b/meilisearch/src/analytics/segment_analytics.rs
@@ -1,7 +1,6 @@
-use std::any::{Any, TypeId};
+use std::any::TypeId;
 use std::collections::{BTreeSet, BinaryHeap, HashMap, HashSet};
 use std::fs;
-use std::mem::take;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
@@ -72,10 +71,26 @@ pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
         .collect()
 }

+pub struct Message {
+    type_id: TypeId,
+    event: Box<dyn Aggregate>,
+    aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
+}
+
+impl Message {
+    pub fn new<T: Aggregate>(event: T) -> Self {
+        Self {
+            type_id: TypeId::of::<T>(),
+            event: Box::new(event),
+            aggregator_function: T::downcast_aggregate,
+        }
+    }
+}
+
 pub struct SegmentAnalytics {
     pub instance_uid: InstanceUid,
     pub user: User,
-    pub sender: Sender<Box<dyn Aggregate>>,
+    pub sender: Sender<Message>,
 }

 impl SegmentAnalytics {
@@ -378,7 +393,7 @@ impl From<Opt> for Infos {
 }

 pub struct Segment {
-    inbox: Receiver<Box<dyn Aggregate>>,
+    inbox: Receiver<Message>,
     user: User,
     opt: Opt,
     batcher: AutoBatcher,
@@ -435,8 +450,13 @@ impl Segment {
                 },
                 msg = self.inbox.recv() => {
                     match msg {
-                        // Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
-                        Some(_) => todo!(),
+                        Some(Message { type_id, event, aggregator_function }) => {
+                            let new_event = match self.events.remove(&type_id) {
+                                Some(old) => (aggregator_function)(old, event).unwrap(),
+                                None => event,
+                            };
+                            self.events.insert(type_id, new_event);
+                        },
                         None => (),
                     }
                 }
@@ -479,9 +499,9 @@ impl Segment {
         // We empty the list of events
         let events = std::mem::take(&mut self.events);

-        for (_, mut event) in events {
+        for (_, event) in events {
             self.batcher.push(Track {
-                user: self.user,
+                user: self.user.clone(),
                 event: event.event_name().to_string(),
                 properties: event.into_event(),
                 timestamp: todo!(),
@@ -722,11 +742,11 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
         Method::event_name()
     }

-    fn aggregate(mut self, mut other: Self) -> Self {
+    fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
         let Self {
             total_received,
             total_succeeded,
-            ref mut time_spent,
+            mut time_spent,
             sort_with_geo_point,
             sort_sum_of_criteria_terms,
             sort_total_number_of_criteria,
@@ -761,9 +781,9 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
             total_degraded,
             total_used_negative_operator,
             ranking_score_threshold,
-            ref mut locales,
+            mut locales,
             marker: _,
-        } = other;
+        } = *other;

         // request
         self.total_received = self.total_received.saturating_add(total_received);
@@ -771,7 +791,7 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
         self.total_degraded = self.total_degraded.saturating_add(total_degraded);
         self.total_used_negative_operator =
             self.total_used_negative_operator.saturating_add(total_used_negative_operator);
-        self.time_spent.append(time_spent);
+        self.time_spent.append(&mut time_spent);

         // sort
         self.sort_with_geo_point |= sort_with_geo_point;
@@ -843,12 +863,12 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
         self.ranking_score_threshold |= ranking_score_threshold;

         // locales
-        self.locales.append(locales);
+        self.locales.append(&mut locales);

         self
     }

-    fn into_event(self) -> impl Serialize {
+    fn into_event(self: Box<Self>) -> serde_json::Value {
         let Self {
             total_received,
             total_succeeded,
@@ -889,7 +909,7 @@ impl<Method: AggregateMethod> Aggregate for SearchAggregator<Method> {
             ranking_score_threshold,
             locales,
             marker: _,
-        } = self;
+        } = *self;

         // we get all the values in a sorted manner
         let time_spent = time_spent.into_sorted_vec();
@@ -1058,11 +1078,11 @@ impl Aggregate for MultiSearchAggregator {
     }

     /// Aggregate one [MultiSearchAggregator] into another.
-    fn aggregate(self, other: Self) -> Self {
+    fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self> {
         // write the aggregate in a way that will cause a compilation error if a field is added.

         // get ownership of self, replacing it by a default value.
-        let this = self;
+        let this = *self;

         let total_received = this.total_received.saturating_add(other.total_received);
         let total_succeeded = this.total_succeeded.saturating_add(other.total_succeeded);
@@ -1075,7 +1095,7 @@ impl Aggregate for MultiSearchAggregator {
             this.show_ranking_score_details || other.show_ranking_score_details;
         let use_federation = this.use_federation || other.use_federation;

-        Self {
+        Box::new(Self {
             total_received,
             total_succeeded,
             total_distinct_index_count,
@@ -1084,10 +1104,10 @@ impl Aggregate for MultiSearchAggregator {
             show_ranking_score,
             show_ranking_score_details,
             use_federation,
-        }
+        })
     }

-    fn into_event(self) -> impl Serialize {
+    fn into_event(self: Box<Self>) -> serde_json::Value {
         let Self {
             total_received,
             total_succeeded,
@@ -1097,7 +1117,7 @@ impl Aggregate for MultiSearchAggregator {
             show_ranking_score,
             show_ranking_score_details,
             use_federation,
-        } = self;
+        } = *self;

         json!({
             "requests": {
@@ -1708,11 +1728,11 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
     }

     /// Aggregate one [SimilarAggregator] into another.
-    fn aggregate(mut self, mut other: Self) -> Self {
+    fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
         let Self {
             total_received,
             total_succeeded,
-            ref mut time_spent,
+            mut time_spent,
             filter_with_geo_radius,
             filter_with_geo_bounding_box,
             filter_sum_of_criteria_terms,
@@ -1726,12 +1746,12 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
             ranking_score_threshold,
             retrieve_vectors,
             marker: _,
-        } = other;
+        } = *other;

         // request
         self.total_received = self.total_received.saturating_add(total_received);
         self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
-        self.time_spent.append(time_spent);
+        self.time_spent.append(&mut time_spent);

         // filter
         self.filter_with_geo_radius |= filter_with_geo_radius;
@@ -1763,7 +1783,7 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
         self
     }

-    fn into_event(self) -> impl Serialize {
+    fn into_event(self: Box<Self>) -> serde_json::Value {
         let Self {
             total_received,
             total_succeeded,
@@ -1781,7 +1801,7 @@ impl<Method: AggregateMethod> Aggregate for SimilarAggregator<Method> {
             ranking_score_threshold,
             retrieve_vectors,
             marker: _,
-        } = self;
+        } = *self;

         // we get all the values in a sorted manner
         let time_spent = time_spent.into_sorted_vec();
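
Not part of the patch, but a compact way to see the pattern it introduces: `publish` erases the concrete aggregate behind `Box<dyn Aggregate>`, `Message` carries the event's `TypeId` plus a monomorphized `downcast_aggregate` function pointer, and the `Segment` loop folds events with the same `TypeId` into a single entry before flushing. The sketch below is a standalone approximation of that idea under stated assumptions: it swaps `mopa` for a hand-written `into_any` method so it builds without the new dependency, assumes `serde_json` is on the path, and uses a hypothetical `SearchCount` aggregate; only the shape of `Aggregate`, `Message`, and the per-`TypeId` map mirrors the diff above.

```rust
// Sketch only: the diff's pattern with `mopa` replaced by a hand-written
// `into_any` method, and a hypothetical `SearchCount` aggregate.
use std::any::{Any, TypeId};
use std::collections::HashMap;

trait Aggregate: Send + 'static {
    fn event_name(&self) -> &'static str;

    fn aggregate(self: Box<Self>, other: Box<Self>) -> Box<Self>
    where
        Self: Sized;

    fn into_event(self: Box<Self>) -> serde_json::Value;

    /// Stand-in for what `mopafy!(Aggregate)` provides: a route back to `dyn Any`.
    fn into_any(self: Box<Self>) -> Box<dyn Any>;

    /// Type-erased bridge; a monomorphized copy of this is stored as a plain
    /// `fn` pointer inside each `Message`.
    fn downcast_aggregate(
        this: Box<dyn Aggregate>,
        other: Box<dyn Aggregate>,
    ) -> Option<Box<dyn Aggregate>>
    where
        Self: Sized,
    {
        let this = this.into_any().downcast::<Self>().ok()?;
        let other = other.into_any().downcast::<Self>().ok()?;
        let merged: Box<dyn Aggregate> = Self::aggregate(this, other);
        Some(merged)
    }
}

/// Mirrors `segment_analytics::Message`: the event is type-erased, but its
/// `TypeId` and the matching aggregation function travel with it.
struct Message {
    type_id: TypeId,
    event: Box<dyn Aggregate>,
    aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
}

impl Message {
    fn new<T: Aggregate>(event: T) -> Self {
        Self {
            type_id: TypeId::of::<T>(),
            event: Box::new(event),
            aggregator_function: T::downcast_aggregate,
        }
    }
}

/// What the `Segment` loop does with each message: fold events of the same
/// `TypeId` together until the batch is flushed.
fn push(events: &mut HashMap<TypeId, Box<dyn Aggregate>>, msg: Message) {
    let Message { type_id, event, aggregator_function } = msg;
    let new_event = match events.remove(&type_id) {
        // Both boxes hold the type behind `type_id`, so the downcast cannot fail.
        Some(old) => (aggregator_function)(old, event).unwrap(),
        None => event,
    };
    events.insert(type_id, new_event);
}

/// Hypothetical aggregate, only here to exercise the sketch.
struct SearchCount {
    total_received: usize,
}

impl Aggregate for SearchCount {
    fn event_name(&self) -> &'static str {
        "Example Searched"
    }

    fn aggregate(mut self: Box<Self>, other: Box<Self>) -> Box<Self> {
        self.total_received += other.total_received;
        self
    }

    fn into_event(self: Box<Self>) -> serde_json::Value {
        serde_json::json!({ "total_received": self.total_received })
    }

    fn into_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
}

fn main() {
    let mut events = HashMap::new();
    push(&mut events, Message::new(SearchCount { total_received: 1 }));
    push(&mut events, Message::new(SearchCount { total_received: 2 }));

    let merged = events.remove(&TypeId::of::<SearchCount>()).unwrap();
    assert_eq!(merged.event_name(), "Example Searched");
    println!("{}", merged.into_event()); // {"total_received":3}
}
```

The signature changes in the diff serve the same goal: moving `into_event` from `-> impl Serialize` to `-> serde_json::Value` and the receivers from `self` to `self: Box<Self>` is what makes `Aggregate` usable as a trait object at all, and `mopa::Any` then supplies the downcasting that a plain `dyn Aggregate` would not allow.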