Improve the analytics for the document edition by function

This commit is contained in:
Clément Renault 2024-07-08 16:30:50 +02:00
parent 767553519d
commit e97600eead
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
4 changed files with 122 additions and 12 deletions

View File

@ -6,7 +6,7 @@ use meilisearch_types::InstanceUid;
use serde_json::Value; use serde_json::Value;
use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind}; use super::{find_user_id, Analytics, DocumentDeletionKind, DocumentFetchKind};
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
use crate::Opt; use crate::Opt;
pub struct MockAnalytics { pub struct MockAnalytics {
@ -97,6 +97,13 @@ impl Analytics for MockAnalytics {
_request: &HttpRequest, _request: &HttpRequest,
) { ) {
} }
fn update_documents_by_function(
&self,
_documents_query: &DocumentEditionByFunction,
_index_creation: bool,
_request: &HttpRequest,
) {
}
fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} fn get_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {} fn post_fetch_documents(&self, _documents_query: &DocumentFetchKind, _request: &HttpRequest) {}
} }

View File

@ -13,7 +13,7 @@ use once_cell::sync::Lazy;
use platform_dirs::AppDirs; use platform_dirs::AppDirs;
use serde_json::Value; use serde_json::Value;
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
// if the analytics feature is disabled // if the analytics feature is disabled
// the `SegmentAnalytics` point to the mock instead of the real analytics // the `SegmentAnalytics` point to the mock instead of the real analytics
@ -119,11 +119,19 @@ pub trait Analytics: Sync + Send {
// this method should be called to aggregate a add documents request // this method should be called to aggregate a add documents request
fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest); fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest);
// this method should be called to batch a update documents request // this method should be called to batch an update documents request
fn update_documents( fn update_documents(
&self, &self,
documents_query: &UpdateDocumentsQuery, documents_query: &UpdateDocumentsQuery,
index_creation: bool, index_creation: bool,
request: &HttpRequest, request: &HttpRequest,
); );
// this method should be called to batch an update documents by function request
fn update_documents_by_function(
&self,
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
);
} }

View File

@ -30,7 +30,7 @@ use crate::analytics::Analytics;
use crate::option::{ use crate::option::{
default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot, default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
}; };
use crate::routes::indexes::documents::UpdateDocumentsQuery; use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
use crate::routes::indexes::facet_search::FacetSearchQuery; use crate::routes::indexes::facet_search::FacetSearchQuery;
use crate::routes::{create_all_stats, Stats}; use crate::routes::{create_all_stats, Stats};
use crate::search::{ use crate::search::{
@ -80,6 +80,7 @@ pub enum AnalyticsMsg {
AggregateAddDocuments(DocumentsAggregator), AggregateAddDocuments(DocumentsAggregator),
AggregateDeleteDocuments(DocumentsDeletionAggregator), AggregateDeleteDocuments(DocumentsDeletionAggregator),
AggregateUpdateDocuments(DocumentsAggregator), AggregateUpdateDocuments(DocumentsAggregator),
AggregateEditDocumentsByFunction(EditDocumentsByFunctionAggregator),
AggregateGetFetchDocuments(DocumentsFetchAggregator), AggregateGetFetchDocuments(DocumentsFetchAggregator),
AggregatePostFetchDocuments(DocumentsFetchAggregator), AggregatePostFetchDocuments(DocumentsFetchAggregator),
} }
@ -149,6 +150,7 @@ impl SegmentAnalytics {
add_documents_aggregator: DocumentsAggregator::default(), add_documents_aggregator: DocumentsAggregator::default(),
delete_documents_aggregator: DocumentsDeletionAggregator::default(), delete_documents_aggregator: DocumentsDeletionAggregator::default(),
update_documents_aggregator: DocumentsAggregator::default(), update_documents_aggregator: DocumentsAggregator::default(),
edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator::default(),
get_fetch_documents_aggregator: DocumentsFetchAggregator::default(), get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
post_fetch_documents_aggregator: DocumentsFetchAggregator::default(), post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
get_similar_aggregator: SimilarAggregator::default(), get_similar_aggregator: SimilarAggregator::default(),
@ -229,6 +231,17 @@ impl super::Analytics for SegmentAnalytics {
let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
} }
fn update_documents_by_function(
&self,
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate =
EditDocumentsByFunctionAggregator::from_query(documents_query, index_creation, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateEditDocumentsByFunction(aggregate));
}
fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) { fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
let aggregate = DocumentsFetchAggregator::from_query(documents_query, request); let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate)); let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
@ -389,6 +402,7 @@ pub struct Segment {
add_documents_aggregator: DocumentsAggregator, add_documents_aggregator: DocumentsAggregator,
delete_documents_aggregator: DocumentsDeletionAggregator, delete_documents_aggregator: DocumentsDeletionAggregator,
update_documents_aggregator: DocumentsAggregator, update_documents_aggregator: DocumentsAggregator,
edit_documents_by_function_aggregator: EditDocumentsByFunctionAggregator,
get_fetch_documents_aggregator: DocumentsFetchAggregator, get_fetch_documents_aggregator: DocumentsFetchAggregator,
post_fetch_documents_aggregator: DocumentsFetchAggregator, post_fetch_documents_aggregator: DocumentsFetchAggregator,
get_similar_aggregator: SimilarAggregator, get_similar_aggregator: SimilarAggregator,
@ -453,6 +467,7 @@ impl Segment {
Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateEditDocumentsByFunction(agreg)) => self.edit_documents_by_function_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetSimilar(agreg)) => self.get_similar_aggregator.aggregate(agreg), Some(AnalyticsMsg::AggregateGetSimilar(agreg)) => self.get_similar_aggregator.aggregate(agreg),
@ -508,6 +523,7 @@ impl Segment {
add_documents_aggregator, add_documents_aggregator,
delete_documents_aggregator, delete_documents_aggregator,
update_documents_aggregator, update_documents_aggregator,
edit_documents_by_function_aggregator,
get_fetch_documents_aggregator, get_fetch_documents_aggregator,
post_fetch_documents_aggregator, post_fetch_documents_aggregator,
get_similar_aggregator, get_similar_aggregator,
@ -549,6 +565,11 @@ impl Segment {
{ {
let _ = self.batcher.push(update_documents).await; let _ = self.batcher.push(update_documents).await;
} }
if let Some(edit_documents_by_function) = take(edit_documents_by_function_aggregator)
.into_event(user, "Documents Edited By Function")
{
let _ = self.batcher.push(edit_documents_by_function).await;
}
if let Some(get_fetch_documents) = if let Some(get_fetch_documents) =
take(get_fetch_documents_aggregator).into_event(user, "Documents Fetched GET") take(get_fetch_documents_aggregator).into_event(user, "Documents Fetched GET")
{ {
@ -1465,6 +1486,75 @@ impl DocumentsAggregator {
} }
} }
#[derive(Default)]
pub struct EditDocumentsByFunctionAggregator {
timestamp: Option<OffsetDateTime>,
// Set to true if at least one request was filtered
filtered: bool,
// Set to true if at least one request contained a context
with_context: bool,
// context
user_agents: HashSet<String>,
index_creation: bool,
}
impl EditDocumentsByFunctionAggregator {
pub fn from_query(
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
) -> Self {
let DocumentEditionByFunction { filter, context, function: _ } = documents_query;
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
filtered: filter.is_some(),
with_context: context.is_some(),
index_creation,
}
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self { timestamp, user_agents, index_creation, filtered, with_context } = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.index_creation |= index_creation;
self.filtered |= filtered;
self.with_context |= with_context;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
let Self { timestamp, user_agents, index_creation, filtered, with_context } = self;
let properties = json!({
"user-agent": user_agents,
"filtered": filtered,
"with_context": with_context,
"index_creation": index_creation,
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
#[derive(Default, Serialize)] #[derive(Default, Serialize)]
pub struct DocumentsDeletionAggregator { pub struct DocumentsDeletionAggregator {
#[serde(skip)] #[serde(skip)]

View File

@ -579,28 +579,33 @@ pub async fn delete_documents_by_filter(
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)] #[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct DocumentEditionByFunction { pub struct DocumentEditionByFunction {
#[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)] #[deserr(default, error = DeserrJsonError<InvalidDocumentFilter>)]
filter: Option<Value>, pub filter: Option<Value>,
#[deserr(default, error = DeserrJsonError<InvalidDocumentEditionContext>)] #[deserr(default, error = DeserrJsonError<InvalidDocumentEditionContext>)]
context: Option<Value>, pub context: Option<Value>,
#[deserr(error = DeserrJsonError<InvalidDocumentEditionFunctionFilter>, missing_field_error = DeserrJsonError::missing_document_edition_function)] #[deserr(error = DeserrJsonError<InvalidDocumentEditionFunctionFilter>, missing_field_error = DeserrJsonError::missing_document_edition_function)]
function: String, pub function: String,
} }
pub async fn edit_documents_by_function( pub async fn edit_documents_by_function(
index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>, index_scheduler: GuardedData<ActionPolicy<{ actions::DOCUMENTS_ALL }>, Data<IndexScheduler>>,
index_uid: web::Path<String>, index_uid: web::Path<String>,
body: AwebJson<DocumentEditionByFunction, DeserrJsonError>, params: AwebJson<DocumentEditionByFunction, DeserrJsonError>,
req: HttpRequest, req: HttpRequest,
opt: web::Data<Opt>, opt: web::Data<Opt>,
_analytics: web::Data<dyn Analytics>, analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
debug!(parameters = ?body, "Edit documents by function"); debug!(parameters = ?params, "Edit documents by function");
let index_uid = IndexUid::try_from(index_uid.into_inner())?; let index_uid = IndexUid::try_from(index_uid.into_inner())?;
let index_uid = index_uid.into_inner(); let index_uid = index_uid.into_inner();
let DocumentEditionByFunction { filter, context, function } = body.into_inner(); let params = params.into_inner();
// analytics.delete_documents(DocumentDeletionKind::PerFilter, &req); analytics.update_documents_by_function(
&params,
index_scheduler.index(&index_uid).is_err(),
&req,
);
let DocumentEditionByFunction { filter, context, function } = params;
let engine = milli::rhai::Engine::new(); let engine = milli::rhai::Engine::new();
if let Err(e) = engine.compile(&function) { if let Err(e) = engine.compile(&function) {
return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest)); return Err(ResponseError::from_msg(e.to_string(), Code::BadRequest));