Mirror of https://github.com/meilisearch/MeiliSearch
add analytics on the get documents resource

Commit d08f8690d2 (parent 4a4210c116)
4 changed files with 146 additions and 4 deletions
@@ -23,7 +23,9 @@ use tokio::select;
 use tokio::sync::mpsc::{self, Receiver, Sender};
 use uuid::Uuid;
 
-use super::{config_user_id_path, DocumentDeletionKind, MEILISEARCH_CONFIG_PATH};
+use super::{
+    config_user_id_path, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH,
+};
 use crate::analytics::Analytics;
 use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, ScheduleSnapshot};
 use crate::routes::indexes::documents::UpdateDocumentsQuery;
@@ -72,6 +74,8 @@ pub enum AnalyticsMsg {
     AggregateAddDocuments(DocumentsAggregator),
     AggregateDeleteDocuments(DocumentsDeletionAggregator),
     AggregateUpdateDocuments(DocumentsAggregator),
+    AggregateGetFetchDocuments(DocumentsFetchAggregator),
+    AggregatePostFetchDocuments(DocumentsFetchAggregator),
     AggregateTasks(TasksAggregator),
     AggregateHealth(HealthAggregator),
 }
@@ -139,6 +143,8 @@ impl SegmentAnalytics {
             add_documents_aggregator: DocumentsAggregator::default(),
             delete_documents_aggregator: DocumentsDeletionAggregator::default(),
             update_documents_aggregator: DocumentsAggregator::default(),
+            get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
+            post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
             get_tasks_aggregator: TasksAggregator::default(),
             health_aggregator: HealthAggregator::default(),
         });
@@ -205,6 +211,16 @@ impl super::Analytics for SegmentAnalytics {
         let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
     }
 
+    fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
+    }
+
+    fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
+        let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
+        let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
+    }
+
     fn get_tasks(&self, query: &TasksFilterQuery, request: &HttpRequest) {
         let aggregate = TasksAggregator::from_query(query, request);
         let _ = self.sender.try_send(AnalyticsMsg::AggregateTasks(aggregate));
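
The `let _ = self.sender.try_send(...)` pattern above makes analytics fire-and-forget: if the channel's buffer is full, the event is dropped instead of blocking the HTTP request. A minimal standalone sketch of that pattern (the message type, buffer size, and names are illustrative, not from this codebase):

use tokio::sync::mpsc;

#[derive(Debug)]
enum Msg {
    Fetched { count: usize },
}

#[tokio::main]
async fn main() {
    // Bounded channel: try_send never awaits; it returns Err when the buffer is full.
    let (tx, mut rx) = mpsc::channel::<Msg>(10_000);
    // Fire-and-forget: ignoring the Result means a full buffer drops the
    // event rather than stalling the request handler.
    let _ = tx.try_send(Msg::Fetched { count: 1 });
    if let Some(msg) = rx.recv().await {
        println!("received: {msg:?}");
    }
}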
@@ -338,6 +354,8 @@ pub struct Segment {
    add_documents_aggregator: DocumentsAggregator,
    delete_documents_aggregator: DocumentsDeletionAggregator,
    update_documents_aggregator: DocumentsAggregator,
+    get_fetch_documents_aggregator: DocumentsFetchAggregator,
+    post_fetch_documents_aggregator: DocumentsFetchAggregator,
    get_tasks_aggregator: TasksAggregator,
    health_aggregator: HealthAggregator,
 }
@@ -400,6 +418,8 @@ impl Segment {
                 Some(AnalyticsMsg::AggregateAddDocuments(agreg)) => self.add_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateDeleteDocuments(agreg)) => self.delete_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
+                Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
+                Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateTasks(agreg)) => self.get_tasks_aggregator.aggregate(agreg),
                 Some(AnalyticsMsg::AggregateHealth(agreg)) => self.health_aggregator.aggregate(agreg),
                 None => (),
@@ -450,6 +470,10 @@ impl Segment {
                 .into_event(&self.user, "Documents Deleted");
             let update_documents = std::mem::take(&mut self.update_documents_aggregator)
                 .into_event(&self.user, "Documents Updated");
+            let get_fetch_documents = std::mem::take(&mut self.get_fetch_documents_aggregator)
+                .into_event(&self.user, "Documents Fetched GET");
+            let post_fetch_documents = std::mem::take(&mut self.post_fetch_documents_aggregator)
+                .into_event(&self.user, "Documents Fetched POST");
             let get_tasks =
                 std::mem::take(&mut self.get_tasks_aggregator).into_event(&self.user, "Tasks Seen");
             let health =
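
`std::mem::take` swaps each aggregator for its `Default::default()` value, so a flush both drains the accumulated stats and resets the window for the next tick. A minimal sketch of that drain-and-reset pattern (the `Counter` type is illustrative):

#[derive(Default, Debug)]
struct Counter {
    total: usize,
}

fn flush(slot: &mut Counter) -> Counter {
    // Returns the accumulated value and leaves a fresh Default in its place.
    std::mem::take(slot)
}

fn main() {
    let mut window = Counter { total: 3 };
    let drained = flush(&mut window);
    assert_eq!(drained.total, 3);
    assert_eq!(window.total, 0); // the next window starts empty
}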
@@ -473,6 +497,12 @@ impl Segment {
             if let Some(update_documents) = update_documents {
                 let _ = self.batcher.push(update_documents).await;
             }
+            if let Some(get_fetch_documents) = get_fetch_documents {
+                let _ = self.batcher.push(get_fetch_documents).await;
+            }
+            if let Some(post_fetch_documents) = post_fetch_documents {
+                let _ = self.batcher.push(post_fetch_documents).await;
+            }
             if let Some(get_tasks) = get_tasks {
                 let _ = self.batcher.push(get_tasks).await;
             }
@@ -1135,3 +1165,73 @@ impl HealthAggregator {
         })
     }
 }
+
+#[derive(Default, Serialize)]
+pub struct DocumentsFetchAggregator {
+    #[serde(skip)]
+    timestamp: Option<OffsetDateTime>,
+
+    // context
+    #[serde(rename = "user-agent")]
+    user_agents: HashSet<String>,
+
+    total_received: usize,
+
+    // a call on ../documents/:doc_id
+    per_document_id: bool,
+    // if a filter was used
+    per_filter: bool,
+
+    // pagination
+    max_limit: usize,
+    max_offset: usize,
+}
+
+impl DocumentsFetchAggregator {
+    pub fn from_query(query: &DocumentFetchKind, request: &HttpRequest) -> Self {
+        let (limit, offset) = match query {
+            DocumentFetchKind::PerDocumentId => (1, 0),
+            DocumentFetchKind::Normal { limit, offset, .. } => (*limit, *offset),
+        };
+        Self {
+            timestamp: Some(OffsetDateTime::now_utc()),
+            user_agents: extract_user_agents(request).into_iter().collect(),
+            total_received: 1,
+            per_document_id: matches!(query, DocumentFetchKind::PerDocumentId),
+            per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
+            max_limit: limit,
+            max_offset: offset,
+        }
+    }
+
+    /// Aggregate one [DocumentsFetchAggregator] into another.
+    pub fn aggregate(&mut self, other: Self) {
+        if self.timestamp.is_none() {
+            self.timestamp = other.timestamp;
+        }
+        for user_agent in other.user_agents {
+            self.user_agents.insert(user_agent);
+        }
+
+        self.total_received = self.total_received.saturating_add(other.total_received);
+        self.per_document_id |= other.per_document_id;
+        self.per_filter |= other.per_filter;
+
+        self.max_limit = self.max_limit.max(other.max_limit);
+        self.max_offset = self.max_offset.max(other.max_offset);
+    }
+
+    pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
+        // if we had no timestamp it means we never encountered any events and
+        // thus we don't need to send this event.
+        let timestamp = self.timestamp?;
+
+        Some(Track {
+            timestamp: Some(timestamp),
+            user: user.clone(),
+            event: event_name.to_string(),
+            properties: serde_json::to_value(self).ok()?,
+            ..Default::default()
+        })
+    }
+}
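
Note how `aggregate` merges two windows: `total_received` adds with `saturating_add`, the booleans OR together, and the pagination fields keep the max, so the flushed event reports the most aggressive pagination seen in the window.

`from_query` matches on a `DocumentFetchKind` that is defined elsewhere (it is added to the `use super::{...}` import in the first hunk). Judging only from the patterns above, it presumably looks like the sketch below; the field set is an assumption, and the `..` in the patterns means the real `Normal` variant may carry more fields than these three:

// Hypothetical reconstruction of DocumentFetchKind, inferred from the
// match arms in from_query; not part of this diff.
pub enum DocumentFetchKind {
    // A fetch of a single document by its id.
    PerDocumentId,
    // A browse-style fetch with pagination and an optional filter.
    Normal { with_filter: bool, limit: usize, offset: usize },
}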