From 3ad8311bdd50f13483ceafe80cd3737149328661 Mon Sep 17 00:00:00 2001
From: Tamo
Date: Wed, 27 Oct 2021 18:16:13 +0200
Subject: [PATCH] split the analytics in a module

---
 meilisearch-http/src/analytics.rs             | 617 ------------------
 .../src/analytics/mock_analytics.rs           |  50 ++
 meilisearch-http/src/analytics/mod.rs         |  86 +++
 .../src/analytics/segment_analytics.rs        | 497 ++++++++++++++
 4 files changed, 633 insertions(+), 617 deletions(-)
 delete mode 100644 meilisearch-http/src/analytics.rs
 create mode 100644 meilisearch-http/src/analytics/mock_analytics.rs
 create mode 100644 meilisearch-http/src/analytics/mod.rs
 create mode 100644 meilisearch-http/src/analytics/segment_analytics.rs

diff --git a/meilisearch-http/src/analytics.rs b/meilisearch-http/src/analytics.rs
deleted file mode 100644
index 9e316f23d..000000000
--- a/meilisearch-http/src/analytics.rs
+++ /dev/null
@@ -1,617 +0,0 @@
-use std::fmt::Display;
-use std::fs;
-use std::path::{Path, PathBuf};
-
-use actix_web::HttpRequest;
-use meilisearch_lib::index::SearchQuery;
-use once_cell::sync::Lazy;
-use platform_dirs::AppDirs;
-use serde_json::Value;
-
-use crate::routes::indexes::documents::UpdateDocumentsQuery;
-use crate::Opt;
-
-/// The MeiliSearch config dir:
-/// `~/.config/MeiliSearch` on *NIX or *BSD.
-/// `~/Library/ApplicationSupport` on macOS.
-/// `%APPDATA%` (= `C:\Users\%USERNAME%\AppData\Roaming`) on Windows.
-static MEILISEARCH_CONFIG_PATH: Lazy<Option<PathBuf>> =
-    Lazy::new(|| AppDirs::new(Some("MeiliSearch"), false).map(|appdir| appdir.config_dir));
-
-fn config_user_id_path(db_path: &Path) -> Option<PathBuf> {
-    db_path
-        .canonicalize()
-        .ok()
-        .map(|path| {
-            path.join("instance-uid")
-                .display()
-                .to_string()
-                .replace("/", "-")
-        })
-        .zip(MEILISEARCH_CONFIG_PATH.as_ref())
-        .map(|(filename, config_path)| config_path.join(filename.trim_start_matches('-')))
-}
-
-/// Look for the instance-uid in the `data.ms` or in `~/.config/MeiliSearch/path-to-db-instance-uid`
-fn find_user_id(db_path: &Path) -> Option<String> {
-    fs::read_to_string(db_path.join("instance-uid"))
-        .ok()
-        .or_else(|| fs::read_to_string(&config_user_id_path(db_path)?).ok())
-}
-
-#[cfg(all(not(debug_assertions), feature = "analytics"))]
-/// Write the instance-uid in the `data.ms` and in `~/.config/MeiliSearch/path-to-db-instance-uid`. Ignore the errors.
-fn write_user_id(db_path: &Path, user_id: &str) {
-    let _ = fs::write(db_path.join("instance-uid"), user_id.as_bytes());
-    if let Some((meilisearch_config_path, user_id_path)) = MEILISEARCH_CONFIG_PATH
-        .as_ref()
-        .zip(config_user_id_path(db_path))
-    {
-        let _ = fs::create_dir_all(&meilisearch_config_path);
-        let _ = fs::write(user_id_path, user_id.as_bytes());
-    }
-}
-
-// if we are in release mode and the feature analytics was enabled
-#[cfg(all(not(debug_assertions), feature = "analytics"))]
-mod segment {
-    use std::collections::{HashMap, HashSet};
-    use std::fmt::Display;
-    use std::time::{Duration, Instant};
-
-    use actix_web::http::header::USER_AGENT;
-    use actix_web::HttpRequest;
-    use http::header::CONTENT_TYPE;
-    use meilisearch_lib::index::SearchQuery;
-    use meilisearch_lib::index_controller::Stats;
-    use meilisearch_lib::MeiliSearch;
-    use once_cell::sync::Lazy;
-    use regex::Regex;
-    use segment::message::{Identify, Track, User};
-    use segment::{AutoBatcher, Batcher, HttpClient};
-    use serde_json::{json, Value};
-    use sysinfo::{DiskExt, System, SystemExt};
-    use tokio::sync::Mutex;
-    use uuid::Uuid;
-
-    use crate::analytics::Analytics;
-    use crate::routes::indexes::documents::UpdateDocumentsQuery;
-    use crate::Opt;
-
-    const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
-
-    pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
-        request
-            .headers()
-            .get(USER_AGENT)
-            .map(|header| header.to_str().ok())
-            .flatten()
-            .unwrap_or("unknown")
-            .split(';')
-            .map(str::trim)
-            .map(ToString::to_string)
-            .collect()
-    }
-
-    pub struct SegmentAnalytics {
-        user: User,
-        opt: Opt,
-        batcher: Mutex<AutoBatcher>,
-        post_search_batcher: Mutex<SearchBatcher>,
-        get_search_batcher: Mutex<SearchBatcher>,
-        add_documents_batcher: Mutex<DocumentsBatcher>,
-        update_documents_batcher: Mutex<DocumentsBatcher>,
-    }
-
-    impl SegmentAnalytics {
-        fn compute_traits(opt: &Opt, stats: Stats) -> Value {
-            static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(Instant::now);
-            const SYSTEM: Lazy<Value> = Lazy::new(|| {
-                let mut sys = System::new_all();
-                sys.refresh_all();
-                let kernel_version = sys
-                    .kernel_version()
-                    .map(|k| k.split_once("-").map(|(k, _)| k.to_string()))
-                    .flatten();
-                json!({
-                    "distribution": sys.name(),
-                    "kernel_version": kernel_version,
-                    "cores": sys.processors().len(),
-                    "ram_size": sys.total_memory(),
-                    "disk_size": sys.disks().iter().map(|disk| disk.available_space()).max(),
-                    "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
-                })
-            });
-            let infos = json!({
-                "env": opt.env.clone(),
-                "has_snapshot": opt.schedule_snapshot,
-            });
-
-            let number_of_documents = stats
-                .indexes
-                .values()
-                .map(|index| index.number_of_documents)
-                .collect::<Vec<u64>>();
-
-            json!({
-                "system": *SYSTEM,
-                "stats": {
-                    "database_size": stats.database_size,
-                    "indexes_number": stats.indexes.len(),
-                    "documents_number": number_of_documents,
-                    "start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / (60 * 60 * 24), // one day
-                },
-                "infos": infos,
-            })
-        }
-
-        pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> &'static Self {
-            let user_id = super::find_user_id(&opt.db_path);
-            let first_time_run = user_id.is_none();
-            let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
-            super::write_user_id(&opt.db_path, &user_id);
-
-            let client = HttpClient::default();
-            let user = User::UserId {
-                user_id: user_id.clone(),
-            };
-            let batcher = Mutex::new(AutoBatcher::new(
-                client,
-                Batcher::new(None),
-                SEGMENT_API_KEY.to_string(),
-            ));
-            let segment = Box::new(Self {
-                user,
-                opt: opt.clone(),
-                batcher,
-                post_search_batcher: Mutex::new(SearchBatcher::default()),
-                get_search_batcher: Mutex::new(SearchBatcher::default()),
-                add_documents_batcher: Mutex::new(DocumentsBatcher::default()),
-                update_documents_batcher: Mutex::new(DocumentsBatcher::default()),
-            });
-            let segment = Box::leak(segment);
-
-            // batch the `Launched` track event if this is the first run
-            if first_time_run {
-                segment.publish("Launched".to_string(), json!({}), None);
-            }
-            segment.tick(meilisearch.clone());
-            segment
-        }
-
-        fn tick(&'static self, meilisearch: MeiliSearch) {
-            tokio::spawn(async move {
-                loop {
-                    if let Ok(stats) = meilisearch.get_all_stats().await {
-                        let _ = self
-                            .batcher
-                            .lock()
-                            .await
-                            .push(Identify {
-                                context: Some(json!({
-                                    "app": {
-                                        "version": env!("CARGO_PKG_VERSION").to_string(),
-                                    },
-                                })),
-                                user: self.user.clone(),
-                                traits: Self::compute_traits(&self.opt, stats),
-                                ..Default::default()
-                            })
-                            .await;
-                    }
-                    let get_search = std::mem::take(&mut *self.get_search_batcher.lock().await)
-                        .into_event(&self.user, "Document Searched GET");
-                    let post_search = std::mem::take(&mut *self.post_search_batcher.lock().await)
-                        .into_event(&self.user, "Document Searched POST");
-                    let add_documents =
-                        std::mem::take(&mut *self.add_documents_batcher.lock().await)
-                            .into_event(&self.user, "Documents Added");
-                    let update_documents =
-                        std::mem::take(&mut *self.update_documents_batcher.lock().await)
-                            .into_event(&self.user, "Documents Updated");
-                    // keep the lock on the batcher just for these five operations
-                    {
-                        let mut batcher = self.batcher.lock().await;
-                        if let Some(get_search) = get_search {
-                            let _ = batcher.push(get_search).await;
-                        }
-                        if let Some(post_search) = post_search {
-                            let _ = batcher.push(post_search).await;
-                        }
-                        if let Some(add_documents) = add_documents {
-                            let _ = batcher.push(add_documents).await;
-                        }
-                        if let Some(update_documents) = update_documents {
-                            let _ = batcher.push(update_documents).await;
-                        }
-                        let _ = batcher.flush().await;
-                    }
-                    const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
-                    tokio::time::sleep(INTERVAL).await;
-                }
-            });
-        }
-
-        fn start_search(
-            &'static self,
-            batcher: &'static Mutex<SearchBatcher>,
-            query: &SearchQuery,
-            request: &HttpRequest,
-        ) {
-            let user_agent = extract_user_agents(request);
-            let sorted = query.sort.is_some() as usize;
-            let sort_with_geo_point = query
-                .sort
-                .as_ref()
-                .map_or(false, |s| s.iter().any(|s| s.contains("_geoPoint(")));
-            let sort_criteria_terms = query.sort.as_ref().map_or(0, |s| s.len());
-
-            // since there is quite a bit of computation done on the filter, we do that in the async task
-            let filter = query.filter.clone();
-            let queried = query.q.is_some();
-            let nb_terms = query.q.as_ref().map_or(0, |s| s.split_whitespace().count());
-
-            let max_limit = query.limit;
-            let max_offset = query.offset.unwrap_or_default();
-
-            // to avoid blocking the search we are going to do the heavier computation and take the
-            // batcher's mutex in an async task
-            tokio::spawn(async move {
-                const RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());
-
-                let filtered = filter.is_some() as usize;
-                let syntax = match filter.as_ref() {
-                    Some(Value::String(_)) => "string".to_string(),
-                    Some(Value::Array(values)) => {
-                        if values
-                            .iter()
-                            .map(|v| v.to_string())
-                            .any(|s| RE.is_match(&s))
-                        {
-                            "mixed".to_string()
-                        } else {
-                            "array".to_string()
-                        }
-                    }
-                    _ => "none".to_string(),
-                };
-                let stringified_filters = filter.map_or(String::new(), |v| v.to_string());
-                let filter_with_geo_radius = stringified_filters.contains("_geoRadius(");
-                let filter_number_of_criteria = RE.split(&stringified_filters).count();
-
-                let mut search_batcher = batcher.lock().await;
-                user_agent.into_iter().for_each(|ua| {
-                    search_batcher.user_agents.insert(ua);
-                });
-                search_batcher.total_received += 1;
-
-                // sort
-                search_batcher.sort_with_geo_point |= sort_with_geo_point;
-                search_batcher.sort_sum_of_criteria_terms += sort_criteria_terms;
-                search_batcher.sort_total_number_of_criteria += sorted;
-
-                // filter
-                search_batcher.filter_with_geo_radius |= filter_with_geo_radius;
-                search_batcher.filter_sum_of_criteria_terms += filter_number_of_criteria;
-                search_batcher.filter_total_number_of_criteria += filtered as usize;
-                *search_batcher.used_syntax.entry(syntax).or_insert(0) += 1;
-
-                // q
-                search_batcher.sum_of_terms_count += nb_terms;
-                search_batcher.total_number_of_q += queried as usize;
-
-                // pagination
-                search_batcher.max_limit = search_batcher.max_limit.max(max_limit);
-                search_batcher.max_offset = search_batcher.max_offset.max(max_offset);
-            });
-        }
-
-        fn batch_documents(
-            &'static self,
-            batcher: &'static Mutex<DocumentsBatcher>,
-            documents_query: &UpdateDocumentsQuery,
-            index_creation: bool,
-            request: &HttpRequest,
-        ) {
-            let user_agents = extract_user_agents(request);
-            let primary_key = documents_query.primary_key.clone();
-            let content_type = request
-                .headers()
-                .get(CONTENT_TYPE)
-                .map(|s| s.to_str().unwrap_or("unknown"))
-                .unwrap()
-                .to_string();
-
-            tokio::spawn(async move {
-                let mut lock = batcher.lock().await;
-                for user_agent in user_agents {
-                    lock.user_agents.insert(user_agent);
-                }
-                lock.content_types.insert(content_type);
-                if let Some(primary_key) = primary_key {
-                    lock.primary_keys.insert(primary_key);
-                }
-                lock.index_creation |= index_creation;
-                lock.updated = true;
-                // drop the lock here
-            });
-        }
-    }
-
-    impl super::Analytics for SegmentAnalytics {
-        fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>) {
-            let content_type = request
-                .map(|req| req.headers().get(USER_AGENT))
-                .flatten()
-                .map(|header| header.to_str().unwrap_or("unknown").to_string());
-
-            tokio::spawn(async move {
-                let _ = self
-                    .batcher
-                    .lock()
-                    .await
-                    .push(Track {
-                        user: self.user.clone(),
-                        event: event_name.clone(),
-                        context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(str::trim).collect::<Vec<&str>>() })),
-                        properties: send,
-                        ..Default::default()
-                    })
-                    .await;
-            });
-        }
-
-        fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
-            self.start_search(&self.get_search_batcher, query, request)
-        }
-
-        fn end_get_search(&'static self, process_time: usize) {
-            tokio::spawn(async move {
-                let mut search_batcher = self.get_search_batcher.lock().await;
-                search_batcher.total_succeeded += 1;
-                search_batcher.time_spent.push(process_time);
-            });
-        }
-
-        fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
-            self.start_search(&self.post_search_batcher, query, request)
-        }
-
-        fn end_post_search(&'static self, process_time: usize) {
-            tokio::spawn(async move {
-                let mut search_batcher = self.post_search_batcher.lock().await;
-                search_batcher.total_succeeded += 1;
-                search_batcher.time_spent.push(process_time);
-            });
-        }
-
-        fn add_documents(
-            &'static self,
-            documents_query: &UpdateDocumentsQuery,
-            index_creation: bool,
-            request: &HttpRequest,
-        ) {
-            self.batch_documents(
-                &self.add_documents_batcher,
-                documents_query,
-                index_creation,
-                request,
-            )
-        }
-
-        fn update_documents(
-            &'static self,
-            documents_query: &UpdateDocumentsQuery,
-            index_creation: bool,
-            request: &HttpRequest,
-        ) {
-            self.batch_documents(
-                &self.update_documents_batcher,
-                documents_query,
-                index_creation,
-                request,
-            )
-        }
-    }
-
-    impl Display for SegmentAnalytics {
-        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-            write!(f, "{}", self.user)
-        }
-    }
-
-    #[derive(Default)]
-    pub struct SearchBatcher {
-        // context
-        user_agents: HashSet<String>,
-
-        // requests
-        total_received: usize,
-        total_succeeded: usize,
-        time_spent: Vec<usize>,
-
-        // sort
-        sort_with_geo_point: bool,
-        // every time a request has a sort, this field must be incremented by the number of terms it contains
-        sort_sum_of_criteria_terms: usize,
-        // every time a request has a sort, this field must be incremented by one
-        sort_total_number_of_criteria: usize,
-
-        // filter
-        filter_with_geo_radius: bool,
-        // every time a request has a filter, this field must be incremented by the number of terms it contains
-        filter_sum_of_criteria_terms: usize,
-        // every time a request has a filter, this field must be incremented by one
-        filter_total_number_of_criteria: usize,
-        used_syntax: HashMap<String, usize>,
-
-        // q
-        // every time a request has a q field, this field must be incremented by the number of terms
-        sum_of_terms_count: usize,
-        // every time a request has a q field, this field must be incremented by one
-        total_number_of_q: usize,
-
-        // pagination
-        max_limit: usize,
-        max_offset: usize,
-    }
-
-    impl SearchBatcher {
-        pub fn into_event(mut self, user: &User, event_name: &str) -> Option<Track> {
-            if self.total_received == 0 {
-                None
-            } else {
-                let context = Some(json!({ "user-agent": self.user_agents}));
-                let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.;
-                self.time_spent.drain(percentile_99th as usize..);
-
-                let properties = json!({
-                    "requests": {
-                        "99th_response_time": format!("{:.2}", self.time_spent.iter().sum::<usize>() as f64 / self.time_spent.len() as f64),
-                        "total_succeeded": self.total_succeeded,
-                        "total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panic
-                        "total_received": self.total_received,
-                    },
-                    "sort": {
-                        "with_geoPoint": self.sort_with_geo_point,
-                        "avg_criteria_number": format!("{:.2}", self.sort_sum_of_criteria_terms as f64 / self.sort_total_number_of_criteria as f64),
-                    },
-                    "filter": {
-                        "with_geoRadius": self.filter_with_geo_radius,
-                        "avg_criteria_number": format!("{:.2}", self.filter_sum_of_criteria_terms as f64 / self.filter_total_number_of_criteria as f64),
-                        "most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
-                    },
-                    "q": {
-                        "avg_terms_number": format!("{:.2}", self.sum_of_terms_count as f64 / self.total_number_of_q as f64),
-                    },
-                    "pagination": {
-                        "max_limit": self.max_limit,
-                        "max_offset": self.max_offset,
-                    },
-                });
-
-                Some(Track {
-                    user: user.clone(),
-                    event: event_name.to_string(),
-                    context,
-                    properties,
-                    ..Default::default()
-                })
-            }
-        }
-    }
-
-    #[derive(Default)]
-    pub struct DocumentsBatcher {
-        // set to true when at least one request was received
-        updated: bool,
-
-        // context
-        user_agents: HashSet<String>,
-
-        content_types: HashSet<String>,
-        primary_keys: HashSet<String>,
-        index_creation: bool,
-    }
-
-    impl DocumentsBatcher {
-        pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
-            if !self.updated {
-                None
-            } else {
-                let context = Some(json!({ "user-agent": self.user_agents}));
-
-                let properties = json!({
-                    "payload_type": self.content_types,
-                    "primary_key": self.primary_keys,
-                    "index_creation": self.index_creation,
-                });
-
-                Some(Track {
-                    user: user.clone(),
-                    event: event_name.to_string(),
-                    context,
-                    properties,
-                    ..Default::default()
-                })
-            }
-        }
-    }
-}
-
-// if we are in debug mode OR the analytics feature is disabled
-#[cfg(any(debug_assertions, not(feature = "analytics")))]
-pub type SegmentAnalytics = MockAnalytics;
-#[cfg(all(not(debug_assertions), feature = "analytics"))]
-pub type SegmentAnalytics = segment::SegmentAnalytics;
-
-pub struct MockAnalytics {
-    user: String,
-}
-
-impl MockAnalytics {
-    pub fn new(opt: &Opt) -> &'static Self {
-        let user = find_user_id(&opt.db_path).unwrap_or_default();
-        let analytics = Box::new(Self { user });
-        Box::leak(analytics)
-    }
-}
-
-impl Analytics for MockAnalytics {
-    // These methods are no-ops and should be optimized out
-    fn publish(&'static self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
-    fn start_get_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
-    fn end_get_search(&'static self, _process_time: usize) {}
-    fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
-    fn end_post_search(&'static self, _process_time: usize) {}
-    fn add_documents(
-        &'static self,
-        _documents_query: &UpdateDocumentsQuery,
-        _index_creation: bool,
-        _request: &HttpRequest,
-    ) {
-    }
-    fn update_documents(
-        &'static self,
-        _documents_query: &UpdateDocumentsQuery,
-        _index_creation: bool,
-        _request: &HttpRequest,
-    ) {
-    }
-}
-
-impl Display for MockAnalytics {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "{}", self.user)
-    }
-}
-
-pub trait Analytics: Display + Sync + Send {
-    /// The method used to publish most analytics that do not need to be batched every hour
-    fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>);
-
-    /// This method should be called to batch a get search request
-    fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest);
-    /// This method should be called once a get search request has succeeded
-    fn end_get_search(&'static self, process_time: usize);
-
-    /// This method should be called to batch a post search request
-    fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest);
-    /// This method should be called once a post search request has succeeded
-    fn end_post_search(&'static self, process_time: usize);
-
-    // this method should be called to batch an add documents request
-    fn add_documents(
-        &'static self,
-        documents_query: &UpdateDocumentsQuery,
-        index_creation: bool,
-        request: &HttpRequest,
-    );
-    // this method should be called to batch an update documents request
-    fn update_documents(
-        &'static self,
-        documents_query: &UpdateDocumentsQuery,
-        index_creation: bool,
-        request: &HttpRequest,
-    );
-}
diff --git a/meilisearch-http/src/analytics/mock_analytics.rs b/meilisearch-http/src/analytics/mock_analytics.rs
new file mode 100644
index 000000000..64cd5673f
--- /dev/null
+++ b/meilisearch-http/src/analytics/mock_analytics.rs
@@ -0,0 +1,50 @@
+use std::fmt::Display;
+
+use actix_web::HttpRequest;
+use meilisearch_lib::index::SearchQuery;
+use serde_json::Value;
+
+use crate::{routes::indexes::documents::UpdateDocumentsQuery, Opt};
+
+use super::{find_user_id, Analytics};
+
+pub struct MockAnalytics {
+    user: String,
+}
+
+impl MockAnalytics {
+    pub fn new(opt: &Opt) -> &'static Self {
+        let user = find_user_id(&opt.db_path).unwrap_or_default();
+        let analytics = Box::new(Self { user });
+        Box::leak(analytics)
+    }
+}
+
+impl Analytics for MockAnalytics {
+    // These methods are no-ops and should be optimized out
+    fn publish(&'static self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
+    fn start_get_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
+    fn end_get_search(&'static self, _process_time: usize) {}
+    fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {}
+    fn end_post_search(&'static self, _process_time: usize) {}
+    fn add_documents(
+        &'static self,
+        _documents_query: &UpdateDocumentsQuery,
+        _index_creation: bool,
+        _request: &HttpRequest,
+    ) {
+    }
+    fn update_documents(
+        &'static self,
+        _documents_query: &UpdateDocumentsQuery,
+        _index_creation: bool,
+        _request: &HttpRequest,
+    ) {
+    }
+}
+
+impl Display for MockAnalytics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.user)
+    }
+}
diff --git a/meilisearch-http/src/analytics/mod.rs b/meilisearch-http/src/analytics/mod.rs
new file mode 100644
index 000000000..48ba77ddb
--- /dev/null
+++ b/meilisearch-http/src/analytics/mod.rs
@@ -0,0 +1,86 @@
+mod mock_analytics;
+// if we are in release mode and the feature analytics was enabled
+#[cfg(all(not(debug_assertions), feature = "analytics"))]
+mod segment_analytics;
+
+use std::fmt::Display;
+use std::fs;
+use std::path::{Path, PathBuf};
+
+use actix_web::HttpRequest;
+use meilisearch_lib::index::SearchQuery;
+use once_cell::sync::Lazy;
+use platform_dirs::AppDirs;
+use serde_json::Value;
+
+use crate::routes::indexes::documents::UpdateDocumentsQuery;
+
+pub use mock_analytics::MockAnalytics;
+
+// if we are in debug mode OR the analytics feature is disabled
+// the `SegmentAnalytics` type points to the mock instead of the real analytics
+#[cfg(any(debug_assertions, not(feature = "analytics")))]
+pub type SegmentAnalytics = MockAnalytics;
+
+// if we are in release mode and the feature analytics was enabled
+// we use the real analytics
+#[cfg(all(not(debug_assertions), feature = "analytics"))]
+pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
+
+/// The MeiliSearch config dir:
+/// `~/.config/MeiliSearch` on *NIX or *BSD.
+/// `~/Library/ApplicationSupport` on macOS.
+/// `%APPDATA%` (= `C:\Users\%USERNAME%\AppData\Roaming`) on Windows.
+static MEILISEARCH_CONFIG_PATH: Lazy<Option<PathBuf>> =
+    Lazy::new(|| AppDirs::new(Some("MeiliSearch"), false).map(|appdir| appdir.config_dir));
+
+fn config_user_id_path(db_path: &Path) -> Option<PathBuf> {
+    db_path
+        .canonicalize()
+        .ok()
+        .map(|path| {
+            path.join("instance-uid")
+                .display()
+                .to_string()
+                .replace("/", "-")
+        })
+        .zip(MEILISEARCH_CONFIG_PATH.as_ref())
+        .map(|(filename, config_path)| config_path.join(filename.trim_start_matches('-')))
+}
+
+/// Look for the instance-uid in the `data.ms` or in `~/.config/MeiliSearch/path-to-db-instance-uid`
+fn find_user_id(db_path: &Path) -> Option<String> {
+    fs::read_to_string(db_path.join("instance-uid"))
+        .ok()
+        .or_else(|| fs::read_to_string(&config_user_id_path(db_path)?).ok())
+}
+
+pub trait Analytics: Display + Sync + Send {
+    /// The method used to publish most analytics that do not need to be batched every hour
+    fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>);
+
+    /// This method should be called to batch a get search request
+    fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest);
+    /// This method should be called once a get search request has succeeded
+    fn end_get_search(&'static self, process_time: usize);
+
+    /// This method should be called to batch a post search request
+    fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest);
+    /// This method should be called once a post search request has succeeded
+    fn end_post_search(&'static self, process_time: usize);
+
+    // this method should be called to batch an add documents request
+    fn add_documents(
+        &'static self,
+        documents_query: &UpdateDocumentsQuery,
+        index_creation: bool,
+        request: &HttpRequest,
+    );
+    // this method should be called to batch an update documents request
+    fn update_documents(
+        &'static self,
+        documents_query: &UpdateDocumentsQuery,
+        index_creation: bool,
+        request: &HttpRequest,
+    );
+}
diff --git a/meilisearch-http/src/analytics/segment_analytics.rs b/meilisearch-http/src/analytics/segment_analytics.rs
new file mode 100644
index 000000000..065fff175
--- /dev/null
+++ b/meilisearch-http/src/analytics/segment_analytics.rs
@@ -0,0 +1,497 @@
+use std::collections::{HashMap, HashSet};
+use std::fmt::Display;
+use std::fs;
+use std::path::Path;
+use std::time::{Duration, Instant};
+
+use actix_web::http::header::USER_AGENT;
+use actix_web::HttpRequest;
+use http::header::CONTENT_TYPE;
+use meilisearch_lib::index::SearchQuery;
+use meilisearch_lib::index_controller::Stats;
+use meilisearch_lib::MeiliSearch;
+use once_cell::sync::Lazy;
+use regex::Regex;
+use segment::message::{Identify, Track, User};
+use segment::{AutoBatcher, Batcher, HttpClient};
+use serde_json::{json, Value};
+use sysinfo::{DiskExt, System, SystemExt};
+use tokio::sync::Mutex;
+use uuid::Uuid;
+
+use crate::analytics::Analytics;
+use crate::routes::indexes::documents::UpdateDocumentsQuery;
+use crate::Opt;
+
+use super::{config_user_id_path, MEILISEARCH_CONFIG_PATH};
+
+/// Write the instance-uid in the `data.ms` and in `~/.config/MeiliSearch/path-to-db-instance-uid`. Ignore the errors.
+fn write_user_id(db_path: &Path, user_id: &str) {
+    let _ = fs::write(db_path.join("instance-uid"), user_id.as_bytes());
+    if let Some((meilisearch_config_path, user_id_path)) = MEILISEARCH_CONFIG_PATH
+        .as_ref()
+        .zip(config_user_id_path(db_path))
+    {
+        let _ = fs::create_dir_all(&meilisearch_config_path);
+        let _ = fs::write(user_id_path, user_id.as_bytes());
+    }
+}
+
+const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
+
+pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
+    request
+        .headers()
+        .get(USER_AGENT)
+        .map(|header| header.to_str().ok())
+        .flatten()
+        .unwrap_or("unknown")
+        .split(';')
+        .map(str::trim)
+        .map(ToString::to_string)
+        .collect()
+}
+
+pub struct SegmentAnalytics {
+    user: User,
+    opt: Opt,
+    batcher: Mutex<AutoBatcher>,
+    post_search_batcher: Mutex<SearchBatcher>,
+    get_search_batcher: Mutex<SearchBatcher>,
+    add_documents_batcher: Mutex<DocumentsBatcher>,
+    update_documents_batcher: Mutex<DocumentsBatcher>,
+}
+
+impl SegmentAnalytics {
+    fn compute_traits(opt: &Opt, stats: Stats) -> Value {
+        static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(Instant::now);
+        const SYSTEM: Lazy<Value> = Lazy::new(|| {
+            let mut sys = System::new_all();
+            sys.refresh_all();
+            let kernel_version = sys
+                .kernel_version()
+                .map(|k| k.split_once("-").map(|(k, _)| k.to_string()))
+                .flatten();
+            json!({
+                "distribution": sys.name(),
+                "kernel_version": kernel_version,
+                "cores": sys.processors().len(),
+                "ram_size": sys.total_memory(),
+                "disk_size": sys.disks().iter().map(|disk| disk.available_space()).max(),
+                "server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
+            })
+        });
+        let infos = json!({
+            "env": opt.env.clone(),
+            "has_snapshot": opt.schedule_snapshot,
+        });
+
+        let number_of_documents = stats
+            .indexes
+            .values()
+            .map(|index| index.number_of_documents)
+            .collect::<Vec<u64>>();
+
+        json!({
+            "system": *SYSTEM,
+            "stats": {
+                "database_size": stats.database_size,
+                "indexes_number": stats.indexes.len(),
+                "documents_number": number_of_documents,
+                "start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / (60 * 60 * 24), // one day
+            },
+            "infos": infos,
+        })
+    }
+
+    pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> &'static Self {
+        let user_id = super::find_user_id(&opt.db_path);
+        let first_time_run = user_id.is_none();
+        let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
+        write_user_id(&opt.db_path, &user_id);
+
+        let client = HttpClient::default();
+        let user = User::UserId {
+            user_id: user_id.clone(),
+        };
+        let batcher = Mutex::new(AutoBatcher::new(
+            client,
+            Batcher::new(None),
+            SEGMENT_API_KEY.to_string(),
+        ));
+        let segment = Box::new(Self {
+            user,
+            opt: opt.clone(),
+            batcher,
+            post_search_batcher: Mutex::new(SearchBatcher::default()),
+            get_search_batcher: Mutex::new(SearchBatcher::default()),
+            add_documents_batcher: Mutex::new(DocumentsBatcher::default()),
+            update_documents_batcher: Mutex::new(DocumentsBatcher::default()),
+        });
+        let segment = Box::leak(segment);
+
+        // batch the `Launched` track event if this is the first run
+        if first_time_run {
+            segment.publish("Launched".to_string(), json!({}), None);
+        }
+        segment.tick(meilisearch.clone());
+        segment
+    }
+
+    fn tick(&'static self, meilisearch: MeiliSearch) {
+        tokio::spawn(async move {
+            loop {
+                if let Ok(stats) = meilisearch.get_all_stats().await {
+                    let _ = self
+                        .batcher
+                        .lock()
+                        .await
+                        .push(Identify {
+                            context: Some(json!({
+                                "app": {
+                                    "version": env!("CARGO_PKG_VERSION").to_string(),
+                                },
+                            })),
+                            user: self.user.clone(),
+                            traits: Self::compute_traits(&self.opt, stats),
+                            ..Default::default()
+                        })
+                        .await;
+                }
+                let get_search = std::mem::take(&mut *self.get_search_batcher.lock().await)
+                    .into_event(&self.user, "Document Searched GET");
+                let post_search = std::mem::take(&mut *self.post_search_batcher.lock().await)
+                    .into_event(&self.user, "Document Searched POST");
+                let add_documents = std::mem::take(&mut *self.add_documents_batcher.lock().await)
+                    .into_event(&self.user, "Documents Added");
+                let update_documents =
+                    std::mem::take(&mut *self.update_documents_batcher.lock().await)
+                        .into_event(&self.user, "Documents Updated");
+                // keep the lock on the batcher just for these five operations
+                {
+                    let mut batcher = self.batcher.lock().await;
+                    if let Some(get_search) = get_search {
+                        let _ = batcher.push(get_search).await;
+                    }
+                    if let Some(post_search) = post_search {
+                        let _ = batcher.push(post_search).await;
+                    }
+                    if let Some(add_documents) = add_documents {
+                        let _ = batcher.push(add_documents).await;
+                    }
+                    if let Some(update_documents) = update_documents {
+                        let _ = batcher.push(update_documents).await;
+                    }
+                    let _ = batcher.flush().await;
+                }
+                const INTERVAL: Duration = Duration::from_secs(60 * 60); // one hour
+                tokio::time::sleep(INTERVAL).await;
+            }
+        });
+    }
+
+    fn start_search(
+        &'static self,
+        batcher: &'static Mutex<SearchBatcher>,
+        query: &SearchQuery,
+        request: &HttpRequest,
+    ) {
+        let user_agent = extract_user_agents(request);
+        let sorted = query.sort.is_some() as usize;
+        let sort_with_geo_point = query
+            .sort
+            .as_ref()
+            .map_or(false, |s| s.iter().any(|s| s.contains("_geoPoint(")));
+        let sort_criteria_terms = query.sort.as_ref().map_or(0, |s| s.len());
+
+        // since there is quite a bit of computation done on the filter, we do that in the async task
+        let filter = query.filter.clone();
+        let queried = query.q.is_some();
+        let nb_terms = query.q.as_ref().map_or(0, |s| s.split_whitespace().count());
+
+        let max_limit = query.limit;
+        let max_offset = query.offset.unwrap_or_default();
+
+        // to avoid blocking the search we are going to do the heavier computation and take the
+        // batcher's mutex in an async task
+        tokio::spawn(async move {
+            const RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());
+
+            let filtered = filter.is_some() as usize;
+            let syntax = match filter.as_ref() {
+                Some(Value::String(_)) => "string".to_string(),
+                Some(Value::Array(values)) => {
+                    if values
+                        .iter()
+                        .map(|v| v.to_string())
+                        .any(|s| RE.is_match(&s))
+                    {
+                        "mixed".to_string()
+                    } else {
+                        "array".to_string()
+                    }
+                }
+                _ => "none".to_string(),
+            };
+            let stringified_filters = filter.map_or(String::new(), |v| v.to_string());
+            let filter_with_geo_radius = stringified_filters.contains("_geoRadius(");
+            let filter_number_of_criteria = RE.split(&stringified_filters).count();
+
+            let mut search_batcher = batcher.lock().await;
+            user_agent.into_iter().for_each(|ua| {
+                search_batcher.user_agents.insert(ua);
+            });
+            search_batcher.total_received += 1;
+
+            // sort
+            search_batcher.sort_with_geo_point |= sort_with_geo_point;
+            search_batcher.sort_sum_of_criteria_terms += sort_criteria_terms;
+            search_batcher.sort_total_number_of_criteria += sorted;
+
+            // filter
+            search_batcher.filter_with_geo_radius |= filter_with_geo_radius;
+            search_batcher.filter_sum_of_criteria_terms += filter_number_of_criteria;
+            search_batcher.filter_total_number_of_criteria += filtered as usize;
+            *search_batcher.used_syntax.entry(syntax).or_insert(0) += 1;
+
+            // q
+            search_batcher.sum_of_terms_count += nb_terms;
+            search_batcher.total_number_of_q += queried as usize;
+
+            // pagination
+            search_batcher.max_limit = search_batcher.max_limit.max(max_limit);
+            search_batcher.max_offset = search_batcher.max_offset.max(max_offset);
+        });
+    }
+
+    fn batch_documents(
+        &'static self,
+        batcher: &'static Mutex<DocumentsBatcher>,
+        documents_query: &UpdateDocumentsQuery,
+        index_creation: bool,
+        request: &HttpRequest,
+    ) {
+        let user_agents = extract_user_agents(request);
+        let primary_key = documents_query.primary_key.clone();
+        let content_type = request
+            .headers()
+            .get(CONTENT_TYPE)
+            .map(|s| s.to_str().unwrap_or("unknown"))
+            .unwrap()
+            .to_string();
+
+        tokio::spawn(async move {
+            let mut lock = batcher.lock().await;
+            for user_agent in user_agents {
+                lock.user_agents.insert(user_agent);
+            }
+            lock.content_types.insert(content_type);
+            if let Some(primary_key) = primary_key {
+                lock.primary_keys.insert(primary_key);
+            }
+            lock.index_creation |= index_creation;
+            lock.updated = true;
+            // drop the lock here
+        });
+    }
+}
+
+impl super::Analytics for SegmentAnalytics {
+    fn publish(&'static self, event_name: String, send: Value, request: Option<&HttpRequest>) {
+        let content_type = request
+            .map(|req| req.headers().get(USER_AGENT))
+            .flatten()
+            .map(|header| header.to_str().unwrap_or("unknown").to_string());
+
+        tokio::spawn(async move {
+            let _ = self
+                .batcher
+                .lock()
+                .await
+                .push(Track {
+                    user: self.user.clone(),
+                    event: event_name.clone(),
+                    context: content_type.map(|user_agent| json!({ "user-agent": user_agent.split(";").map(str::trim).collect::<Vec<&str>>() })),
+                    properties: send,
+                    ..Default::default()
+                })
+                .await;
+        });
+    }
+
+    fn start_get_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
+        self.start_search(&self.get_search_batcher, query, request)
+    }
+
+    fn end_get_search(&'static self, process_time: usize) {
+        tokio::spawn(async move {
+            let mut search_batcher = self.get_search_batcher.lock().await;
+            search_batcher.total_succeeded += 1;
+            search_batcher.time_spent.push(process_time);
+        });
+    }
+
+    fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest) {
+        self.start_search(&self.post_search_batcher, query, request)
+    }
+
+    fn end_post_search(&'static self, process_time: usize) {
+        tokio::spawn(async move {
+            let mut search_batcher = self.post_search_batcher.lock().await;
+            search_batcher.total_succeeded += 1;
+            search_batcher.time_spent.push(process_time);
+        });
+    }
+
+    fn add_documents(
+        &'static self,
+        documents_query: &UpdateDocumentsQuery,
+        index_creation: bool,
+        request: &HttpRequest,
+    ) {
+        self.batch_documents(
+            &self.add_documents_batcher,
+            documents_query,
+            index_creation,
+            request,
+        )
+    }
+
+    fn update_documents(
+        &'static self,
+        documents_query: &UpdateDocumentsQuery,
+        index_creation: bool,
+        request: &HttpRequest,
+    ) {
+        self.batch_documents(
+            &self.update_documents_batcher,
+            documents_query,
+            index_creation,
+            request,
+        )
+    }
+}
+
+impl Display for SegmentAnalytics {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", self.user)
+    }
+}
+
+#[derive(Default)]
+pub struct SearchBatcher {
+    // context
+    user_agents: HashSet<String>,
+
+    // requests
+    total_received: usize,
+    total_succeeded: usize,
+    time_spent: Vec<usize>,
+
+    // sort
+    sort_with_geo_point: bool,
+    // every time a request has a sort, this field must be incremented by the number of terms it contains
+    sort_sum_of_criteria_terms: usize,
+    // every time a request has a sort, this field must be incremented by one
+    sort_total_number_of_criteria: usize,
+
+    // filter
+    filter_with_geo_radius: bool,
+    // every time a request has a filter, this field must be incremented by the number of terms it contains
+    filter_sum_of_criteria_terms: usize,
+    // every time a request has a filter, this field must be incremented by one
+    filter_total_number_of_criteria: usize,
+    used_syntax: HashMap<String, usize>,
+
+    // q
+    // every time a request has a q field, this field must be incremented by the number of terms
+    sum_of_terms_count: usize,
+    // every time a request has a q field, this field must be incremented by one
+    total_number_of_q: usize,
+
+    // pagination
+    max_limit: usize,
+    max_offset: usize,
+}
+
+impl SearchBatcher {
+    pub fn into_event(mut self, user: &User, event_name: &str) -> Option<Track> {
+        if self.total_received == 0 {
+            None
+        } else {
+            let context = Some(json!({ "user-agent": self.user_agents}));
+            let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.;
+            self.time_spent.drain(percentile_99th as usize..);
+
+            let properties = json!({
+                "requests": {
+                    "99th_response_time": format!("{:.2}", self.time_spent.iter().sum::<usize>() as f64 / self.time_spent.len() as f64),
+                    "total_succeeded": self.total_succeeded,
+                    "total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panic
+                    "total_received": self.total_received,
+                },
+                "sort": {
+                    "with_geoPoint": self.sort_with_geo_point,
+                    "avg_criteria_number": format!("{:.2}", self.sort_sum_of_criteria_terms as f64 / self.sort_total_number_of_criteria as f64),
+                },
+                "filter": {
+                    "with_geoRadius": self.filter_with_geo_radius,
+                    "avg_criteria_number": format!("{:.2}", self.filter_sum_of_criteria_terms as f64 / self.filter_total_number_of_criteria as f64),
+                    "most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
+                },
+                "q": {
+                    "avg_terms_number": format!("{:.2}", self.sum_of_terms_count as f64 / self.total_number_of_q as f64),
+                },
+                "pagination": {
+                    "max_limit": self.max_limit,
+                    "max_offset": self.max_offset,
+                },
+            });
+
+            Some(Track {
+                user: user.clone(),
+                event: event_name.to_string(),
+                context,
+                properties,
+                ..Default::default()
+            })
+        }
+    }
+}
+
+#[derive(Default)]
+pub struct DocumentsBatcher {
+    // set to true when at least one request was received
+    updated: bool,
+
+    // context
+    user_agents: HashSet<String>,
+
+    content_types: HashSet<String>,
+    primary_keys: HashSet<String>,
+    index_creation: bool,
+}
+
+impl DocumentsBatcher {
+    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
+        if !self.updated {
+            None
+        } else {
+            let context = Some(json!({ "user-agent": self.user_agents}));
+
+            let properties = json!({
+                "payload_type": self.content_types,
+                "primary_key": self.primary_keys,
+                "index_creation": self.index_creation,
+            });
+
+            Some(Track {
+                user: user.clone(),
+                event: event_name.to_string(),
+                context,
+                properties,
+                ..Default::default()
+            })
+        }
+    }
+}
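
Note on the `&'static self` receivers used throughout the `Analytics` trait: both `MockAnalytics::new` and `SegmentAnalytics::new` return `&'static Self` by leaking a `Box`, which is what lets the segment implementation move `self` into `tokio::spawn`ed tasks. Below is a minimal, self-contained sketch of that pattern with a toy trait; it is illustrative only and not code from this patch.

trait Analytics: Send + Sync {
    // `&'static self` lets an implementation move `self` into background tasks.
    fn publish(&'static self, event_name: String);
}

struct MockAnalytics;

impl Analytics for MockAnalytics {
    fn publish(&'static self, _event_name: String) {}
}

fn new_analytics() -> &'static dyn Analytics {
    // Leaking the box yields a reference that lives for the rest of the
    // process, mirroring the `Box::leak(analytics)` calls in this patch.
    Box::leak(Box::new(MockAnalytics))
}

fn main() {
    let analytics = new_analytics();
    analytics.publish("Launched".to_string());
}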