Analytics changes

This commit is contained in:
Louis Dureuil 2024-05-27 10:54:12 +02:00
parent d35278320e
commit 54b15059a0
No known key found for this signature in database
3 changed files with 289 additions and 2 deletions

View File

@ -25,6 +25,18 @@ impl SearchAggregator {
pub fn succeed(&mut self, _: &dyn Any) {}
}
#[derive(Default)]
pub struct SimilarAggregator;
#[allow(dead_code)]
impl SimilarAggregator {
pub fn from_query(_: &dyn Any, _: &dyn Any) -> Self {
Self
}
pub fn succeed(&mut self, _: &dyn Any) {}
}
#[derive(Default)]
pub struct MultiSearchAggregator;
@ -66,6 +78,8 @@ impl Analytics for MockAnalytics {
fn publish(&self, _event_name: String, _send: Value, _request: Option<&HttpRequest>) {}
fn get_search(&self, _aggregate: super::SearchAggregator) {}
fn post_search(&self, _aggregate: super::SearchAggregator) {}
fn get_similar(&self, _aggregate: super::SimilarAggregator) {}
fn post_similar(&self, _aggregate: super::SimilarAggregator) {}
fn post_multi_search(&self, _aggregate: super::MultiSearchAggregator) {}
fn post_facet_search(&self, _aggregate: super::FacetSearchAggregator) {}
fn add_documents(

View File

@ -22,6 +22,8 @@ pub type SegmentAnalytics = mock_analytics::MockAnalytics;
#[cfg(not(feature = "analytics"))]
pub type SearchAggregator = mock_analytics::SearchAggregator;
#[cfg(not(feature = "analytics"))]
pub type SimilarAggregator = mock_analytics::SimilarAggregator;
#[cfg(not(feature = "analytics"))]
pub type MultiSearchAggregator = mock_analytics::MultiSearchAggregator;
#[cfg(not(feature = "analytics"))]
pub type FacetSearchAggregator = mock_analytics::FacetSearchAggregator;
@ -32,6 +34,8 @@ pub type SegmentAnalytics = segment_analytics::SegmentAnalytics;
#[cfg(feature = "analytics")]
pub type SearchAggregator = segment_analytics::SearchAggregator;
#[cfg(feature = "analytics")]
pub type SimilarAggregator = segment_analytics::SimilarAggregator;
#[cfg(feature = "analytics")]
pub type MultiSearchAggregator = segment_analytics::MultiSearchAggregator;
#[cfg(feature = "analytics")]
pub type FacetSearchAggregator = segment_analytics::FacetSearchAggregator;
@ -86,6 +90,12 @@ pub trait Analytics: Sync + Send {
/// This method should be called to aggregate a post search
fn post_search(&self, aggregate: SearchAggregator);
/// This method should be called to aggregate a get similar request
fn get_similar(&self, aggregate: SimilarAggregator);
/// This method should be called to aggregate a post similar request
fn post_similar(&self, aggregate: SimilarAggregator);
/// This method should be called to aggregate a post array of searches
fn post_multi_search(&self, aggregate: MultiSearchAggregator);

View File

@ -36,8 +36,9 @@ use crate::routes::indexes::facet_search::FacetSearchQuery;
use crate::routes::{create_all_stats, Stats};
use crate::search::{
FacetSearchResult, MatchingStrategy, SearchQuery, SearchQueryWithIndex, SearchResult,
DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER, DEFAULT_HIGHLIGHT_POST_TAG,
DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT, DEFAULT_SEMANTIC_RATIO,
SimilarQuery, SimilarResult, DEFAULT_CROP_LENGTH, DEFAULT_CROP_MARKER,
DEFAULT_HIGHLIGHT_POST_TAG, DEFAULT_HIGHLIGHT_PRE_TAG, DEFAULT_SEARCH_LIMIT,
DEFAULT_SEMANTIC_RATIO,
};
use crate::Opt;
@ -73,6 +74,8 @@ pub enum AnalyticsMsg {
BatchMessage(Track),
AggregateGetSearch(SearchAggregator),
AggregatePostSearch(SearchAggregator),
AggregateGetSimilar(SimilarAggregator),
AggregatePostSimilar(SimilarAggregator),
AggregatePostMultiSearch(MultiSearchAggregator),
AggregatePostFacetSearch(FacetSearchAggregator),
AggregateAddDocuments(DocumentsAggregator),
@ -149,6 +152,8 @@ impl SegmentAnalytics {
update_documents_aggregator: DocumentsAggregator::default(),
get_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
post_fetch_documents_aggregator: DocumentsFetchAggregator::default(),
get_similar_aggregator: SimilarAggregator::default(),
post_similar_aggregator: SimilarAggregator::default(),
});
tokio::spawn(segment.run(index_scheduler.clone(), auth_controller.clone()));
@ -184,6 +189,14 @@ impl super::Analytics for SegmentAnalytics {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSearch(aggregate));
}
fn get_similar(&self, aggregate: SimilarAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregateGetSimilar(aggregate));
}
fn post_similar(&self, aggregate: SimilarAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostSimilar(aggregate));
}
fn post_facet_search(&self, aggregate: FacetSearchAggregator) {
let _ = self.sender.try_send(AnalyticsMsg::AggregatePostFacetSearch(aggregate));
}
@ -379,6 +392,8 @@ pub struct Segment {
update_documents_aggregator: DocumentsAggregator,
get_fetch_documents_aggregator: DocumentsFetchAggregator,
post_fetch_documents_aggregator: DocumentsFetchAggregator,
get_similar_aggregator: SimilarAggregator,
post_similar_aggregator: SimilarAggregator,
}
impl Segment {
@ -441,6 +456,8 @@ impl Segment {
Some(AnalyticsMsg::AggregateUpdateDocuments(agreg)) => self.update_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetFetchDocuments(agreg)) => self.get_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostFetchDocuments(agreg)) => self.post_fetch_documents_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregateGetSimilar(agreg)) => self.get_similar_aggregator.aggregate(agreg),
Some(AnalyticsMsg::AggregatePostSimilar(agreg)) => self.post_similar_aggregator.aggregate(agreg),
None => (),
}
}
@ -494,6 +511,8 @@ impl Segment {
update_documents_aggregator,
get_fetch_documents_aggregator,
post_fetch_documents_aggregator,
get_similar_aggregator,
post_similar_aggregator,
} = self;
if let Some(get_search) =
@ -541,6 +560,18 @@ impl Segment {
{
let _ = self.batcher.push(post_fetch_documents).await;
}
if let Some(get_similar_documents) =
take(get_similar_aggregator).into_event(user, "Similar GET")
{
let _ = self.batcher.push(get_similar_documents).await;
}
if let Some(post_similar_documents) =
take(post_similar_aggregator).into_event(user, "Similar POST")
{
let _ = self.batcher.push(post_similar_documents).await;
}
let _ = self.batcher.flush().await;
}
}
@ -1558,3 +1589,235 @@ impl DocumentsFetchAggregator {
})
}
}
#[derive(Default)]
pub struct SimilarAggregator {
timestamp: Option<OffsetDateTime>,
// context
user_agents: HashSet<String>,
// requests
total_received: usize,
total_succeeded: usize,
time_spent: BinaryHeap<usize>,
// filter
filter_with_geo_radius: bool,
filter_with_geo_bounding_box: bool,
// every time a request has a filter, this field must be incremented by the number of terms it contains
filter_sum_of_criteria_terms: usize,
// every time a request has a filter, this field must be incremented by one
filter_total_number_of_criteria: usize,
used_syntax: HashMap<String, usize>,
// Whether a non-default embedder was specified
embedder: bool,
// pagination
max_limit: usize,
max_offset: usize,
// formatting
max_attributes_to_retrieve: usize,
// scoring
show_ranking_score: bool,
show_ranking_score_details: bool,
}
impl SimilarAggregator {
#[allow(clippy::field_reassign_with_default)]
pub fn from_query(query: &SimilarQuery, request: &HttpRequest) -> Self {
let SimilarQuery {
id: _,
embedder,
offset,
limit,
attributes_to_retrieve: _,
show_ranking_score,
show_ranking_score_details,
filter,
} = query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(ref filter) = filter {
static RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());
ret.filter_total_number_of_criteria = 1;
let syntax = match filter {
Value::String(_) => "string".to_string(),
Value::Array(values) => {
if values.iter().map(|v| v.to_string()).any(|s| RE.is_match(&s)) {
"mixed".to_string()
} else {
"array".to_string()
}
}
_ => "none".to_string(),
};
// convert the string to a HashMap
ret.used_syntax.insert(syntax, 1);
let stringified_filters = filter.to_string();
ret.filter_with_geo_radius = stringified_filters.contains("_geoRadius(");
ret.filter_with_geo_bounding_box = stringified_filters.contains("_geoBoundingBox(");
ret.filter_sum_of_criteria_terms = RE.split(&stringified_filters).count();
}
ret.max_limit = *limit;
ret.max_offset = *offset;
ret.show_ranking_score = *show_ranking_score;
ret.show_ranking_score_details = *show_ranking_score_details;
ret.embedder = embedder.is_some();
ret
}
pub fn succeed(&mut self, result: &SimilarResult) {
let SimilarResult { id: _, hits: _, processing_time_ms, hits_info: _ } = result;
self.total_succeeded = self.total_succeeded.saturating_add(1);
self.time_spent.push(*processing_time_ms as usize);
}
/// Aggregate one [SimilarAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
filter_with_geo_radius,
filter_with_geo_bounding_box,
filter_sum_of_criteria_terms,
filter_total_number_of_criteria,
used_syntax,
max_limit,
max_offset,
max_attributes_to_retrieve,
show_ranking_score,
show_ranking_score_details,
embedder,
} = other;
if self.timestamp.is_none() {
self.timestamp = timestamp;
}
// context
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.time_spent.append(time_spent);
// filter
self.filter_with_geo_radius |= filter_with_geo_radius;
self.filter_with_geo_bounding_box |= filter_with_geo_bounding_box;
self.filter_sum_of_criteria_terms =
self.filter_sum_of_criteria_terms.saturating_add(filter_sum_of_criteria_terms);
self.filter_total_number_of_criteria =
self.filter_total_number_of_criteria.saturating_add(filter_total_number_of_criteria);
for (key, value) in used_syntax.into_iter() {
let used_syntax = self.used_syntax.entry(key).or_insert(0);
*used_syntax = used_syntax.saturating_add(value);
}
self.embedder |= embedder;
// pagination
self.max_limit = self.max_limit.max(max_limit);
self.max_offset = self.max_offset.max(max_offset);
// formatting
self.max_attributes_to_retrieve =
self.max_attributes_to_retrieve.max(max_attributes_to_retrieve);
// scoring
self.show_ranking_score |= show_ranking_score;
self.show_ranking_score_details |= show_ranking_score_details;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
filter_with_geo_radius,
filter_with_geo_bounding_box,
filter_sum_of_criteria_terms,
filter_total_number_of_criteria,
used_syntax,
max_limit,
max_offset,
max_attributes_to_retrieve,
show_ranking_score,
show_ranking_score_details,
embedder,
} = self;
if total_received == 0 {
None
} else {
// we get all the values in a sorted manner
let time_spent = time_spent.into_sorted_vec();
// the index of the 99th percentage of value
let percentile_99th = time_spent.len() * 99 / 100;
// We are only interested by the slowest value of the 99th fastest results
let time_spent = time_spent.get(percentile_99th);
let properties = json!({
"user-agent": user_agents,
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"filter": {
"with_geoRadius": filter_with_geo_radius,
"with_geoBoundingBox": filter_with_geo_bounding_box,
"avg_criteria_number": format!("{:.2}", filter_sum_of_criteria_terms as f64 / filter_total_number_of_criteria as f64),
"most_used_syntax": used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
},
"hybrid": {
"embedder": embedder,
},
"pagination": {
"max_limit": max_limit,
"max_offset": max_offset,
},
"formatting": {
"max_attributes_to_retrieve": max_attributes_to_retrieve,
},
"scoring": {
"show_ranking_score": show_ranking_score,
"show_ranking_score_details": show_ranking_score_details,
},
});
Some(Track {
timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
..Default::default()
})
}
}
}