diff --git a/meilisearch-http/src/analytics.rs b/meilisearch-http/src/analytics.rs index e200141a0..f467c800c 100644 --- a/meilisearch-http/src/analytics.rs +++ b/meilisearch-http/src/analytics.rs @@ -1,18 +1,20 @@ use actix_web::HttpRequest; use meilisearch_lib::index::SearchQuery; use serde_json::Value; -use std::collections::{HashMap, HashSet}; use std::fmt::Display; use std::fs::read_to_string; +use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::Opt; // if we are in release mode and the feature analytics was enabled #[cfg(all(not(debug_assertions), feature = "analytics"))] mod segment { use crate::analytics::Analytics; + use crate::routes::indexes::documents::UpdateDocumentsQuery; use actix_web::http::header::USER_AGENT; use actix_web::HttpRequest; + use http::header::CONTENT_TYPE; use meilisearch_lib::index::SearchQuery; use meilisearch_lib::index_controller::Stats; use meilisearch_lib::MeiliSearch; @@ -38,6 +40,8 @@ mod segment { batcher: Mutex, post_search_batcher: Mutex, get_search_batcher: Mutex, + documents_added_batcher: Mutex, + documents_updated_batcher: Mutex, } impl SegmentAnalytics { @@ -110,6 +114,8 @@ mod segment { batcher, post_search_batcher: Mutex::new(SearchBatcher::default()), get_search_batcher: Mutex::new(SearchBatcher::default()), + documents_added_batcher: Mutex::new(DocumentsBatcher::default()), + documents_updated_batcher: Mutex::new(DocumentsBatcher::default()), }); let segment = Box::leak(segment); @@ -158,6 +164,17 @@ mod segment { post_search .into_event(self.user.clone(), "Document Searched POST".to_string()) }); + let add_documents = + std::mem::take(&mut *self.documents_added_batcher.lock().await); + let add_documents = (add_documents.updated).then(|| { + add_documents.into_event(self.user.clone(), "Documents Added".to_string()) + }); + let update_documents = + std::mem::take(&mut *self.documents_updated_batcher.lock().await); + let update_documents = (update_documents.updated).then(|| { + update_documents + .into_event(self.user.clone(), "Documents Updated".to_string()) + }); // keep the lock on the batcher just for these three operations { println!("ANALYTICS: taking the lock on the batcher"); @@ -168,6 +185,12 @@ mod segment { if let Some(post_search) = post_search { let _ = batcher.push(post_search).await; } + if let Some(add_documents) = add_documents { + let _ = batcher.push(add_documents).await; + } + if let Some(update_documents) = update_documents { + let _ = batcher.push(update_documents).await; + } println!("ANALYTICS: Sending the batch"); let _ = batcher.flush().await; } @@ -199,8 +222,8 @@ mod segment { let max_limit = query.limit; let max_offset = query.offset.unwrap_or_default(); - // to avoid blocking the search we are going to do the heavier computation in an async task - // and take the mutex in the same task + // to avoid blocking the search we are going to do the heavier computation and take the + // batcher's mutex in an async task tokio::spawn(async move { let filtered = filter.is_some() as usize; let syntax = match filter.as_ref() { @@ -313,6 +336,70 @@ mod segment { search_batcher.time_spent.push(process_time); }); } + + fn add_documents( + &'static self, + documents_query: &UpdateDocumentsQuery, + index_creation: bool, + request: &HttpRequest, + ) { + let user_agents = request + .headers() + .get(USER_AGENT) + .map(|header| header.to_str().unwrap_or("unknown").to_string()); + let primary_key = documents_query.primary_key.clone(); + let content_type = request + .headers() + .get(CONTENT_TYPE) + .map(|s| s.to_str().unwrap_or("unkown")) + .unwrap() + .to_string(); + + tokio::spawn(async move { + let mut lock = self.documents_added_batcher.lock().await; + for user_agent in user_agents { + lock.user_agents.insert(user_agent); + } + lock.content_types.insert(content_type); + if let Some(primary_key) = primary_key { + lock.primary_keys.insert(primary_key); + } + lock.index_creation |= index_creation; + // drop the lock here + }); + } + + fn update_documents( + &'static self, + documents_query: &UpdateDocumentsQuery, + index_creation: bool, + request: &HttpRequest, + ) { + let user_agents = request + .headers() + .get(USER_AGENT) + .map(|header| header.to_str().unwrap_or("unknown").to_string()); + let primary_key = documents_query.primary_key.clone(); + let content_type = request + .headers() + .get(CONTENT_TYPE) + .map(|s| s.to_str().unwrap_or("unkown")) + .unwrap() + .to_string(); + + tokio::spawn(async move { + let mut lock = self.documents_updated_batcher.lock().await; + for user_agent in user_agents { + lock.user_agents.insert(user_agent); + } + lock.content_types.insert(content_type); + if let Some(primary_key) = primary_key { + lock.primary_keys.insert(primary_key); + } + lock.index_creation |= index_creation; + // drop the lock here + }); + } } impl Display for SegmentAnalytics { @@ -410,6 +497,39 @@ mod segment { } } } + + #[derive(Default)] + pub struct DocumentsBatcher { + // set to true when at least one request was received + updated: bool, + + // context + user_agents: HashSet, + + content_types: HashSet, + primary_keys: HashSet, + index_creation: bool, + } + + impl DocumentsBatcher { + pub fn into_event(mut self, user: User, event_name: String) -> Track { + let context = Some(json!({ "user-agent": self.user_agents})); + + let properties = json!({ + "payload_type": self.content_types, + "primary_key": self.primary_keys, + "index_creation": self.index_creation, + }); + + Track { + user, + event: event_name, + context, + properties, + ..Default::default() + } + } + } } // if we are in debug mode OR the analytics feature is disabled @@ -440,6 +560,20 @@ impl Analytics for MockAnalytics { fn end_get_search(&'static self, _process_time: usize) {} fn start_post_search(&'static self, _query: &SearchQuery, _request: &HttpRequest) {} fn end_post_search(&'static self, _process_time: usize) {} + fn add_documents( + &'static self, + _documents_query: &UpdateDocumentsQuery, + _index_creation: bool, + _request: &HttpRequest, + ) { + } + fn update_documents( + &'static self, + _documents_query: &UpdateDocumentsQuery, + _index_creation: bool, + _request: &HttpRequest, + ) { + } } impl Display for MockAnalytics { @@ -462,4 +596,19 @@ pub trait Analytics: Display + Sync + Send { fn start_post_search(&'static self, query: &SearchQuery, request: &HttpRequest); /// This method should be called once a post search request has succeeded fn end_post_search(&'static self, process_time: usize); + + // this method should be called to batch a add documents request + fn add_documents( + &'static self, + documents_query: &UpdateDocumentsQuery, + index_creation: bool, + request: &HttpRequest, + ); + // this method should be called to batch a update documents request + fn update_documents( + &'static self, + documents_query: &UpdateDocumentsQuery, + index_creation: bool, + request: &HttpRequest, + ); } diff --git a/meilisearch-http/src/routes/indexes/documents.rs b/meilisearch-http/src/routes/indexes/documents.rs index 8d3630713..c6b220b41 100644 --- a/meilisearch-http/src/routes/indexes/documents.rs +++ b/meilisearch-http/src/routes/indexes/documents.rs @@ -123,7 +123,7 @@ pub async fn get_all_documents( #[derive(Deserialize, Debug)] #[serde(rename_all = "camelCase", deny_unknown_fields)] pub struct UpdateDocumentsQuery { - primary_key: Option, + pub primary_key: Option, } pub async fn add_documents( diff --git a/meilisearch-http/src/routes/indexes/search.rs b/meilisearch-http/src/routes/indexes/search.rs index e3e47886b..d5d2b9540 100644 --- a/meilisearch-http/src/routes/indexes/search.rs +++ b/meilisearch-http/src/routes/indexes/search.rs @@ -3,7 +3,7 @@ use log::debug; use meilisearch_lib::index::{default_crop_length, SearchQuery, DEFAULT_SEARCH_LIMIT}; use meilisearch_lib::MeiliSearch; use serde::Deserialize; -use serde_json::{json, Value}; +use serde_json::Value; use crate::analytics::Analytics; use crate::error::ResponseError; diff --git a/meilisearch-http/src/routes/mod.rs b/meilisearch-http/src/routes/mod.rs index 092bd1186..2c27b7b45 100644 --- a/meilisearch-http/src/routes/mod.rs +++ b/meilisearch-http/src/routes/mod.rs @@ -14,7 +14,7 @@ use crate::extractors::authentication::{policies::*, GuardedData}; use crate::ApiKeys; mod dump; -mod indexes; +pub mod indexes; pub fn configure(cfg: &mut web::ServiceConfig) { cfg.service(web::resource("/health").route(web::get().to(get_health)))