From e226b1a87fd4b5b902fe42270827b3237c6f4739 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Tue, 12 Oct 2021 11:22:44 +0200
Subject: [PATCH] rewrite the main analytics module and the information sent in the tick

---
 Cargo.lock                        |  15 +-
 meilisearch-http/Cargo.toml       |   5 +-
 meilisearch-http/src/analytics.rs | 252 +++++++++++++++++-------------
 meilisearch-http/src/main.rs      |   5 +-
 4 files changed, 163 insertions(+), 114 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index fe025ebe1..36849a58f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1658,6 +1658,7 @@ dependencies = [
  "regex",
  "reqwest",
  "rustls",
+ "segment",
  "serde",
  "serde_json",
  "serde_url_params",
@@ -1677,7 +1678,6 @@ dependencies = [
  "uuid",
  "vergen",
  "walkdir",
- "whoami",
  "zip",
 ]
 
@@ -2540,6 +2540,19 @@ dependencies = [
  "untrusted",
 ]
 
+[[package]]
+name = "segment"
+version = "0.1.1"
+source = "git+https://github.com/meilisearch/segment#656b91e1f7a2c6443e2a8ed59f8942400e9a811e"
+dependencies = [
+ "async-trait",
+ "chrono",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "thiserror",
+]
+
 [[package]]
 name = "semver"
 version = "0.9.0"
diff --git a/meilisearch-http/Cargo.toml b/meilisearch-http/Cargo.toml
index 394892db9..ffc660c80 100644
--- a/meilisearch-http/Cargo.toml
+++ b/meilisearch-http/Cargo.toml
@@ -55,6 +55,7 @@ rand = "0.8.4"
 rayon = "1.5.1"
 regex = "1.5.4"
 rustls = "0.19.1"
+segment = { git = "https://github.com/meilisearch/segment", optional = true }
 serde = { version = "1.0.130", features = ["derive"] }
 serde_json = { version = "1.0.67", features = ["preserve_order"] }
 sha2 = "0.9.6"
@@ -69,8 +70,6 @@ uuid = { version = "0.8.2", features = ["serde"] }
 walkdir = "2.3.2"
 obkv = "0.2.0"
 pin-project = "1.0.8"
-whoami = { version = "1.1.3", optional = true }
-reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true }
 sysinfo = "0.20.2"
 tokio-stream = "0.1.7"
 
@@ -91,7 +90,7 @@ mini-dashboard = [
     "tempfile",
     "zip",
 ]
-analytics = ["whoami", "reqwest"]
+analytics = ["segment"]
 default = ["analytics", "mini-dashboard"]
 
 [target.'cfg(target_os = "linux")'.dependencies]
diff --git a/meilisearch-http/src/analytics.rs b/meilisearch-http/src/analytics.rs
index 596b69aa0..41f487bb4 100644
--- a/meilisearch-http/src/analytics.rs
+++ b/meilisearch-http/src/analytics.rs
@@ -1,126 +1,164 @@
-use std::hash::{Hash, Hasher};
-use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
-
-use log::debug;
+use meilisearch_lib::index_controller::Stats;
 use meilisearch_lib::MeiliSearch;
-use serde::Serialize;
-use siphasher::sip::SipHasher;
+use once_cell::sync::Lazy;
+use segment::message::{Identify, Track, User};
+use segment::{AutoBatcher, Batcher, HttpClient};
+use serde_json::{json, Value};
+use std::fmt::Display;
+use std::time::{Duration, Instant};
+use sysinfo::DiskExt;
+use sysinfo::ProcessorExt;
+use sysinfo::System;
+use sysinfo::SystemExt;
+use tokio::sync::Mutex;
+use uuid::Uuid;
 
 use crate::Opt;
 
-const AMPLITUDE_API_KEY: &str = "f7fba398780e06d8fe6666a9be7e3d47";
+const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
 
-#[derive(Debug, Serialize)]
-struct EventProperties {
-    database_size: u64,
-    last_update_timestamp: Option<i64>, //timestamp
-    number_of_documents: Vec<u64>,
+pub struct Analytics {
+    user: User,
+    opt: Opt,
+    batcher: Mutex<AutoBatcher>,
 }
 
-impl EventProperties {
-    async fn from(data: MeiliSearch) -> anyhow::Result<EventProperties> {
-        let stats = data.get_all_stats().await?;
+impl Analytics {
+    pub fn publish(&'static self, event_name: String, send: Value) {
+        tokio::spawn(async move {
+            let _ = self
+                .batcher
+                .lock()
+                .await
+                .push(Track {
+                    user: self.user.clone(),
+                    event: event_name.clone(),
+                    properties: send,
+                    ..Default::default()
+                })
+                .await;
+            println!("ANALYTICS: {} added to batch", event_name)
+        });
+    }
 
-        let database_size = stats.database_size;
-        let last_update_timestamp = stats.last_update.map(|u| u.timestamp());
+    pub fn tick(&'static self, meilisearch: MeiliSearch) {
+        tokio::spawn(async move {
+            loop {
+                tokio::time::sleep(Duration::from_secs(60)).await; // 1 minute
+                println!("ANALYTICS: should do things");
+
+                if let Ok(stats) = meilisearch.get_all_stats().await {
+                    let traits = Self::compute_traits(&self.opt, stats);
+                    let user = self.user.clone();
+                    println!("ANALYTICS: Pushing our identify tick");
+                    let _ = self
+                        .batcher
+                        .lock()
+                        .await
+                        .push(Identify {
+                            user,
+                            traits,
+                            ..Default::default()
+                        })
+                        .await;
+                }
+                println!("ANALYTICS: Pushing our batch");
+                let _ = self.batcher.lock().await.flush().await;
+            }
+        });
+    }
+}
+
+impl Analytics {
+    pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> &'static Self {
+        let user_id = std::fs::read_to_string(opt.db_path.join("user-id"));
+        let first_time_run = user_id.is_err();
+        let user_id = user_id.unwrap_or(Uuid::new_v4().to_string());
+        let _ = std::fs::write(opt.db_path.join("user-id"), user_id.as_bytes());
+        let client = HttpClient::default();
+        let user = User::UserId {
+            user_id: user_id.clone(),
+        };
+        let batcher = Batcher::new(None);
+        let batcher = Mutex::new(AutoBatcher::new(
+            client,
+            batcher,
+            SEGMENT_API_KEY.to_string(),
+        ));
+        let segment = Box::new(Self {
+            user,
+            opt: opt.clone(),
+            batcher,
+        });
+        let segment = Box::leak(segment);
+
+        // send an identify event
+        let _ = segment
+            .batcher
+            .lock()
+            .await
+            .push(Identify {
+                user: segment.user.clone(),
+                // TODO: TAMO: what should we do when meilisearch is broken at start
+                traits: Self::compute_traits(
+                    &segment.opt,
+                    meilisearch.get_all_stats().await.unwrap(),
+                ),
+                ..Default::default()
+            })
+            .await;
+        println!("ANALYTICS: pushed the identify event");
+
+        // send the associated track event
+        if first_time_run {
+            segment.publish("Launched for the first time".to_string(), json!({}));
+        }
+
+        // start the runtime tick
+        segment.tick(meilisearch.clone());
+
+        segment
+    }
+
+    fn compute_traits(opt: &Opt, stats: Stats) -> Value {
+        static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(|| Instant::now());
+        static SYSTEM: Lazy<Value> = Lazy::new(|| {
+            let mut sys = System::new_all();
+            sys.refresh_all();
+            json!({
+                "distribution": sys.name().zip(sys.kernel_version()).map(|(name, version)| format!("{}: {}", name, version)),
+                "core_number": sys.processors().len(),
+                "ram_size": sys.total_memory(),
+                "frequency": sys.processors().iter().map(|cpu| cpu.frequency()).sum::<u64>() / sys.processors().len() as u64,
+                "disk_size": sys.disks().iter().map(|disk| disk.available_space()).max(),
+                "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
+            })
+        });
         let number_of_documents = stats
             .indexes
             .values()
             .map(|index| index.number_of_documents)
-            .collect();
+            .collect::<Vec<u64>>();
 
-        Ok(EventProperties {
-            database_size,
-            last_update_timestamp,
-            number_of_documents,
+        json!({
+            "system": *SYSTEM,
+            "stats": {
+                "database_size": stats.database_size,
+                "indexes_number": stats.indexes.len(),
+                "documents_number": number_of_documents,
+            },
+            "infos": {
+                "version": env!("CARGO_PKG_VERSION").to_string(),
+                "env": opt.env.clone(),
+                "snapshot": opt.schedule_snapshot,
+                "start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / (60 * 60 * 24), // one day
+            },
         })
     }
 }
 
-#[derive(Debug, Serialize)]
-struct UserProperties<'a> {
-    env: &'a str,
-    start_since_days: u64,
-    user_email: Option<String>,
-    server_provider: Option<String>,
-}
-
-#[derive(Debug, Serialize)]
-struct Event<'a> {
-    user_id: &'a str,
-    event_type: &'a str,
-    device_id: &'a str,
-    time: u64,
-    app_version: &'a str,
-    user_properties: UserProperties<'a>,
-    event_properties: Option<EventProperties>,
-}
-
-#[derive(Debug, Serialize)]
-struct AmplitudeRequest<'a> {
-    api_key: &'a str,
-    events: Vec<Event<'a>>,
-}
-
-pub async fn analytics_sender(data: MeiliSearch, opt: Opt) {
-    let username = whoami::username();
-    let hostname = whoami::hostname();
-    let platform = whoami::platform();
-
-    let uid = username + &hostname + &platform.to_string();
-
-    let mut hasher = SipHasher::new();
-    uid.hash(&mut hasher);
-    let hash = hasher.finish();
-
-    let uid = format!("{:X}", hash);
-    let platform = platform.to_string();
-    let first_start = Instant::now();
-
-    loop {
-        let n = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
-        let user_id = &uid;
-        let device_id = &platform;
-        let time = n.as_secs();
-        let event_type = "runtime_tick";
-        let elapsed_since_start = first_start.elapsed().as_secs() / 86_400; // One day
-        let event_properties = EventProperties::from(data.clone()).await.ok();
-        let app_version = env!("CARGO_PKG_VERSION").to_string();
-        let app_version = app_version.as_str();
-        let user_email = std::env::var("MEILI_USER_EMAIL").ok();
-        let server_provider = std::env::var("MEILI_SERVER_PROVIDER").ok();
-        let user_properties = UserProperties {
-            env: &opt.env,
-            start_since_days: elapsed_since_start,
-            user_email,
-            server_provider,
-        };
-
-        let event = Event {
-            user_id,
-            event_type,
-            device_id,
-            time,
-            app_version,
-            user_properties,
-            event_properties,
-        };
-
-        let request = AmplitudeRequest {
-            api_key: AMPLITUDE_API_KEY,
-            events: vec![event],
-        };
-
-        let response = reqwest::Client::new()
-            .post("https://api2.amplitude.com/2/httpapi")
-            .timeout(Duration::from_secs(60)) // 1 minute max
-            .json(&request)
-            .send()
-            .await;
-        if let Err(e) = response {
-            debug!("Unsuccessful call to Amplitude: {}", e);
-        }
-
-        tokio::time::sleep(Duration::from_secs(3600)).await;
+impl Display for Analytics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{:?}", self.user)
     }
 }
diff --git a/meilisearch-http/src/main.rs b/meilisearch-http/src/main.rs
index 864015dd1..73105927e 100644
--- a/meilisearch-http/src/main.rs
+++ b/meilisearch-http/src/main.rs
@@ -48,9 +48,8 @@ async fn main() -> anyhow::Result<()> {
 
     #[cfg(all(not(debug_assertions), feature = "analytics"))]
     if !opt.no_analytics {
-        let analytics_data = meilisearch.clone();
-        let analytics_opt = opt.clone();
-        tokio::task::spawn(analytics::analytics_sender(analytics_data, analytics_opt));
+        let analytics = analytics::Analytics::new(&opt, &meilisearch).await;
+        println!("got my analytics back");
     }
 
     print_launch_resume(&opt);
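
The patch only wires Analytics::new into main.rs; it does not yet add call sites for the new publish method. As a point of reference, the sketch below shows what such a call site could look like inside the meilisearch-http crate. The helper name, event name, and properties are illustrative only and not part of the patch.

use serde_json::json;

use crate::analytics::Analytics;

// Hypothetical helper: fire-and-forget an event through the leaked
// `&'static Analytics` handle returned by `Analytics::new`.
fn track_documents_added(analytics: &'static Analytics, index_uid: &str, count: usize) {
    // `publish` spawns a tokio task internally, so it never blocks the caller,
    // but it must be invoked from within a tokio runtime.
    analytics.publish(
        "Documents Added".to_string(),
        json!({ "index_uid": index_uid, "count": count }),
    );
}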
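
For readers unfamiliar with the Segment client the module is built on (meilisearch's fork of the segment crate), this sketch isolates the batching flow using only the API surface that appears in the patch (HttpClient, Batcher, AutoBatcher, Track, User). The write key and user id are placeholders.

use segment::message::{Track, User};
use segment::{AutoBatcher, Batcher, HttpClient};
use serde_json::json;

async fn send_one_event() {
    // Same construction as in Analytics::new, but without the Mutex wrapper.
    let mut batcher = AutoBatcher::new(
        HttpClient::default(),
        Batcher::new(None),
        "WRITE_KEY".to_string(),
    );
    let _ = batcher
        .push(Track {
            user: User::UserId { user_id: "placeholder".to_string() },
            event: "Example Event".to_string(),
            properties: json!({ "source": "sketch" }),
            ..Default::default()
        })
        .await;
    // `push` may only buffer the event rather than send it right away, which is
    // why `Analytics::tick` explicitly calls `flush` on every iteration of its loop.
    let _ = batcher.flush().await;
}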
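
The instance-id handling at the top of Analytics::new can also be read in isolation: reuse the user-id file under the database path when it exists, otherwise mint a new UUID, and persist it best-effort. Below is a standalone sketch of that same logic; the helper name is not part of the patch.

use std::path::Path;
use uuid::Uuid;

fn load_or_create_user_id(db_path: &Path) -> (String, bool) {
    let stored = std::fs::read_to_string(db_path.join("user-id"));
    // A failed read is what the patch treats as "launched for the first time".
    let first_time_run = stored.is_err();
    let user_id = stored.unwrap_or_else(|_| Uuid::new_v4().to_string());
    // Written back unconditionally, as in the patch; write errors are ignored.
    let _ = std::fs::write(db_path.join("user-id"), user_id.as_bytes());
    (user_id, first_time_run)
}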