diff --git a/meilisearch/src/analytics/mock_analytics.rs b/meilisearch/src/analytics/mock_analytics.rs
index 03aed0189..68c3a7dff 100644
--- a/meilisearch/src/analytics/mock_analytics.rs
+++ b/meilisearch/src/analytics/mock_analytics.rs
@@ -5,7 +5,7 @@ use actix_web::HttpRequest;
 use meilisearch_types::InstanceUid;
 use serde_json::Value;

-use super::{find_user_id, Analytics, DocumentDeletionKind};
+use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
 use crate::routes::tasks::TasksFilterQuery;
 use crate::Opt;
@@ -71,6 +71,8 @@ impl Analytics for MockAnalytics {
         _request: &HttpRequest,
     ) {
     }
+    fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
+    fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
     fn get_tasks(&self, _query: &TasksFilterQuery, _request: &HttpRequest) {}
     fn health_seen(&self, _request: &HttpRequest) {}
 }
diff --git a/meilisearch/src/analytics/mod.rs b/meilisearch/src/analytics/mod.rs
index 6223b9db7..c48564dff 100644
--- a/meilisearch/src/analytics/mod.rs
+++ b/meilisearch/src/analytics/mod.rs
@@ -67,6 +67,12 @@ pub enum DocumentDeletionKind {
     PerFilter,
 }

+#[derive(Copy, Clone, Debug, PartialEq, Eq)]
+pub enum DocumentFetchKind {
+    PerDocumentId,
+    Normal { with_filter: bool, limit: usize, offset: usize },
+}
+
 pub trait Analytics: Sync + Send {
     fn instance_uid(&self) -> Option<&InstanceUid>;
@@ -90,6 +96,12 @@ pub trait Analytics: Sync + Send {
         request: &HttpRequest,
     );

+    // this method should be called to aggregate a fetch documents request
+    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
+
+    // this method should be called to aggregate a fetch documents request
+    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest);
+
     // this method should be called to aggregate a delete documents request
     fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);
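The `DocumentFetchKind` enum added above distinguishes a single-document lookup (`GET .../documents/:document_id`) from a batch fetch, and for a batch fetch it records whether a filter was used plus the pagination bounds. A minimal, self-contained sketch of that classification; the `classify` helper and the sample values are hypothetical, not part of the diff:

```rust
// Mirror of the DocumentFetchKind enum added in analytics/mod.rs above.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum DocumentFetchKind {
    PerDocumentId,
    Normal { with_filter: bool, limit: usize, offset: usize },
}

// Hypothetical helper: decide which kind of fetch a request represents.
fn classify(document_id: Option<&str>, filter: Option<&str>, limit: usize, offset: usize) -> DocumentFetchKind {
    match document_id {
        // A call on .../documents/:document_id targets exactly one document.
        Some(_) => DocumentFetchKind::PerDocumentId,
        // Everything else is a batch fetch; only record *whether* a filter
        // was used, never its contents.
        None => DocumentFetchKind::Normal { with_filter: filter.is_some(), limit, offset },
    }
}

fn main() {
    assert_eq!(classify(Some("42"), None, 1, 0), DocumentFetchKind::PerDocumentId);
    assert_eq!(
        classify(None, Some("genre = horror"), 20, 0),
        DocumentFetchKind::Normal { with_filter: true, limit: 20, offset: 0 }
    );
}
```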
diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs
index 3e40c09e8..d640c4ef0 100644
--- a/meilisearch/src/analytics/segment_analytics.rs
+++ b/meilisearch/src/analytics/segment_analytics.rs
@@ -23,7 +23,9 @@ use tokio::select;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use uuid::Uuid;

-use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
+use super::{
+    config_user_id_path, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH,
+};
 use crate::analytics::Analytics;
 use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, ScheduleSnapshot};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
@@ -72,6 +74,8 @@ pub enum AnalyticsMsg {
     AggregateAddDocuments(DocumentsAggregator),
     AggregateDeleteDocuments(DocumentsDeletionAggregator),
     AggregateUpdateDocuments(DocumentsAggregator),
+    AggregateGetFetchDocuments(DocumentsFetchAggregator),
+    AggregatePostFetchDocuments(DocumentsFetchAggregator),
     AggregateTasks(TasksAggregator),
     AggregateHealth(HealthAggregator),
 }
@@ -139,6 +143,8 @@ impl SegmentAnalytics {
             add_documents_aggregator: DocumentsAggregator::default(),
             delete_documents_aggregator: DocumentsDeletionAggregator::default(),
             update_documents_aggregator: DocumentsAggregator::default(),
+            get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
+            post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
             get_tasks_aggregator: TasksAggregator::default(),
             health_aggregator: HealthAggregator::default(),
         });
@@ -205,6 +211,16 @@ impl super::Analytics for SegmentAnalytics {
         let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
     }

+    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
+    }
+
+    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
+    }
+
     fn get_tasks(&self, query: &TasksFilterQuery, request: &HttpRequest) {
         let aggregate = TasksAggregator::from_query(query, request);
         let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
@@ -338,6 +354,8 @@ pub struct Segment {
     add_documents_aggregator: DocumentsAggregator,
     delete_documents_aggregator: DocumentsDeletionAggregator,
     update_documents_aggregator: DocumentsAggregator,
+    get_fetch_documents_aggregator: DocumentsFetchAggregator,
+    post_fetch_documents_aggregator: DocumentsFetchAggregator,
     get_tasks_aggregator: TasksAggregator,
     health_aggregator: HealthAggregator,
 }
@@ -400,6 +418,8 @@ impl Segment {
                     Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
                     Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
                     Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
+                    Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
+                    Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
                     Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
                     Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
                     None => (),
@@ -450,6 +470,10 @@ impl Segment {
             .into_event(&self.user, "Documents Deleted");
         let update_documents = std::mem::take(&mut self.update_documents_aggregator)
             .into_event(&self.user, "Documents Updated");
+        let get_fetch_documents = std::mem::take(&mut self.get_fetch_documents_aggregator)
+            .into_event(&self.user, "Documents Fetched GET");
+        let post_fetch_documents = std::mem::take(&mut self.post_fetch_documents_aggregator)
+            .into_event(&self.user, "Documents Fetched POST");
         let get_tasks =
             std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen");
         let health =
@@ -473,6 +497,12 @@ impl Segment {
         if let Some(update_documents) = update_documents {
             let _ = self.batcher.push(update_documents).await;
         }
+        if let Some(get_fetch_documents) = get_fetch_documents {
+            let _ = self.batcher.push(get_fetch_documents).await;
+        }
+        if let Some(post_fetch_documents) = post_fetch_documents {
+            let _ = self.batcher.push(post_fetch_documents).await;
+        }
         if let Some(get_tasks) = get_tasks {
             let _ = self.batcher.push(get_tasks).await;
         }
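As the hunks above show, the route-facing trait methods never touch the aggregators directly: they build a `DocumentsFetchAggregator` and `try_send` it over a bounded channel to the `Segment` loop, which merges it and periodically flushes one event per aggregator through the batcher. A runnable sketch of that fire-and-forget pattern, with a simplified `Msg` type standing in for `AnalyticsMsg`:

```rust
use tokio::sync::mpsc;

// Simplified stand-in for AnalyticsMsg.
#[derive(Debug)]
enum Msg {
    GetFetchDocuments { with_filter: bool },
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Msg>(100);

    // In the handler path the send result is deliberately discarded
    // (`let _ = ... try_send(...)`): a full channel drops the sample
    // instead of blocking or failing the user's request.
    let _ = tx.try_send(Msg::GetFetchDocuments { with_filter: true });
    drop(tx);

    // In Meilisearch this receive loop lives in the background Segment task.
    while let Some(msg) = rx.recv().await {
        println!("aggregating {msg:?}");
    }
}
```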
@@ -1135,3 +1165,73 @@ impl HealthAggregator {
         })
     }
 }
+
+#[derive(Default, Serialize)]
+pub struct DocumentsFetchAggregator {
+    #[serde(skip)]
+    timestamp: Option<OffsetDateTime>,
+
+    // context
+    #[serde(rename = "user-agent")]
+    user_agents: HashSet<String>,
+
+    total_received: usize,
+
+    // a call on ../documents/:doc_id
+    per_document_id: bool,
+    // if a filter was used
+    per_filter: bool,
+
+    // pagination
+    max_limit: usize,
+    max_offset: usize,
+}
+
+impl DocumentsFetchAggregator {
+    pub fn from_query(query: &DocumentFetchKind, request: &HttpRequest) -> Self {
+        let (limit, offset) = match query {
+            DocumentFetchKind::PerDocumentId => (1, 0),
+            DocumentFetchKind::Normal { limit, offset, .. } => (*limit, *offset),
+        };
+        Self {
+            timestamp: Some(OffsetDateTime::now_utc()),
+            user_agents: extract_user_agents(request).into_iter().collect(),
+            total_received: 1,
+            per_document_id: matches!(query, DocumentFetchKind::PerDocumentId),
+            per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
+            max_limit: limit,
+            max_offset: offset,
+        }
+    }
+
+    /// Aggregate one [DocumentsFetchAggregator] into another.
+    pub fn aggregate(&mut self, other: Self) {
+        if self.timestamp.is_none() {
+            self.timestamp = other.timestamp;
+        }
+        for user_agent in other.user_agents {
+            self.user_agents.insert(user_agent);
+        }
+
+        self.total_received = self.total_received.saturating_add(other.total_received);
+        self.per_document_id |= other.per_document_id;
+        self.per_filter |= other.per_filter;
+
+        self.max_limit = self.max_limit.max(other.max_limit);
+        self.max_offset = self.max_offset.max(other.max_offset);
+    }
+
+    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
+        // if we had no timestamp it means we never encountered any events and
+        // thus we don't need to send this event.
+        let timestamp = self.timestamp?;
+
+        Some(Track {
+            timestamp: Some(timestamp),
+            user: user.clone(),
+            event: event_name.to_string(),
+            properties: serde_json::to_value(self).ok()?,
+            ..Default::default()
+        })
+    }
+}
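`aggregate` folds one batch of samples into another: counters accumulate with saturating addition, the `per_*` booleans record whether a request shape was ever seen during the window, and `max_limit`/`max_offset` keep the largest pagination values observed. A reduced, runnable sketch of those merge rules, with `FetchStats` as an illustrative stand-in for the real struct:

```rust
// Simplified stand-in for DocumentsFetchAggregator, keeping only the
// fields whose merge semantics the diff defines.
#[derive(Default, Debug, PartialEq)]
struct FetchStats {
    total_received: usize,
    per_document_id: bool,
    per_filter: bool,
    max_limit: usize,
    max_offset: usize,
}

impl FetchStats {
    fn aggregate(&mut self, other: Self) {
        // Counts accumulate without overflowing ...
        self.total_received = self.total_received.saturating_add(other.total_received);
        // ... booleans answer "was this shape ever used?" ...
        self.per_document_id |= other.per_document_id;
        self.per_filter |= other.per_filter;
        // ... and pagination keeps the extremes seen so far.
        self.max_limit = self.max_limit.max(other.max_limit);
        self.max_offset = self.max_offset.max(other.max_offset);
    }
}

fn main() {
    let mut acc = FetchStats { total_received: 1, per_filter: true, max_limit: 20, ..Default::default() };
    acc.aggregate(FetchStats { total_received: 1, per_document_id: true, max_limit: 1, ..Default::default() });
    assert_eq!(acc.total_received, 2);
    assert!(acc.per_document_id && acc.per_filter);
    assert_eq!(acc.max_limit, 20);
}
```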
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index eb0f5a59e..dcc4ed04f 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -29,7 +29,7 @@ use tempfile::tempfile;
 use tokio::fs::File;
 use tokio::io::{AsyncSeekExt, AsyncWriteExt, BufWriter};

-use crate::analytics::{Analytics, DocumentDeletionKind};
+use crate::analytics::{Analytics, DocumentDeletionKind, DocumentFetchKind};
 use crate::error::MeilisearchHttpError;
 use crate::error::PayloadError::ReceivePayload;
 use crate::extractors::authentication::policies::*;
@@ -97,10 +97,14 @@ pub async fn get_document(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     document_param: web::Path<DocumentParam>,
     params: AwebQueryParameter<GetDocument, DeserrQueryParamError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     let DocumentParam { index_uid, document_id } = document_param.into_inner();
     let index_uid = IndexUid::try_from(index_uid)?;

+    analytics.get_fetch_documents(&DocumentFetchKind::PerDocumentId, &req);
+
     let GetDocument { fields } = params.into_inner();
     let attributes_to_retrieve = fields.merge_star_and_none();

@@ -161,16 +165,31 @@ pub async fn documents_by_query_post(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
     body: AwebJson<BrowseQuery, DeserrJsonError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     debug!("called with body: {:?}", body);

-    documents_by_query(&index_scheduler, index_uid, body.into_inner())
+    let body = body.into_inner();
+
+    analytics.post_fetch_documents(
+        &DocumentFetchKind::Normal {
+            with_filter: body.filter.is_some(),
+            limit: body.limit,
+            offset: body.offset,
+        },
+        &req,
+    );
+
+    documents_by_query(&index_scheduler, index_uid, body)
 }

 pub async fn get_documents(
     index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_GET }>, Data<IndexScheduler>>,
     index_uid: web::Path<String>,
     params: AwebQueryParameter<BrowseQueryGet, DeserrQueryParamError>,
+    req: HttpRequest,
+    analytics: web::Data<dyn Analytics>,
 ) -> Result<HttpResponse, ResponseError> {
     debug!("called with params: {:?}", params);
@@ -191,6 +210,15 @@ pub async fn get_documents(
         filter,
     };

+    analytics.get_fetch_documents(
+        &DocumentFetchKind::Normal {
+            with_filter: query.filter.is_some(),
+            limit: query.limit,
+            offset: query.offset,
+        },
+        &req,
+    );
+
     documents_by_query(&index_scheduler, index_uid, query)
 }
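Each handler gains two extractor arguments: the raw `HttpRequest` (used for user-agent extraction) and the shared `web::Data<dyn Analytics>` trait object registered at startup. A minimal sketch of that wiring, assuming a hypothetical `NoopAnalytics` and a simplified trait; the real registration happens in Meilisearch's server setup:

```rust
use std::sync::Arc;
use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer};

// Simplified stand-in for the Analytics trait in the diff.
trait Analytics: Send + Sync {
    fn get_fetch_documents(&self, with_filter: bool, request: &HttpRequest);
}

// Hypothetical no-op implementation, analogous to MockAnalytics.
struct NoopAnalytics;
impl Analytics for NoopAnalytics {
    fn get_fetch_documents(&self, _with_filter: bool, _request: &HttpRequest) {}
}

async fn get_documents(
    req: HttpRequest,
    analytics: web::Data<dyn Analytics>,
) -> HttpResponse {
    // The event is reported before the query runs, mirroring the diff:
    // a fetch is counted even if it later fails.
    analytics.get_fetch_documents(false, &req);
    HttpResponse::Ok().finish()
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let analytics: Arc<dyn Analytics> = Arc::new(NoopAnalytics);
    HttpServer::new(move || {
        App::new()
            // Data::from(Arc<dyn Trait>) makes the trait object available
            // to handlers as web::Data<dyn Analytics>.
            .app_data(web::Data::from(analytics.clone()))
            .route("/documents", web::get().to(get_documents))
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}
```

Passing the trait object rather than a concrete type is what lets the mock in `mock_analytics.rs` stand in for `SegmentAnalytics` when analytics are disabled.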