4056: Rewrite segment_analytics module with the destructuring syntax r=Kerollmops a=vivek-26

# Pull Request

## Related issue
Fixes #3928

## What does this PR do?
- This PR uses Rust destructuring syntax in the `segment_analytics` module, so that adding or removing a field causes an error at compile time (a minimal sketch of the pattern is shown below).
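
For context, here is a minimal sketch of the pattern this PR applies. The `QueryAnalytics` struct below is hypothetical and not one of the actual `segment_analytics` types; the point is that destructuring `Self` exhaustively inside `aggregate` makes the compiler reject the code as soon as a field is added to or removed from the struct without the aggregation being updated.

```rust
// Hypothetical analytics struct, for illustration only.
struct QueryAnalytics {
    total_received: u64,
    max_limit: usize,
}

impl QueryAnalytics {
    fn aggregate(&mut self, other: Self) {
        // Exhaustive destructuring: if a new field is later added to
        // `QueryAnalytics` and not listed here, this `let` no longer compiles,
        // so the aggregation can never silently ignore it.
        let Self { total_received, max_limit } = other;
        self.total_received = self.total_received.saturating_add(total_received);
        self.max_limit = self.max_limit.max(max_limit);
    }
}

fn main() {
    let mut a = QueryAnalytics { total_received: 1, max_limit: 20 };
    let b = QueryAnalytics { total_received: 2, max_limit: 50 };
    a.aggregate(b);
    assert_eq!(a.total_received, 3);
    assert_eq!(a.max_limit, 50);
}
```

Fields that are intentionally ignored can still be listed with `field: _`, which keeps the destructuring exhaustive while documenting that the field is deliberately not aggregated.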

## PR checklist
Please check if your PR fulfills the following requirements:
- [x] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)?
- [x] Have you read the contributing guidelines?
- [x] Have you made sure that the title is accurate and descriptive of the changes?

Thank you so much for contributing to Meilisearch!


Co-authored-by: Vivek Kumar <vivek.26@outlook.com>
meili-bors[bot] 2023-09-12 08:07:50 +00:00 committed by GitHub
commit 77c3787b78


@@ -615,19 +615,43 @@ pub struct SearchAggregator {
impl SearchAggregator {
pub fn from_query(query: &SearchQuery, request: &HttpRequest) -> Self {
let SearchQuery {
q,
vector,
offset,
limit,
page,
hits_per_page,
attributes_to_retrieve: _,
attributes_to_crop: _,
crop_length,
attributes_to_highlight: _,
show_matches_position,
show_ranking_score,
show_ranking_score_details,
filter,
sort,
facets: _,
highlight_pre_tag,
highlight_post_tag,
crop_marker,
matching_strategy,
attributes_to_search_on,
} = query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.total_received = 1;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(ref sort) = query.sort {
if let Some(ref sort) = sort {
ret.sort_total_number_of_criteria = 1;
ret.sort_with_geo_point = sort.iter().any(|s| s.contains("_geoPoint("));
ret.sort_sum_of_criteria_terms = sort.len();
}
if let Some(ref filter) = query.filter {
if let Some(ref filter) = filter {
static RE: Lazy<Regex> = Lazy::new(|| Regex::new("AND | OR").unwrap());
ret.filter_total_number_of_criteria = 1;
@@ -652,80 +676,124 @@ impl SearchAggregator {
}
// attributes_to_search_on
if let Some(_) = query.attributes_to_search_on {
if let Some(_) = attributes_to_search_on {
ret.attributes_to_search_on_total_number_of_uses = 1;
}
if let Some(ref q) = query.q {
if let Some(ref q) = q {
ret.max_terms_number = q.split_whitespace().count();
}
if let Some(ref vector) = query.vector {
if let Some(ref vector) = vector {
ret.max_vector_size = vector.len();
}
if query.is_finite_pagination() {
let limit = query.hits_per_page.unwrap_or_else(DEFAULT_SEARCH_LIMIT);
let limit = hits_per_page.unwrap_or_else(DEFAULT_SEARCH_LIMIT);
ret.max_limit = limit;
ret.max_offset = query.page.unwrap_or(1).saturating_sub(1) * limit;
ret.max_offset = page.unwrap_or(1).saturating_sub(1) * limit;
ret.finite_pagination = 1;
} else {
ret.max_limit = query.limit;
ret.max_offset = query.offset;
ret.max_limit = *limit;
ret.max_offset = *offset;
ret.finite_pagination = 0;
}
ret.matching_strategy.insert(format!("{:?}", query.matching_strategy), 1);
ret.matching_strategy.insert(format!("{:?}", matching_strategy), 1);
ret.highlight_pre_tag = query.highlight_pre_tag != DEFAULT_HIGHLIGHT_PRE_TAG();
ret.highlight_post_tag = query.highlight_post_tag != DEFAULT_HIGHLIGHT_POST_TAG();
ret.crop_marker = query.crop_marker != DEFAULT_CROP_MARKER();
ret.crop_length = query.crop_length != DEFAULT_CROP_LENGTH();
ret.show_matches_position = query.show_matches_position;
ret.highlight_pre_tag = *highlight_pre_tag != DEFAULT_HIGHLIGHT_PRE_TAG();
ret.highlight_post_tag = *highlight_post_tag != DEFAULT_HIGHLIGHT_POST_TAG();
ret.crop_marker = *crop_marker != DEFAULT_CROP_MARKER();
ret.crop_length = *crop_length != DEFAULT_CROP_LENGTH();
ret.show_matches_position = *show_matches_position;
ret.show_ranking_score = query.show_ranking_score;
ret.show_ranking_score_details = query.show_ranking_score_details;
ret.show_ranking_score = *show_ranking_score;
ret.show_ranking_score_details = *show_ranking_score_details;
ret
}
pub fn succeed(&mut self, result: &SearchResult) {
let SearchResult {
hits: _,
query: _,
vector: _,
processing_time_ms,
hits_info: _,
facet_distribution: _,
facet_stats: _,
} = result;
self.total_succeeded = self.total_succeeded.saturating_add(1);
self.time_spent.push(result.processing_time_ms as usize);
self.time_spent.push(*processing_time_ms as usize);
}
/// Aggregate one [SearchAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
sort_with_geo_point,
sort_sum_of_criteria_terms,
sort_total_number_of_criteria,
filter_with_geo_radius,
filter_with_geo_bounding_box,
filter_sum_of_criteria_terms,
filter_total_number_of_criteria,
used_syntax,
attributes_to_search_on_total_number_of_uses,
max_terms_number,
max_vector_size,
matching_strategy,
max_limit,
max_offset,
finite_pagination,
max_attributes_to_retrieve,
max_attributes_to_highlight,
highlight_pre_tag,
highlight_post_tag,
max_attributes_to_crop,
crop_marker,
show_matches_position,
crop_length,
facets_sum_of_terms,
facets_total_number_of_facets,
show_ranking_score,
show_ranking_score_details,
} = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
// context
for user_agent in other.user_agents.into_iter() {
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(other.total_received);
self.total_succeeded = self.total_succeeded.saturating_add(other.total_succeeded);
self.time_spent.append(&mut other.time_spent);
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.time_spent.append(time_spent);
// sort
self.sort_with_geo_point |= other.sort_with_geo_point;
self.sort_with_geo_point |= sort_with_geo_point;
self.sort_sum_of_criteria_terms =
self.sort_sum_of_criteria_terms.saturating_add(other.sort_sum_of_criteria_terms);
self.sort_sum_of_criteria_terms.saturating_add(sort_sum_of_criteria_terms);
self.sort_total_number_of_criteria =
self.sort_total_number_of_criteria.saturating_add(other.sort_total_number_of_criteria);
self.sort_total_number_of_criteria.saturating_add(sort_total_number_of_criteria);
// filter
self.filter_with_geo_radius |= other.filter_with_geo_radius;
self.filter_with_geo_bounding_box |= other.filter_with_geo_bounding_box;
self.filter_with_geo_radius |= filter_with_geo_radius;
self.filter_with_geo_bounding_box |= filter_with_geo_bounding_box;
self.filter_sum_of_criteria_terms =
self.filter_sum_of_criteria_terms.saturating_add(other.filter_sum_of_criteria_terms);
self.filter_total_number_of_criteria = self
.filter_total_number_of_criteria
.saturating_add(other.filter_total_number_of_criteria);
for (key, value) in other.used_syntax.into_iter() {
self.filter_sum_of_criteria_terms.saturating_add(filter_sum_of_criteria_terms);
self.filter_total_number_of_criteria =
self.filter_total_number_of_criteria.saturating_add(filter_total_number_of_criteria);
for (key, value) in used_syntax.into_iter() {
let used_syntax = self.used_syntax.entry(key).or_insert(0);
*used_syntax = used_syntax.saturating_add(value);
}
@@ -733,115 +801,149 @@ impl SearchAggregator {
// attributes_to_search_on
self.attributes_to_search_on_total_number_of_uses = self
.attributes_to_search_on_total_number_of_uses
.saturating_add(other.attributes_to_search_on_total_number_of_uses);
.saturating_add(attributes_to_search_on_total_number_of_uses);
// q
self.max_terms_number = self.max_terms_number.max(other.max_terms_number);
self.max_terms_number = self.max_terms_number.max(max_terms_number);
// vector
self.max_vector_size = self.max_vector_size.max(other.max_vector_size);
self.max_vector_size = self.max_vector_size.max(max_vector_size);
// pagination
self.max_limit = self.max_limit.max(other.max_limit);
self.max_offset = self.max_offset.max(other.max_offset);
self.finite_pagination += other.finite_pagination;
self.max_limit = self.max_limit.max(max_limit);
self.max_offset = self.max_offset.max(max_offset);
self.finite_pagination += finite_pagination;
// formatting
self.max_attributes_to_retrieve =
self.max_attributes_to_retrieve.max(other.max_attributes_to_retrieve);
self.max_attributes_to_retrieve.max(max_attributes_to_retrieve);
self.max_attributes_to_highlight =
self.max_attributes_to_highlight.max(other.max_attributes_to_highlight);
self.highlight_pre_tag |= other.highlight_pre_tag;
self.highlight_post_tag |= other.highlight_post_tag;
self.max_attributes_to_crop = self.max_attributes_to_crop.max(other.max_attributes_to_crop);
self.crop_marker |= other.crop_marker;
self.show_matches_position |= other.show_matches_position;
self.crop_length |= other.crop_length;
self.max_attributes_to_highlight.max(max_attributes_to_highlight);
self.highlight_pre_tag |= highlight_pre_tag;
self.highlight_post_tag |= highlight_post_tag;
self.max_attributes_to_crop = self.max_attributes_to_crop.max(max_attributes_to_crop);
self.crop_marker |= crop_marker;
self.show_matches_position |= show_matches_position;
self.crop_length |= crop_length;
// facets
self.facets_sum_of_terms =
self.facets_sum_of_terms.saturating_add(other.facets_sum_of_terms);
self.facets_sum_of_terms = self.facets_sum_of_terms.saturating_add(facets_sum_of_terms);
self.facets_total_number_of_facets =
self.facets_total_number_of_facets.saturating_add(other.facets_total_number_of_facets);
self.facets_total_number_of_facets.saturating_add(facets_total_number_of_facets);
// matching strategy
for (key, value) in other.matching_strategy.into_iter() {
for (key, value) in matching_strategy.into_iter() {
let matching_strategy = self.matching_strategy.entry(key).or_insert(0);
*matching_strategy = matching_strategy.saturating_add(value);
}
// scoring
self.show_ranking_score |= other.show_ranking_score;
self.show_ranking_score_details |= other.show_ranking_score_details;
self.show_ranking_score |= show_ranking_score;
self.show_ranking_score_details |= show_ranking_score_details;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
if self.total_received == 0 {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
sort_with_geo_point,
sort_sum_of_criteria_terms,
sort_total_number_of_criteria,
filter_with_geo_radius,
filter_with_geo_bounding_box,
filter_sum_of_criteria_terms,
filter_total_number_of_criteria,
used_syntax,
attributes_to_search_on_total_number_of_uses,
max_terms_number,
max_vector_size,
matching_strategy,
max_limit,
max_offset,
finite_pagination,
max_attributes_to_retrieve,
max_attributes_to_highlight,
highlight_pre_tag,
highlight_post_tag,
max_attributes_to_crop,
crop_marker,
show_matches_position,
crop_length,
facets_sum_of_terms,
facets_total_number_of_facets,
show_ranking_score,
show_ranking_score_details,
} = self;
if total_received == 0 {
None
} else {
// we get all the values in a sorted manner
let time_spent = self.time_spent.into_sorted_vec();
let time_spent = time_spent.into_sorted_vec();
// the index of the 99th percentage of value
let percentile_99th = time_spent.len() * 99 / 100;
// We are only interested by the slowest value of the 99th fastest results
let time_spent = time_spent.get(percentile_99th);
let properties = json!({
"user-agent": self.user_agents,
"user-agent": user_agents,
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": self.total_succeeded,
"total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panics
"total_received": self.total_received,
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"sort": {
"with_geoPoint": self.sort_with_geo_point,
"avg_criteria_number": format!("{:.2}", self.sort_sum_of_criteria_terms as f64 / self.sort_total_number_of_criteria as f64),
"with_geoPoint": sort_with_geo_point,
"avg_criteria_number": format!("{:.2}", sort_sum_of_criteria_terms as f64 / sort_total_number_of_criteria as f64),
},
"filter": {
"with_geoRadius": self.filter_with_geo_radius,
"with_geoBoundingBox": self.filter_with_geo_bounding_box,
"avg_criteria_number": format!("{:.2}", self.filter_sum_of_criteria_terms as f64 / self.filter_total_number_of_criteria as f64),
"most_used_syntax": self.used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
"with_geoRadius": filter_with_geo_radius,
"with_geoBoundingBox": filter_with_geo_bounding_box,
"avg_criteria_number": format!("{:.2}", filter_sum_of_criteria_terms as f64 / filter_total_number_of_criteria as f64),
"most_used_syntax": used_syntax.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
},
"attributes_to_search_on": {
"total_number_of_uses": self.attributes_to_search_on_total_number_of_uses,
"total_number_of_uses": attributes_to_search_on_total_number_of_uses,
},
"q": {
"max_terms_number": self.max_terms_number,
"max_terms_number": max_terms_number,
},
"vector": {
"max_vector_size": self.max_vector_size,
"max_vector_size": max_vector_size,
},
"pagination": {
"max_limit": self.max_limit,
"max_offset": self.max_offset,
"most_used_navigation": if self.finite_pagination > (self.total_received / 2) { "exhaustive" } else { "estimated" },
"max_limit": max_limit,
"max_offset": max_offset,
"most_used_navigation": if finite_pagination > (total_received / 2) { "exhaustive" } else { "estimated" },
},
"formatting": {
"max_attributes_to_retrieve": self.max_attributes_to_retrieve,
"max_attributes_to_highlight": self.max_attributes_to_highlight,
"highlight_pre_tag": self.highlight_pre_tag,
"highlight_post_tag": self.highlight_post_tag,
"max_attributes_to_crop": self.max_attributes_to_crop,
"crop_marker": self.crop_marker,
"show_matches_position": self.show_matches_position,
"crop_length": self.crop_length,
"max_attributes_to_retrieve": max_attributes_to_retrieve,
"max_attributes_to_highlight": max_attributes_to_highlight,
"highlight_pre_tag": highlight_pre_tag,
"highlight_post_tag": highlight_post_tag,
"max_attributes_to_crop": max_attributes_to_crop,
"crop_marker": crop_marker,
"show_matches_position": show_matches_position,
"crop_length": crop_length,
},
"facets": {
"avg_facets_number": format!("{:.2}", self.facets_sum_of_terms as f64 / self.facets_total_number_of_facets as f64),
"avg_facets_number": format!("{:.2}", facets_sum_of_terms as f64 / facets_total_number_of_facets as f64),
},
"matching_strategy": {
"most_used_strategy": self.matching_strategy.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
"most_used_strategy": matching_strategy.iter().max_by_key(|(_, v)| *v).map(|(k, _)| json!(k)).unwrap_or_else(|| json!(null)),
},
"scoring": {
"show_ranking_score": self.show_ranking_score,
"show_ranking_score_details": self.show_ranking_score_details,
"show_ranking_score": show_ranking_score,
"show_ranking_score_details": show_ranking_score_details,
},
});
Some(Track {
timestamp: self.timestamp,
timestamp: timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
@@ -881,8 +983,38 @@ impl MultiSearchAggregator {
let user_agents = extract_user_agents(request).into_iter().collect();
let distinct_indexes: HashSet<_> =
query.iter().map(|query| query.index_uid.as_str()).collect();
let distinct_indexes: HashSet<_> = query
.iter()
.map(|query| {
// make sure we get a compilation error if a field gets added to / removed from SearchQueryWithIndex
let SearchQueryWithIndex {
index_uid,
q: _,
vector: _,
offset: _,
limit: _,
page: _,
hits_per_page: _,
attributes_to_retrieve: _,
attributes_to_crop: _,
crop_length: _,
attributes_to_highlight: _,
show_ranking_score: _,
show_ranking_score_details: _,
show_matches_position: _,
filter: _,
sort: _,
facets: _,
highlight_pre_tag: _,
highlight_post_tag: _,
crop_marker: _,
matching_strategy: _,
attributes_to_search_on: _,
} = query;
index_uid.as_str()
})
.collect();
let show_ranking_score = query.iter().any(|query| query.show_ranking_score);
let show_ranking_score_details = query.iter().any(|query| query.show_ranking_score_details);
@@ -904,6 +1036,7 @@ impl MultiSearchAggregator {
self.total_succeeded = self.total_succeeded.saturating_add(1);
}
/// Aggregate one [MultiSearchAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
// write the aggregate in a way that will cause a compilation error if a field is added.
@@ -945,33 +1078,45 @@ impl MultiSearchAggregator {
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
if self.total_received == 0 {
let Self {
timestamp,
total_received,
total_succeeded,
total_distinct_index_count,
total_single_index,
total_search_count,
user_agents,
show_ranking_score,
show_ranking_score_details,
} = self;
if total_received == 0 {
None
} else {
let properties = json!({
"user-agent": self.user_agents,
"user-agent": user_agents,
"requests": {
"total_succeeded": self.total_succeeded,
"total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panics
"total_received": self.total_received,
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"indexes": {
"total_single_index": self.total_single_index,
"total_distinct_index_count": self.total_distinct_index_count,
"avg_distinct_index_count": (self.total_distinct_index_count as f64) / (self.total_received as f64), // not 0 else returned early
"total_single_index": total_single_index,
"total_distinct_index_count": total_distinct_index_count,
"avg_distinct_index_count": (total_distinct_index_count as f64) / (total_received as f64), // not 0 else returned early
},
"searches": {
"total_search_count": self.total_search_count,
"avg_search_count": (self.total_search_count as f64) / (self.total_received as f64),
"total_search_count": total_search_count,
"avg_search_count": (total_search_count as f64) / (total_received as f64),
},
"scoring": {
"show_ranking_score": self.show_ranking_score,
"show_ranking_score_details": self.show_ranking_score_details,
"show_ranking_score": show_ranking_score,
"show_ranking_score_details": show_ranking_score_details,
}
});
Some(Track {
timestamp: self.timestamp,
timestamp: timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
@@ -1029,63 +1174,84 @@ impl FacetSearchAggregator {
}
pub fn succeed(&mut self, result: &FacetSearchResult) {
let FacetSearchResult { facet_hits: _, facet_query: _, processing_time_ms } = result;
self.total_succeeded = self.total_succeeded.saturating_add(1);
self.time_spent.push(result.processing_time_ms as usize);
self.time_spent.push(*processing_time_ms as usize);
}
/// Aggregate one [SearchAggregator] into another.
/// Aggregate one [FacetSearchAggregator] into another.
pub fn aggregate(&mut self, mut other: Self) {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
ref mut time_spent,
facet_names,
additional_search_parameters_provided,
} = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
// context
for user_agent in other.user_agents.into_iter() {
for user_agent in user_agents.into_iter() {
self.user_agents.insert(user_agent);
}
// request
self.total_received = self.total_received.saturating_add(other.total_received);
self.total_succeeded = self.total_succeeded.saturating_add(other.total_succeeded);
self.time_spent.append(&mut other.time_spent);
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.time_spent.append(time_spent);
// facet_names
for facet_name in other.facet_names.into_iter() {
for facet_name in facet_names.into_iter() {
self.facet_names.insert(facet_name);
}
// additional_search_parameters_provided
self.additional_search_parameters_provided = self.additional_search_parameters_provided
| other.additional_search_parameters_provided;
self.additional_search_parameters_provided =
self.additional_search_parameters_provided | additional_search_parameters_provided;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
if self.total_received == 0 {
let Self {
timestamp,
user_agents,
total_received,
total_succeeded,
time_spent,
facet_names,
additional_search_parameters_provided,
} = self;
if total_received == 0 {
None
} else {
// the index of the 99th percentage of value
let percentile_99th = 0.99 * (self.total_succeeded as f64 - 1.) + 1.;
let percentile_99th = 0.99 * (total_succeeded as f64 - 1.) + 1.;
// we get all the values in a sorted manner
let time_spent = self.time_spent.into_sorted_vec();
let time_spent = time_spent.into_sorted_vec();
// We are only interested by the slowest value of the 99th fastest results
let time_spent = time_spent.get(percentile_99th as usize);
let properties = json!({
"user-agent": self.user_agents,
"user-agent": user_agents,
"requests": {
"99th_response_time": time_spent.map(|t| format!("{:.2}", t)),
"total_succeeded": self.total_succeeded,
"total_failed": self.total_received.saturating_sub(self.total_succeeded), // just to be sure we never panics
"total_received": self.total_received,
"total_succeeded": total_succeeded,
"total_failed": total_received.saturating_sub(total_succeeded), // just to be sure we never panics
"total_received": total_received,
},
"facets": {
"total_distinct_facet_count": self.facet_names.len(),
"additional_search_parameters_provided": self.additional_search_parameters_provided,
"total_distinct_facet_count": facet_names.len(),
"additional_search_parameters_provided": additional_search_parameters_provided,
},
});
Some(Track {
timestamp: self.timestamp,
timestamp: timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
@@ -1116,14 +1282,17 @@ impl DocumentsAggregator {
index_creation: bool,
request: &HttpRequest,
) -> Self {
let UpdateDocumentsQuery { primary_key, csv_delimiter: _ } = documents_query;
let mut ret = Self::default();
ret.timestamp = Some(OffsetDateTime::now_utc());
ret.updated = true;
ret.user_agents = extract_user_agents(request).into_iter().collect();
if let Some(primary_key) = documents_query.primary_key.clone() {
if let Some(primary_key) = primary_key.clone() {
ret.primary_keys.insert(primary_key);
}
let content_type = request
.headers()
.get(CONTENT_TYPE)
@@ -1138,37 +1307,43 @@ impl DocumentsAggregator {
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
self.updated |= other.updated;
self.updated |= updated;
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
for primary_key in other.primary_keys {
for primary_key in primary_keys {
self.primary_keys.insert(primary_key);
}
for content_type in other.content_types {
for content_type in content_types {
self.content_types.insert(content_type);
}
self.index_creation |= other.index_creation;
self.index_creation |= index_creation;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
if !self.updated {
let Self { timestamp, user_agents, primary_keys, content_types, index_creation, updated } =
self;
if !updated {
None
} else {
let properties = json!({
"user-agent": self.user_agents,
"payload_type": self.content_types,
"primary_key": self.primary_keys,
"index_creation": self.index_creation,
"user-agent": user_agents,
"payload_type": content_types,
"primary_key": primary_keys,
"index_creation": index_creation,
});
Some(Track {
timestamp: self.timestamp,
timestamp: timestamp,
user: user.clone(),
event: event_name.to_string(),
properties,
@@ -1214,19 +1389,29 @@ impl DocumentsDeletionAggregator {
/// Aggregate one [DocumentsAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
per_document_id,
clear_all,
per_batch,
per_filter,
} = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(other.total_received);
self.per_document_id |= other.per_document_id;
self.clear_all |= other.clear_all;
self.per_batch |= other.per_batch;
self.per_filter |= other.per_filter;
self.total_received = self.total_received.saturating_add(total_received);
self.per_document_id |= per_document_id;
self.clear_all |= clear_all;
self.per_batch |= per_batch;
self.per_filter |= per_filter;
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
@@ -1269,49 +1454,82 @@ pub struct TasksAggregator {
impl TasksAggregator {
pub fn from_query(query: &TasksFilterQuery, request: &HttpRequest) -> Self {
let TasksFilterQuery {
limit: _,
from: _,
uids,
index_uids,
types,
statuses,
canceled_by,
before_enqueued_at,
after_enqueued_at,
before_started_at,
after_started_at,
before_finished_at,
after_finished_at,
} = query;
Self {
timestamp: Some(OffsetDateTime::now_utc()),
user_agents: extract_user_agents(request).into_iter().collect(),
filtered_by_uid: query.uids.is_some(),
filtered_by_index_uid: query.index_uids.is_some(),
filtered_by_type: query.types.is_some(),
filtered_by_status: query.statuses.is_some(),
filtered_by_canceled_by: query.canceled_by.is_some(),
filtered_by_before_enqueued_at: query.before_enqueued_at.is_some(),
filtered_by_after_enqueued_at: query.after_enqueued_at.is_some(),
filtered_by_before_started_at: query.before_started_at.is_some(),
filtered_by_after_started_at: query.after_started_at.is_some(),
filtered_by_before_finished_at: query.before_finished_at.is_some(),
filtered_by_after_finished_at: query.after_finished_at.is_some(),
filtered_by_uid: uids.is_some(),
filtered_by_index_uid: index_uids.is_some(),
filtered_by_type: types.is_some(),
filtered_by_status: statuses.is_some(),
filtered_by_canceled_by: canceled_by.is_some(),
filtered_by_before_enqueued_at: before_enqueued_at.is_some(),
filtered_by_after_enqueued_at: after_enqueued_at.is_some(),
filtered_by_before_started_at: before_started_at.is_some(),
filtered_by_after_started_at: after_started_at.is_some(),
filtered_by_before_finished_at: before_finished_at.is_some(),
filtered_by_after_finished_at: after_finished_at.is_some(),
total_received: 1,
}
}
/// Aggregate one [DocumentsAggregator] into another.
/// Aggregate one [TasksAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
filtered_by_uid,
filtered_by_index_uid,
filtered_by_type,
filtered_by_status,
filtered_by_canceled_by,
filtered_by_before_enqueued_at,
filtered_by_after_enqueued_at,
filtered_by_before_started_at,
filtered_by_after_started_at,
filtered_by_before_finished_at,
filtered_by_after_finished_at,
} = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.filtered_by_uid |= other.filtered_by_uid;
self.filtered_by_index_uid |= other.filtered_by_index_uid;
self.filtered_by_type |= other.filtered_by_type;
self.filtered_by_status |= other.filtered_by_status;
self.filtered_by_canceled_by |= other.filtered_by_canceled_by;
self.filtered_by_before_enqueued_at |= other.filtered_by_before_enqueued_at;
self.filtered_by_after_enqueued_at |= other.filtered_by_after_enqueued_at;
self.filtered_by_before_started_at |= other.filtered_by_before_started_at;
self.filtered_by_after_started_at |= other.filtered_by_after_started_at;
self.filtered_by_before_finished_at |= other.filtered_by_before_finished_at;
self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at;
self.filtered_by_after_finished_at |= other.filtered_by_after_finished_at;
self.filtered_by_uid |= filtered_by_uid;
self.filtered_by_index_uid |= filtered_by_index_uid;
self.filtered_by_type |= filtered_by_type;
self.filtered_by_status |= filtered_by_status;
self.filtered_by_canceled_by |= filtered_by_canceled_by;
self.filtered_by_before_enqueued_at |= filtered_by_before_enqueued_at;
self.filtered_by_after_enqueued_at |= filtered_by_after_enqueued_at;
self.filtered_by_before_started_at |= filtered_by_before_started_at;
self.filtered_by_after_started_at |= filtered_by_after_started_at;
self.filtered_by_before_finished_at |= filtered_by_before_finished_at;
self.filtered_by_after_finished_at |= filtered_by_after_finished_at;
self.filtered_by_after_finished_at |= filtered_by_after_finished_at;
self.total_received = self.total_received.saturating_add(other.total_received);
self.total_received = self.total_received.saturating_add(total_received);
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
@@ -1352,17 +1570,19 @@ impl HealthAggregator {
ret
}
/// Aggregate one [DocumentsAggregator] into another.
/// Aggregate one [HealthAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self { timestamp, user_agents, total_received } = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
// we can't create a union because there is no `into_union` method
for user_agent in other.user_agents {
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(other.total_received);
self.total_received = self.total_received.saturating_add(total_received);
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {
@@ -1423,19 +1643,29 @@ impl DocumentsFetchAggregator {
/// Aggregate one [DocumentsFetchAggregator] into another.
pub fn aggregate(&mut self, other: Self) {
let Self {
timestamp,
user_agents,
total_received,
per_document_id,
per_filter,
max_limit,
max_offset,
} = other;
if self.timestamp.is_none() {
self.timestamp = other.timestamp;
self.timestamp = timestamp;
}
for user_agent in other.user_agents {
for user_agent in user_agents {
self.user_agents.insert(user_agent);
}
self.total_received = self.total_received.saturating_add(other.total_received);
self.per_document_id |= other.per_document_id;
self.per_filter |= other.per_filter;
self.total_received = self.total_received.saturating_add(total_received);
self.per_document_id |= per_document_id;
self.per_filter |= per_filter;
self.max_limit = self.max_limit.max(other.max_limit);
self.max_offset = self.max_offset.max(other.max_offset);
self.max_limit = self.max_limit.max(max_limit);
self.max_offset = self.max_offset.max(max_offset);
}
pub fn into_event(self, user: &User, event_name: &str) -> Option<Track> {