remove a lot of ununsed code

This commit is contained in:
Tamo 2024-10-17 09:14:34 +02:00
parent ef77c7699b
commit 4ee65d870e
3 changed files with 17 additions and 586 deletions

View File

@ -22,9 +22,7 @@ pub use segment_analytics::SimilarAggregator;
use crate::Opt;
use self::segment_analytics::extract_user_agents;
pub type MultiSearchAggregator = segment_analytics::MultiSearchAggregator;
pub type FacetSearchAggregator = segment_analytics::FacetSearchAggregator;
pub use self::segment_analytics::MultiSearchAggregator;
/// A macro used to quickly define events that don't aggregate or send anything besides an empty event with its name.
#[macro_export]

View File

@ -5,7 +5,7 @@ use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix_web::http::header::{CONTENT_TYPE, USER_AGENT};
use actix_web::http::header::USER_AGENT;
use actix_web::HttpRequest;
use byte_unit::Byte;
use index_scheduler::IndexScheduler;
@ -24,21 +24,15 @@ use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
use uuid::Uuid;
use super::{
config_user_id_path, Aggregate, AggregateMethod, DocumentDeletionKind, DocumentFetchKind,
MEILISEARCH_CONFIG_PATH,
};
use super::{config_user_id_path, Aggregate, AggregateMethod, MEILISEARCH_CONFIG_PATH};
use crate::option::{
default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
};
use crate::routes::indexes::documents::{DocumentEditionByFunction, UpdateDocumentsQuery};
use crate::routes::indexes::facet_search::FacetSearchQuery;
use crate::routes::{create_all_stats, Stats};
use crate::search::{
FacetSearchResult, FederatedSearch, MatchingStrategy, SearchQuery, SearchQueryWithIndex,
SearchResult, SimilarQuery, SimilarResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
DEFAULT_SEMANTIC_RATIO,
FederatedSearch, SearchQuery, SearchQueryWithIndex, SearchResult, SimilarQuery, SimilarResult,
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEMANTIC_RATIO,
};
use crate::{aggregate_methods, Opt};
@ -75,6 +69,7 @@ pub struct Message {
// Thus we have to send it in the message directly.
type_id: TypeId,
// Same for the aggregate function.
#[allow(clippy::type_complexity)]
aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
event: Event,
}
@ -169,97 +164,6 @@ impl SegmentAnalytics {
}
}
/*
impl super::Analytics for SegmentAnalytics {
fn instance_uid(&self) -> Option<&InstanceUid> {
Some(&self.instance_uid)
}
fn publish(&self, event_name: String, mut send: Value, request: Option<&HttpRequest>) {
let user_agent = request.map(extract_user_agents);
send["user-agent"] = json!(user_agent);
let event = Track {
user: self.user.clone(),
event: event_name.clone(),
properties: send,
..Default::default()
};
let _ = self.sender.try_send(AnalyticsMsg::BatchMessage(event));
}
fn get_search(&self, aggregate: SearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSearch(aggregate));
}
fn post_search(&self, aggregate: SearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSearch(aggregate));
}
fn get_similar(&self, aggregate: SimilarAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSimilar(aggregate));
}
fn post_similar(&self, aggregate: SimilarAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSimilar(aggregate));
}
fn post_facet_search(&self, aggregate: FacetSearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFacetSearch(aggregate));
}
fn post_multi_search(&self, aggregate: MultiSearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostMultiSearch(aggregate));
}
fn add_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateAddDocuments(aggregate));
}
fn delete_documents(&self, kind: DocumentDeletionKind, request: &HttpRequest) {
let aggregate = DocumentsDeletionAggregator::from_query(kind, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateDeleteDocuments(aggregate));
}
fn update_documents(
&self,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate = DocumentsAggregator::from_query(documents_query, index_creation, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateUpdateDocuments(aggregate));
}
fn update_documents_by_function(
&self,
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
) {
let aggregate =
EditDocumentsByFunctionAggregator::from_query(documents_query, index_creation, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateEditDocumentsByFunction(aggregate));
}
fn get_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetFetchDocuments(aggregate));
}
fn post_fetch_documents(&self, documents_query: &DocumentFetchKind, request: &HttpRequest) {
let aggregate = DocumentsFetchAggregator::from_query(documents_query, request);
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFetchDocuments(aggregate));
}
}
*/
/// This structure represent the `infos` field we send in the analytics.
/// It's quite close to the `Opt` structure except all sensitive informations
/// have been simplified to a boolean.
@ -536,13 +440,16 @@ impl Segment {
properties["requests"]["total_received"] = total.into();
};
self.batcher.push(Track {
let _ = self
.batcher
.push(Track {
user: self.user.clone(),
event: name.to_string(),
properties,
timestamp: Some(timestamp),
..Default::default()
});
})
.await;
}
let _ = self.batcher.flush().await;
@ -1181,479 +1088,6 @@ impl Aggregate for MultiSearchAggregator {
}
}
#[derive(Default)]
pub struct FacetSearchAggregator {
timestamp: Option<OffsetDateTime>,
// context
user_agents: HashSet<String>,
// requests
total_received: usize,
total_succeeded: usize,
time_spent: BinaryHeap<usize>,
// The set of all facetNames that were used
facet_names: HashSet<String>,
// As there been any other parameter than the facetName or facetQuery ones?
additional_search_parameters_provided: bool,
}
impl FacetSearchAggregator {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &FacetSearchQuery, request: &HttpRequest) -> Self {
let FacetSearchQuery {
facet_query: _,
facet_name,
vector,
q,
filter,
matching_strategy,
attributes_to_search_on,
hybrid,
ranking_score_threshold,
locales,
} = query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
ret.facet_names = Some(facet_name.clone()).into_iter().collect();
ret.additional_search_parameters_provided = q.is_some()
|| vector.is_some()
|| filter.is_some()
|| *matching_strategy != MatchingStrategy::default()
|| attributes_to_search_on.is_some()
|| hybrid.is_some()
|| ranking_score_threshold.is_some()
|| locales.is_some();
ret
}
pub fn succeed(&mut self, result: &FacetSearchResult) {
let FacetSearchResult { facet_hits: _, facet_query: _, processing_time_ms } = result;
self.total_succeeded = self.total_succeeded.saturating_add(1);
self.time_spent.push(*processing_time_ms as usize);
}
/// Aggregate one [FacetSearchAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
facet_names,
additional_search_parameters_provided,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// context
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.time_spent.append(time_spent);
// facet_names
for facet_name in facet_names.into_iter() {
self.facet_names.insert(facet_name);
}
// additional_search_parameters_provided
self.additional_search_parameters_provided |= additional_search_parameters_provided;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
facet_names,
additional_search_parameters_provided,
} = self;
if total_received == 0 {
None
} else {
// the index of the 99th percentage of value
let percentile_99th = 0.99 * (total_succeeded as f64 - 1.) + 1.;
// we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec();
// We are only interested by the slowest value of the 99th fastest results
let time_spent = time_spent.get(percentile_99th as usize);
let properties = json!({
"user-agent": user_agents,
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"facets": {
"total_distinct_facet_count": facet_names.len(),
"additional_search_parameters_provided": additional_search_parameters_provided,
},
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}
#[derive(Default)]
pub struct DocumentsAggregator {
timestamp: Option<OffsetDateTime>,
// set to true when at least one request was received
updated: bool,
// context
user_agents: HashSet<String>,
content_types: HashSet<String>,
primary_keys: HashSet<String>,
index_creation: bool,
}
impl DocumentsAggregator {
pub fn from_query(
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) -> Self {
let UpdateDocumentsQuery { primary_key, csv_delimiter: _ } = documents_query;
let mut primary_keys = HashSet::new();
if let Some(primary_key) = primary_key.clone() {
primary_keys.insert(primary_key);
}
let mut content_types = HashSet::new();
let content_type = request
.headers()
.get(CONTENT_TYPE)
.and_then(|s| s.to_str().ok())
.unwrap_or("unknown")
.to_string();
content_types.insert(content_type);
Self {
timestamp: Some(OffsetDateTime::now_utc()),
updated: true,
user_agents: extract_user_agents(request).into_iter().collect(),
content_types,
primary_keys,
index_creation,
}
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
self.updated |= updated;
// we can't create a union because there is no `into_union` method
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
for primary_key in primary_keys {
self.primary_keys.insert(primary_key);
}
for content_type in content_types {
self.content_types.insert(content_type);
}
self.index_creation |= index_creation;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
self;
if !updated {
None
} else {
let properties = json!({
"user-agent": user_agents,
"payload_type": content_types,
"primary_key": primary_keys,
"index_creation": index_creation,
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}
#[derive(Default)]
pub struct EditDocumentsByFunctionAggregator {
timestamp: Option<OffsetDateTime>,
// Set to true if at least one request was filtered
filtered: bool,
// Set to true if at least one request contained a context
with_context: bool,
// context
user_agents: HashSet<String>,
index_creation: bool,
}
impl EditDocumentsByFunctionAggregator {
pub fn from_query(
documents_query: &DocumentEditionByFunction,
index_creation: bool,
request: &HttpRequest,
) -> Self {
let DocumentEditionByFunction { filter, context, function: _ } = documents_query;
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
filtered: filter.is_some(),
with_context: context.is_some(),
index_creation,
}
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self { timestamp, user_agents, index_creation, filtered, with_context } = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.index_creation |= index_creation;
self.filtered |= filtered;
self.with_context |= with_context;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
let Self { timestamp, user_agents, index_creation, filtered, with_context } = self;
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = timestamp?;
let properties = json!({
"user-agent": user_agents,
"filtered": filtered,
"with_context": with_context,
"index_creation": index_creation,
});
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
#[derive(Default, Serialize)]
pub struct DocumentsDeletionAggregator {
#[serde(skip)]
timestamp: Option<OffsetDateTime>,
// context
#[serde(rename = "user-agent")]
user_agents: HashSet<String>,
#[serde(rename = "requests.total_received")]
total_received: usize,
per_document_id: bool,
clear_all: bool,
per_batch: bool,
per_filter: bool,
}
impl DocumentsDeletionAggregator {
pub fn from_query(kind: DocumentDeletionKind, request: &HttpRequest) -> Self {
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
total_received: 1,
per_document_id: matches!(kind, DocumentDeletionKind::PerDocumentId),
clear_all: matches!(kind, DocumentDeletionKind::ClearAll),
per_batch: matches!(kind, DocumentDeletionKind::PerBatch),
per_filter: matches!(kind, DocumentDeletionKind::PerFilter),
}
}
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
per_document_id,
clear_all,
per_batch,
per_filter,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(total_received);
self.per_document_id |= per_document_id;
self.clear_all |= clear_all;
self.per_batch |= per_batch;
self.per_filter |= per_filter;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = self.timestamp?;
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties: serde_json::to_value(self).ok()?,
..Default::default()
})
}
}
#[derive(Default, Serialize)]
pub struct DocumentsFetchAggregator {
#[serde(skip)]
timestamp: Option<OffsetDateTime>,
// context
#[serde(rename = "user-agent")]
user_agents: HashSet<String>,
#[serde(rename = "requests.total_received")]
total_received: usize,
// a call on ../documents/:doc_id
per_document_id: bool,
// if a filter was used
per_filter: bool,
#[serde(rename = "vector.retrieve_vectors")]
retrieve_vectors: bool,
// pagination
#[serde(rename = "pagination.max_limit")]
max_limit: usize,
#[serde(rename = "pagination.max_offset")]
max_offset: usize,
}
impl DocumentsFetchAggregator {
pub fn from_query(query: &DocumentFetchKind, request: &HttpRequest) -> Self {
let (limit, offset, retrieve_vectors) = match query {
DocumentFetchKind::PerDocumentId { retrieve_vectors } => (1, 0, *retrieve_vectors),
DocumentFetchKind::Normal { limit, offset, retrieve_vectors, .. } => {
(*limit, *offset, *retrieve_vectors)
}
};
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
total_received: 1,
per_document_id: matches!(query, DocumentFetchKind::PerDocumentId { .. }),
per_filter: matches!(query, DocumentFetchKind::Normal { with_filter, .. } if *with_filter),
max_limit: limit,
max_offset: offset,
retrieve_vectors,
}
}
/// Aggregate one [DocumentsFetchAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
per_document_id,
per_filter,
max_limit,
max_offset,
retrieve_vectors,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(total_received);
self.per_document_id |= per_document_id;
self.per_filter |= per_filter;
self.max_limit = self.max_limit.max(max_limit);
self.max_offset = self.max_offset.max(max_offset);
self.retrieve_vectors |= retrieve_vectors;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
// if we had no timestamp it means we never encountered any events and
// thus we don't need to send this event.
let timestamp = self.timestamp?;
Some(Track {
timestamp: Some(timestamp),
user: user.clone(),
event: event_name.to_string(),
properties: serde_json::to_value(self).ok()?,
..Default::default()
})
}
}
aggregate_methods!(
SimilarPOST => "Similar POST",
SimilarGET => "Similar GET",

View File

@ -9,7 +9,6 @@ use meilisearch_types::error::deserr_codes::*;
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::locales::Locale;
use serde::Serialize;
use serde_json::Value;
use tracing::debug;