From e97600eead680d576c0b247b59ef7cec10c440c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 8 Jul 2024 16:30:50 +0200 Subject: [PATCH] Improve the analytics for the document edition by function --- meilisearch/src/analytics/mock_analytics.rs | 9 +- meilisearch/src/analytics/mod.rs | 12 ++- .../src/analytics/segment_analytics.rs | 92 ++++++++++++++++++- meilisearch/src/routes/indexes/documents.rs | 21 +++-- 4 files changed, 122 insertions(+), 12 deletions(-) diff --git a/meilisearch/src/analytics/mock_analytics.rs b/meilisearch/src/analytics/mock_analytics.rs index 8f2fe0333..e212775a2 100644 --- a/meilisearch/src/analytics/mock_analytics.rs +++ b/meilisearch/src/analytics/mock_analytics.rs @@ -6,7 +6,7 @@ use meilisearch_types::InstanceUid; use serde_json::Value; use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind}; -use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery}; use crate::Opt; pub struct MockAnalytics { @@ -97,6 +97,13 @@ impl Analytics for MockAnalytics { _request: &HttpRequest, ) { } + fn update_documents_by_function( + &self, + _documents_query: &DocumentEditionByFunction, + _index_creation: bool, + _request: &HttpRequest, + ) { + } fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} } diff --git a/meilisearch/src/analytics/mod.rs b/meilisearch/src/analytics/mod.rs index 6863dc57b..9140e3492 100644 --- a/meilisearch/src/analytics/mod.rs +++ b/meilisearch/src/analytics/mod.rs @@ -13,7 +13,7 @@ use once_cell::sync::Lazy; use platform_dirs::AppDirs; use serde_json::Value; -use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery}; // if the analytics feature is disabled // the 
`SegmentAnalytics` point to the mock instead of the real analytics @@ -119,11 +119,19 @@ pub trait Analytics: Sync + Send { // this method should be called to aggregate a add documents request fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest); - // this method should be called to batch a update documents request + // this method should be called to batch an update documents request fn update_documents( &self, documents_query: &UpdateDocumentsQuery, index_creation: bool, request: &HttpRequest, ); + + // this method should be called to batch an update documents by function request + fn update_documents_by_function( + &self, + documents_query: &DocumentEditionByFunction, + index_creation: bool, + request: &HttpRequest, + ); } diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 67f437082..405baa057 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -30,7 +30,7 @@ use crate::analytics::Analytics; use crate::option::{ default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot, }; -use crate::routes::indexes::documents::UpdateDocumentsQuery; +use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery}; use crate::routes::indexes::facet_search::FacetSearchQuery; use crate::routes::{create_all_stats, Stats}; use crate::search::{ @@ -80,6 +80,7 @@ pub enum AnalyticsMsg { AggregateAddDocuments(DocumentsAggregator), AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateUpdateDocuments(DocumentsAggregator), + AggregateEditDocumentsByFunction(EditDocumentsByFunctionAggregator), AggregateGetFetchDocuments(DocumentsFetchAggregator), AggregatePostFetchDocuments(DocumentsFetchAggregator), } @@ -149,6 +150,7 @@ impl SegmentAnalytics { add_documents_aggregator: DocumentsAggregator::default(), delete_documents_aggregator: DocumentsDeletionAggregator::default(), 
update_documents_aggregator: DocumentsAggregator::default(), + edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator::default(), get_fetch_documents_aggregator: DocumentsFetchAggregator::default(), post_fetch_documents_aggregator: DocumentsFetchAggregator::default(), get_similar_aggregator: SimilarAggregator::default(), @@ -229,6 +231,17 @@ impl super::Analytics for SegmentAnalytics { let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate)); } + fn update_documents_by_function( + &self, + documents_query: &DocumentEditionByFunction, + index_creation: bool, + request: &HttpRequest, + ) { + let aggregate = + EditDocumentsByFunctionAggregator::from_query(documents_query, index_creation, request); + let _ = self.sender.try_send(AnalyticsMsg::AggregateEditDocumentsByFunction(aggregate)); + } + fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) { let aggregate = DocumentsFetchAggregator::from_query(documents_query, request); let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate)); @@ -389,6 +402,7 @@ pub struct Segment { add_documents_aggregator: DocumentsAggregator, delete_documents_aggregator: DocumentsDeletionAggregator, update_documents_aggregator: DocumentsAggregator, + edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator, get_fetch_documents_aggregator: DocumentsFetchAggregator, post_fetch_documents_aggregator: DocumentsFetchAggregator, get_similar_aggregator: SimilarAggregator, @@ -453,6 +467,7 @@ impl Segment { Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), + Some(AnalyticsMsg::AggregateEditDocumentsByFunction(agreg)) => 
self.edit_documents_by_function_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetSimilar(agreg)) => self.get_similar_aggregator.aggregate(agreg), @@ -508,6 +523,7 @@ impl Segment { add_documents_aggregator, delete_documents_aggregator, update_documents_aggregator, + edit_documents_by_function_aggregator, get_fetch_documents_aggregator, post_fetch_documents_aggregator, get_similar_aggregator, @@ -549,6 +565,11 @@ impl Segment { { let _ = self.batcher.push(update_documents).await; } + if let Some(edit_documents_by_function) = take(edit_documents_by_function_aggregator) + .into_event(user, "Documents Edited By Function") + { + let _ = self.batcher.push(edit_documents_by_function).await; + } if let Some(get_fetch_documents) = take(get_fetch_documents_aggregator).into_event(user, "Documents Fetched GET") { @@ -1465,6 +1486,75 @@ impl DocumentsAggregator { } } +#[derive(Default)] +pub struct EditDocumentsByFunctionAggregator { + timestamp: Option, + + // Set to true if at least one request was filtered + filtered: bool, + // Set to true if at least one request contained a context + with_context: bool, + + // context + user_agents: HashSet, + + index_creation: bool, +} + +impl EditDocumentsByFunctionAggregator { + pub fn from_query( + documents_query: &DocumentEditionByFunction, + index_creation: bool, + request: &HttpRequest, + ) -> Self { + let DocumentEditionByFunction { filter, context, function: _ } = documents_query; + + Self { + timestamp: Some(OffsetDateTime::now_utc()), + user_agents: extract_user_agents(request).into_iter().collect(), + filtered: filter.is_some(), + with_context: context.is_some(), + index_creation, + } + } + + /// Aggregate one [EditDocumentsByFunctionAggregator] into another.
+ pub fn aggregate(&mut self, other: Self) { + let Self { timestamp, user_agents, index_creation, filtered, with_context } = other; + + if self.timestamp.is_none() { + self.timestamp = timestamp; + } + + // we can't create a union because there is no `into_union` method + for user_agent in user_agents { + self.user_agents.insert(user_agent); + } + self.index_creation |= index_creation; + self.filtered |= filtered; + self.with_context |= with_context; + } + + pub fn into_event(self, user: &User, event_name: &str) -> Option { + let Self { timestamp, user_agents, index_creation, filtered, with_context } = self; + + let properties = json!({ + "user-agent": user_agents, + "filtered": filtered, + "with_context": with_context, + "index_creation": index_creation, + }); + + Some(Track { + timestamp, + user: user.clone(), + event: event_name.to_string(), + properties, + ..Default::default() + }) + } +} + #[derive(Default, Serialize)] pub struct DocumentsDeletionAggregator { #[serde(skip)] diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index cf9158743..3b1dcba22 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -579,28 +579,33 @@ pub async fn delete_documents_by_filter( #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] pub struct DocumentEditionByFunction { #[deserr(default, error = DeserrJsonError)] - filter: Option, + pub filter: Option, #[deserr(default, error = DeserrJsonError)] - context: Option, + pub context: Option, #[deserr(error = DeserrJsonError, missing_field_error = DeserrJsonError::missing_document_edition_function)] - function: String, + pub function: String, } pub async fn edit_documents_by_function( index_scheduler: GuardedData, Data>, index_uid: web::Path, - body: AwebJson, + params: AwebJson, req: HttpRequest, opt: web::Data, - _analytics: web::Data, + analytics: web::Data, ) -> Result { - debug!(parameters = 
?body, "Edit documents by function"); + debug!(parameters = ?params, "Edit documents by function"); let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = index_uid.into_inner(); - let DocumentEditionByFunction { filter, context, function } = body.into_inner(); + let params = params.into_inner(); - // analytics.delete_documents(DocumentDeletionKind::PerFilter, &req); + analytics.update_documents_by_function( + &params, + index_scheduler.index(&index_uid).is_err(), + &req, + ); + let DocumentEditionByFunction { filter, context, function } = params; let engine = milli::rhai::Engine::new(); if let Err(e) = engine.compile(&function) { return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));