From 5f50fc946442e8b308cbfa7c3a9f3138a84f6705 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Tue, 24 Jun 2025 17:05:49 +0200 Subject: [PATCH] Add new analytics to the chat completions route --- .../routes/chats/chat_completion_analytics.rs | 139 ++++++++++++++++++ .../src/routes/chats/chat_completions.rs | 32 ++++ crates/meilisearch/src/routes/chats/mod.rs | 1 + 3 files changed, 172 insertions(+) create mode 100644 crates/meilisearch/src/routes/chats/chat_completion_analytics.rs diff --git a/crates/meilisearch/src/routes/chats/chat_completion_analytics.rs b/crates/meilisearch/src/routes/chats/chat_completion_analytics.rs new file mode 100644 index 000000000..4fde81653 --- /dev/null +++ b/crates/meilisearch/src/routes/chats/chat_completion_analytics.rs @@ -0,0 +1,139 @@ +use std::collections::BinaryHeap; + +use serde_json::{json, Value}; + +use crate::aggregate_methods; +use crate::analytics::{Aggregate, AggregateMethod}; + +aggregate_methods!( + ChatCompletionPOST => "Chat Completion POST", +); + +#[derive(Default)] +pub struct ChatCompletionAggregator { + // requests + total_received: usize, + total_succeeded: usize, + time_spent: BinaryHeap, + + // chat completion specific metrics + total_messages: usize, + total_streamed_requests: usize, + total_non_streamed_requests: usize, + + // model usage tracking + models_used: std::collections::HashMap, + + _method: std::marker::PhantomData, +} + +impl ChatCompletionAggregator { + pub fn from_request(model: &str, message_count: usize, is_stream: bool) -> Self { + let mut models_used = std::collections::HashMap::new(); + models_used.insert(model.to_string(), 1); + + Self { + total_received: 1, + total_messages: message_count, + total_streamed_requests: if is_stream { 1 } else { 0 }, + total_non_streamed_requests: if is_stream { 0 } else { 1 }, + models_used, + ..Default::default() + } + } + + pub fn succeed(&mut self, time_spent: std::time::Duration) { + self.total_succeeded += 1; + self.time_spent.push(time_spent.as_millis() as usize); + } +} + +impl Aggregate for ChatCompletionAggregator { + fn event_name(&self) -> &'static str { + Method::event_name() + } + + fn aggregate(mut self: Box, new: Box) -> Box { + let Self { + total_received, + total_succeeded, + mut time_spent, + total_messages, + total_streamed_requests, + total_non_streamed_requests, + models_used, + .. + } = *new; + + // Aggregate time spent + self.time_spent.append(&mut time_spent); + + // Aggregate counters + self.total_received = self.total_received.saturating_add(total_received); + self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded); + self.total_messages = self.total_messages.saturating_add(total_messages); + self.total_streamed_requests = + self.total_streamed_requests.saturating_add(total_streamed_requests); + self.total_non_streamed_requests = + self.total_non_streamed_requests.saturating_add(total_non_streamed_requests); + + // Aggregate model usage + for (model, count) in models_used { + *self.models_used.entry(model).or_insert(0) += count; + } + + self + } + + fn into_event(self: Box) -> Value { + let Self { + total_received, + total_succeeded, + time_spent, + total_messages, + total_streamed_requests, + total_non_streamed_requests, + models_used, + .. + } = *self; + + // Compute time statistics + let time_spent: Vec = time_spent.into_sorted_vec(); + let (max_time, min_time, avg_time) = if time_spent.is_empty() { + (0, 0, 0) + } else { + let max_time = time_spent.last().unwrap_or(&0); + let min_time = time_spent.first().unwrap_or(&0); + let sum: usize = time_spent.iter().sum(); + let avg_time = sum / time_spent.len(); + (*max_time, *min_time, avg_time) + }; + + // Compute average messages per request + let avg_messages_per_request = + if total_received > 0 { total_messages as f64 / total_received as f64 } else { 0.0 }; + + // Compute streaming vs non-streaming proportions + let streaming_ratio = if total_received > 0 { + total_streamed_requests as f64 / total_received as f64 + } else { + 0.0 + }; + + json!({ + "total_received": total_received, + "total_succeeded": total_succeeded, + "time_spent": { + "max": max_time, + "min": min_time, + "avg": avg_time + }, + "total_messages": total_messages, + "avg_messages_per_request": avg_messages_per_request, + "total_streamed_requests": total_streamed_requests, + "total_non_streamed_requests": total_non_streamed_requests, + "streaming_ratio": streaming_ratio, + "models_used": models_used, + }) + } +} diff --git a/crates/meilisearch/src/routes/chats/chat_completions.rs b/crates/meilisearch/src/routes/chats/chat_completions.rs index 8108e24dc..552a627b1 100644 --- a/crates/meilisearch/src/routes/chats/chat_completions.rs +++ b/crates/meilisearch/src/routes/chats/chat_completions.rs @@ -36,6 +36,7 @@ use serde_json::json; use tokio::runtime::Handle; use tokio::sync::mpsc::error::SendError; +use super::chat_completion_analytics::{ChatCompletionAggregator, ChatCompletionPOST}; use super::config::Config; use super::errors::{MistralError, OpenAiOutsideError, StreamErrorEvent}; use super::utils::format_documents; @@ -43,6 +44,7 @@ use super::{ ChatsParam, MEILI_APPEND_CONVERSATION_MESSAGE_NAME, MEILI_SEARCH_IN_INDEX_FUNCTION_NAME, MEILI_SEARCH_PROGRESS_NAME, MEILI_SEARCH_SOURCES_NAME, }; +use crate::analytics::Analytics; use crate::error::MeilisearchHttpError; use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _}; @@ -64,6 +66,7 @@ async fn chat( req: HttpRequest, search_queue: web::Data, web::Json(chat_completion): web::Json, + analytics: web::Data, ) -> impl Responder { let ChatsParam { workspace_uid } = chats_param.into_inner(); @@ -76,6 +79,7 @@ async fn chat( &workspace_uid, req, chat_completion, + analytics, ) .await, ) @@ -88,6 +92,7 @@ async fn chat( &workspace_uid, req, chat_completion, + analytics, ) .await, ) @@ -315,9 +320,18 @@ async fn non_streamed_chat( workspace_uid: &str, req: HttpRequest, chat_completion: CreateChatCompletionRequest, + analytics: web::Data, ) -> Result { index_scheduler.features().check_chat_completions("using the /chats chat completions route")?; + // Create analytics aggregator + let aggregate = ChatCompletionAggregator::::from_request( + &chat_completion.model, + chat_completion.messages.len(), + false, // non_streamed_chat is not streaming + ); + let start_time = std::time::Instant::now(); + if let Some(n) = chat_completion.n.filter(|&n| n != 1) { return Err(ResponseError::from_msg( format!("You tried to specify n = {n} but only single choices are supported (n = 1)."), @@ -414,6 +428,11 @@ async fn non_streamed_chat( } } + // Record success in analytics + let mut aggregate = aggregate; + aggregate.succeed(start_time.elapsed()); + analytics.publish(aggregate, &req); + Ok(HttpResponse::Ok().json(response)) } @@ -424,6 +443,7 @@ async fn streamed_chat( workspace_uid: &str, req: HttpRequest, mut chat_completion: CreateChatCompletionRequest, + analytics: web::Data, ) -> Result { index_scheduler.features().check_chat_completions("using the /chats chat completions route")?; let filters = index_scheduler.filters(); @@ -445,6 +465,14 @@ async fn streamed_chat( } }; + // Create analytics aggregator + let mut aggregate = ChatCompletionAggregator::::from_request( + &chat_completion.model, + chat_completion.messages.len(), + true, // streamed_chat is always streaming + ); + let start_time = std::time::Instant::now(); + let config = Config::new(&chat_settings); let auth_token = extract_token_from_request(&req)?.unwrap().to_string(); let system_role = chat_settings.source.system_role(&chat_completion.model); @@ -490,6 +518,10 @@ async fn streamed_chat( let _ = tx.stop().await; }); + // Record success in analytics after the stream is set up + aggregate.succeed(start_time.elapsed()); + analytics.publish(aggregate, &req); + Ok(Sse::from_infallible_receiver(rx).with_retry_duration(Duration::from_secs(10))) } diff --git a/crates/meilisearch/src/routes/chats/mod.rs b/crates/meilisearch/src/routes/chats/mod.rs index a8a93e6a4..8633bd496 100644 --- a/crates/meilisearch/src/routes/chats/mod.rs +++ b/crates/meilisearch/src/routes/chats/mod.rs @@ -19,6 +19,7 @@ use crate::extractors::authentication::policies::ActionPolicy; use crate::extractors::authentication::GuardedData; use crate::routes::PAGINATION_DEFAULT_LIMIT; +mod chat_completion_analytics; pub mod chat_completions; mod config; mod errors;