commit ef77c7699b
parent 7382fb21e4
Author: Tamo
Date:   2024-10-17 09:06:23 +02:00

    add the required shared values between all the events and fix the timestamp

2 changed files with 57 additions and 24 deletions

File 1 of 2

@@ -166,8 +166,8 @@ impl Analytics {
     /// The method used to publish most analytics that do not need to be batched every hour
     pub fn publish<T: Aggregate>(&self, event: T, request: &HttpRequest) {
-        let Some(ref segment) = self.segment else { return };
-        let user_agents = extract_user_agents(request);
-        let _ = segment.sender.try_send(segment_analytics::Message::new(event));
+        if let Some(ref segment) = self.segment {
+            let _ = segment.sender.try_send(segment_analytics::Message::new(event, request));
+        }
     }
 }
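
Note on the change above: try_send keeps publishing strictly fire-and-forget; if the
analytics channel is full or closed, the event is dropped instead of blocking the HTTP
request. A minimal sketch of that pattern with a bounded tokio mpsc channel (capacity
and payload type are illustrative, not taken from the codebase):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Bounded channel: a slow or stalled aggregator can never
    // back-pressure the HTTP request handlers.
    let (sender, mut inbox) = mpsc::channel::<String>(10);

    // Hot path: best effort only; a full or closed channel is ignored.
    let _ = sender.try_send("some event".to_string());

    // Aggregator side: drain the inbox.
    if let Some(event) = inbox.recv().await {
        println!("received {event}");
    }
}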

File 2 of 2

@@ -28,7 +28,6 @@ use super::{
     config_user_id_path, Aggregate, AggregateMethod, DocumentDeletionKind, DocumentFetchKind,
     MEILISEARCH_CONFIG_PATH,
 };
-use crate::analytics::Analytics;
 use crate::option::{
     default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
 };
@@ -58,7 +57,7 @@ fn write_user_id(db_path: &Path, user_id: &InstanceUid) {
 const SEGMENT_API_KEY: &str = "P3FWhhEsJiEDCuEHpmcN9DHcK4hVfBvb";
 
-pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
+pub fn extract_user_agents(request: &HttpRequest) -> HashSet<String> {
     request
         .headers()
         .get(ANALYTICS_HEADER)
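
Note on extract_user_agents: returning HashSet<String> instead of Vec<String> means user
agents deduplicate for free once events from many requests are merged together. A quick
illustration (the agent string is invented):

use std::collections::HashSet;

fn main() {
    let mut agents: HashSet<String> = HashSet::new();
    agents.insert("Meilisearch Rust (v0.2.5)".to_string());
    // Inserting the same agent again is a no-op, unlike pushing onto a Vec.
    agents.insert("Meilisearch Rust (v0.2.5)".to_string());
    assert_eq!(agents.len(), 1);
}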
@@ -77,14 +76,26 @@ pub struct Message {
     type_id: TypeId,
     // Same for the aggregate function.
     aggregator_function: fn(Box<dyn Aggregate>, Box<dyn Aggregate>) -> Option<Box<dyn Aggregate>>,
-    event: Box<dyn Aggregate>,
+    event: Event,
+}
+
+pub struct Event {
+    original: Box<dyn Aggregate>,
+    timestamp: OffsetDateTime,
+    user_agents: HashSet<String>,
+    total: usize,
 }
 
 impl Message {
-    pub fn new<T: Aggregate>(event: T) -> Self {
+    pub fn new<T: Aggregate>(event: T, request: &HttpRequest) -> Self {
         Self {
             type_id: TypeId::of::<T>(),
-            event: Box::new(event),
+            event: Event {
+                original: Box::new(event),
+                timestamp: OffsetDateTime::now_utc(),
+                user_agents: extract_user_agents(request),
+                total: 1,
+            },
             aggregator_function: T::downcast_aggregate,
         }
     }
@@ -400,7 +411,7 @@ pub struct Segment {
     user: User,
     opt: Opt,
     batcher: AutoBatcher,
-    events: HashMap<TypeId, Box<dyn Aggregate>>,
+    events: HashMap<TypeId, Event>,
 }
 
 impl Segment {
@@ -451,20 +462,32 @@ impl Segment {
                 _ = interval.tick() => {
                     self.tick(index_scheduler.clone(), auth_controller.clone()).await;
                 },
-                msg = self.inbox.recv() => {
-                    match msg {
-                        Some(Message { type_id, event, aggregator_function }) => {
-                            let new_event = match self.events.remove(&type_id) {
-                                Some(old) => (aggregator_function)(old, event).unwrap(),
-                                None => event,
-                            };
-                            self.events.insert(type_id, new_event);
-                        },
-                        None => (),
-                    }
-                }
+                Some(msg) = self.inbox.recv() => {
+                    self.handle_msg(msg);
+                }
             }
         }
     }
+
+    fn handle_msg(&mut self, Message { type_id, aggregator_function, event }: Message) {
+        let new_event = match self.events.remove(&type_id) {
+            Some(old) => {
+                // The function should never fail since we retrieved the corresponding TypeId from the map.
+                // But in the unfortunate case where it does, we silently ignore the error.
+                let Some(original) = (aggregator_function)(old.original, event.original) else {
+                    return;
+                };
+                Event {
+                    original,
+                    // We always want to keep the FIRST timestamp ever encountered.
+                    timestamp: old.timestamp,
+                    user_agents: old.user_agents.union(&event.user_agents).cloned().collect(),
+                    total: old.total.saturating_add(event.total),
+                }
+            }
+            None => event,
+        };
+        self.events.insert(type_id, new_event);
+    }
 
     async fn tick(
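
Note on handle_msg: the merge has three invariants. The first timestamp ever seen wins,
user agents are set-unioned, and the total saturates instead of overflowing. A standalone
sketch of just those semantics, with plain types standing in for Aggregate and
OffsetDateTime (all names here are illustrative):

use std::collections::HashSet;

// Stand-in for the shared metadata every aggregated event now carries.
struct Meta {
    timestamp: u64, // stand-in for time::OffsetDateTime
    user_agents: HashSet<String>,
    total: usize,
}

fn merge(old: Meta, new: Meta) -> Meta {
    Meta {
        // Keep the FIRST timestamp ever encountered.
        timestamp: old.timestamp,
        // Union deduplicates agents reported across batches.
        user_agents: old.user_agents.union(&new.user_agents).cloned().collect(),
        // Saturating add clamps at usize::MAX instead of panicking in debug builds.
        total: old.total.saturating_add(new.total),
    }
}

fn main() {
    let a = Meta { timestamp: 1, user_agents: HashSet::from(["curl".into()]), total: 2 };
    let b = Meta { timestamp: 5, user_agents: HashSet::from(["curl".into(), "wget".into()]), total: 3 };
    let m = merge(a, b);
    assert_eq!((m.timestamp, m.total, m.user_agents.len()), (1, 5, 2));
}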
@@ -503,11 +526,21 @@ impl Segment {
         let events = std::mem::take(&mut self.events);
 
         for (_, event) in events {
+            let Event { original, timestamp, user_agents, total } = event;
+            let name = original.event_name();
+            let mut properties = original.into_event();
+            if properties["user-agent"].is_null() {
+                properties["user-agent"] = json!(user_agents);
+            };
+            if properties["requests"]["total_received"].is_null() {
+                properties["requests"]["total_received"] = total.into();
+            };
+
             self.batcher.push(Track {
                 user: self.user.clone(),
-                event: event.event_name().to_string(),
-                properties: event.into_event(),
-                timestamp: todo!(),
+                event: name.to_string(),
+                properties,
+                timestamp: Some(timestamp),
                 ..Default::default()
             });
         }
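
Note on the tick loop above: the two is_null() checks let each aggregate report its own
user-agent and requests.total_received; the shared values collected in handle_msg are only
filled in as defaults. A sketch of that fallback behavior with serde_json (the property
values are invented):

use serde_json::{json, Value};

fn main() {
    // An aggregate whose into_event() set neither field itself.
    let mut properties: Value = json!({ "requests": { "total_received": null } });

    if properties["user-agent"].is_null() {
        properties["user-agent"] = json!(["curl/8.0"]);
    }
    if properties["requests"]["total_received"].is_null() {
        properties["requests"]["total_received"] = 3_usize.into();
    }

    assert_eq!(properties["user-agent"], json!(["curl/8.0"]));
    assert_eq!(properties["requests"]["total_received"], json!(3));
}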