pass anaytics into Arc instead of static ref

This commit is contained in:
marin postma 2021-10-29 15:58:06 +02:00
parent 66d87761b7
commit d65f055030
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
5 changed files with 57 additions and 69 deletions

View File

@ -1,4 +1,4 @@
use std::{any::Any, fmt::Display};
use std::{any::Any, sync::Arc};
use actix_web::HttpRequest;
use serde_json::Value;
@ -7,9 +7,7 @@ use crate::{routes::indexes::documents::UpdateDocumentsQuery, Opt};
use super::{find_user_id, Analytics};
pub struct MockAnalytics {
user: String,
}
pub struct MockAnalytics;
#[derive(Default)]
pub struct SearchAggregator {}
@ -24,36 +22,29 @@ impl SearchAggregator {
}
impl MockAnalytics {
pub fn new(opt: &Opt) -> &'static Self {
pub fn new(opt: &Opt) -> (Arc<dyn Analytics>, String) {
let user = find_user_id(&opt.db_path).unwrap_or_default();
let analytics = Box::new(Self { user });
Box::leak(analytics)
(Arc::new(Self), user)
}
}
impl Analytics for MockAnalytics {
// These methods are noop and should be optimized out
fn publish(&'static self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
fn get_search(&'static self, _aggregate: super::SearchAggregator) {}
fn post_search(&'static self, _aggregate: super::SearchAggregator) {}
fn publish(&self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
fn get_search(&self, _aggregate: super::SearchAggregator) {}
fn post_search(&self, _aggregate: super::SearchAggregator) {}
fn add_documents(
&'static self,
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn update_documents(
&'static self,
&self,
_documents_query: &UpdateDocumentsQuery,
_index_creation: bool,
_request: &HttpRequest,
) {
}
}
impl Display for MockAnalytics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.user)
}
}

View File

@ -3,7 +3,6 @@ mod mock_analytics;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
mod segment_analytics;
use std::fmt::Display;
use std::fs;
use std::path::{Path, PathBuf};
@ -58,26 +57,26 @@ fn find_user_id(db_path: &Path) -> Option<String> {
.or_else(|| fs::read_to_string(&config_user_id_path(db_path)?).ok())
}
pub trait Analytics: Display + Sync + Send {
pub trait Analytics: Sync + Send {
/// The method used to publish most analytics that do not need to be batched every hours
fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>);
fn publish(&self, event_name: String, send: Value, request: Option<&HttpRequest>);
/// This method should be called to aggergate a get search
fn get_search(&'static self, aggregate: SearchAggregator);
fn get_search(&self, aggregate: SearchAggregator);
/// This method should be called to aggregate a post search
fn post_search(&'static self, aggregate: SearchAggregator);
fn post_search(&self, aggregate: SearchAggregator);
// this method should be called to aggregate a add documents request
fn add_documents(
&'static self,
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
);
// this method should be called to batch a update documents request
fn update_documents(
&'static self,
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,

View File

@ -1,7 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::fmt::Display;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix_web::http::header::USER_AGENT;
@ -53,7 +53,7 @@ pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
.collect()
}
pub enum Message {
pub enum AnalyticsMsg {
BatchMessage(Track),
AggregateGetSearch(SearchAggregator),
AggregatePostSearch(SearchAggregator),
@ -62,12 +62,12 @@ pub enum Message {
}
pub struct SegmentAnalytics {
sender: Sender<AnalyticsMsg>,
user: User,
sender: Sender<Message>,
}
impl SegmentAnalytics {
pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> &'static Self {
pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> (Arc<dyn Analytics>, String) {
let user_id = super::find_user_id(&opt.db_path);
let first_time_run = user_id.is_none();
let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
@ -91,25 +91,21 @@ impl SegmentAnalytics {
});
tokio::spawn(segment.run(meilisearch.clone()));
let ret = Box::new(Self { user, sender });
let ret = Box::leak(ret);
let this = Self {
sender,
user: user.clone(),
};
// batch the launched for the first time track event
if first_time_run {
ret.publish("Launched".to_string(), json!({}), None);
this.publish("Launched".to_string(), json!({}), None);
}
ret
}
}
impl Display for SegmentAnalytics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.user)
(Arc::new(this), user.to_string())
}
}
impl super::Analytics for SegmentAnalytics {
fn publish(&'static self, event_name: String, mut send: Value, request: Option<&HttpRequest>) {
fn publish(&self, event_name: String, mut send: Value, request: Option<&HttpRequest>) {
let user_agent = request
.map(|req| req.headers().get(USER_AGENT))
.flatten()
@ -123,20 +119,21 @@ impl super::Analytics for SegmentAnalytics {
properties: send,
..Default::default()
};
let _ = self.sender.try_send(Message::BatchMessage(event.into()));
}
fn get_search(&'static self, aggregate: SearchAggregator) {
let _ = self.sender.try_send(Message::AggregateGetSearch(aggregate));
let _ = self.sender.try_send(AnalyticsMsg::BatchMessage(event.into()));
}
fn post_search(&'static self, aggregate: SearchAggregator) {
fn get_search(&self, aggregate: SearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSearch(aggregate));
}
fn post_search(&self, aggregate: SearchAggregator) {
let _ = self
.sender
.try_send(Message::AggregatePostSearch(aggregate));
.try_send(AnalyticsMsg::AggregatePostSearch(aggregate));
}
fn add_documents(
&'static self,
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
@ -144,11 +141,11 @@ impl super::Analytics for SegmentAnalytics {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self
.sender
.try_send(Message::AggregateAddDocuments(aggregate));
.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
}
fn update_documents(
&'static self,
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
@ -156,12 +153,12 @@ impl super::Analytics for SegmentAnalytics {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self
.sender
.try_send(Message::AggregateUpdateDocuments(aggregate));
.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
}
}
pub struct Segment {
inbox: Receiver<Message>,
inbox: Receiver<AnalyticsMsg>,
user: User,
opt: Opt,
batcher: AutoBatcher,
@ -224,11 +221,11 @@ impl Segment {
},
msg = self.inbox.recv() => {
match msg {
Some(Message::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
Some(Message::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
Some(Message::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
Some(Message::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(Message::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::BatchMessage(msg)) => drop(self.batcher.push(msg).await),
Some(AnalyticsMsg::AggregateGetSearch(agreg)) => self.get_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSearch(agreg)) => self.post_search_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
None => (),
}
}

View File

@ -7,6 +7,7 @@ pub mod analytics;
pub mod helpers;
pub mod option;
pub mod routes;
use std::sync::Arc;
use std::time::Duration;
use crate::error::MeilisearchHttpError;
@ -78,12 +79,12 @@ pub fn configure_data(
config: &mut web::ServiceConfig,
data: MeiliSearch,
opt: &Opt,
analytics: &'static dyn Analytics,
analytics: Arc<dyn Analytics>,
) {
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
config
.app_data(data)
.app_data(web::Data::new(analytics))
.app_data(web::Data::from(analytics))
.app_data(
web::JsonConfig::default()
.content_type(|mime| mime == mime::APPLICATION_JSON)

View File

@ -1,4 +1,5 @@
use std::env;
use std::sync::Arc;
use actix_web::HttpServer;
use meilisearch_http::analytics;
@ -46,15 +47,15 @@ async fn main() -> anyhow::Result<()> {
let meilisearch = setup_meilisearch(&opt)?;
#[cfg(all(not(debug_assertions), feature = "analytics"))]
let analytics = if !opt.no_analytics {
analytics::SegmentAnalytics::new(&opt, &meilisearch).await as &'static dyn Analytics
let (analytics, user) = if !opt.no_analytics {
analytics::SegmentAnalytics::new(&opt, &meilisearch).await
} else {
analytics::MockAnalytics::new(&opt) as &'static dyn Analytics
analytics::MockAnalytics::new(&opt)
};
#[cfg(any(debug_assertions, not(feature = "analytics")))]
let analytics = analytics::MockAnalytics::new(&opt);
let (analytics, user) = analytics::MockAnalytics::new(&opt);
print_launch_resume(&opt, analytics);
print_launch_resume(&opt, &user);
run_http(meilisearch, opt, analytics).await?;
@ -64,12 +65,12 @@ async fn main() -> anyhow::Result<()> {
async fn run_http(
data: MeiliSearch,
opt: Opt,
analytics: &'static dyn Analytics,
analytics: Arc<dyn Analytics>,
) -> anyhow::Result<()> {
let _enable_dashboard = &opt.env == "development";
let opt_clone = opt.clone();
let http_server =
HttpServer::new(move || create_app!(data, _enable_dashboard, opt_clone, analytics))
HttpServer::new(move || create_app!(data, _enable_dashboard, opt_clone, analytics.clone()))
// Disable signals allows the server to terminate immediately when a user enter CTRL-C
.disable_signals();
@ -84,7 +85,7 @@ async fn run_http(
Ok(())
}
pub fn print_launch_resume(opt: &Opt, analytics: &'static dyn Analytics) {
pub fn print_launch_resume(opt: &Opt, user: &str) {
let commit_sha = option_env!("VERGEN_GIT_SHA").unwrap_or("unknown");
let commit_date = option_env!("VERGEN_GIT_COMMIT_TIMESTAMP").unwrap_or("unknown");
@ -127,9 +128,8 @@ Anonymous telemetry:\t\"Enabled\""
}
}
let analytics = analytics.to_string();
if !analytics.is_empty() {
eprintln!("Instance UID:\t\t\"{}\"", analytics);
if !user.is_empty() {
eprintln!("Instance UID:\t\t\"{}\"", user);
}
eprintln!();