Merge pull request #5709 from meilisearch/analytics-chat-completions

Add analytics to the chat completions
This commit is contained in:
Many the fish 2025-07-01 09:14:47 +00:00 committed by GitHub
commit e8b2bb3ea6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 168 additions and 0 deletions

View File

@ -0,0 +1,135 @@
use std::collections::BinaryHeap;
use serde_json::{json, Value};
use crate::analytics::Aggregate;
#[derive(Default)]
pub struct ChatCompletionAggregator {
// requests
total_received: usize,
total_succeeded: usize,
time_spent: BinaryHeap<usize>,
// chat completion specific metrics
total_messages: usize,
total_streamed_requests: usize,
total_non_streamed_requests: usize,
// model usage tracking
models_used: std::collections::HashMap<String, usize>,
}
impl ChatCompletionAggregator {
pub fn from_request(model: &str, message_count: usize, is_stream: bool) -> Self {
let mut models_used = std::collections::HashMap::new();
models_used.insert(model.to_string(), 1);
Self {
total_received: 1,
total_succeeded: 0,
time_spent: BinaryHeap::new(),
total_messages: message_count,
total_streamed_requests: if is_stream { 1 } else { 0 },
total_non_streamed_requests: if is_stream { 0 } else { 1 },
models_used,
}
}
pub fn succeed(&mut self, time_spent: std::time::Duration) {
self.total_succeeded += 1;
self.time_spent.push(time_spent.as_millis() as usize);
}
}
impl Aggregate for ChatCompletionAggregator {
fn event_name(&self) -> &'static str {
"Chat Completion POST"
}
fn aggregate(mut self: Box<Self>, new: Box<Self>) -> Box<Self> {
let Self {
total_received,
total_succeeded,
mut time_spent,
total_messages,
total_streamed_requests,
total_non_streamed_requests,
models_used,
..
} = *new;
// Aggregate time spent
self.time_spent.append(&mut time_spent);
// Aggregate counters
self.total_received = self.total_received.saturating_add(total_received);
self.total_succeeded = self.total_succeeded.saturating_add(total_succeeded);
self.total_messages = self.total_messages.saturating_add(total_messages);
self.total_streamed_requests =
self.total_streamed_requests.saturating_add(total_streamed_requests);
self.total_non_streamed_requests =
self.total_non_streamed_requests.saturating_add(total_non_streamed_requests);
// Aggregate model usage
for (model, count) in models_used {
*self.models_used.entry(model).or_insert(0) += count;
}
self
}
fn into_event(self: Box<Self>) -> Value {
let Self {
total_received,
total_succeeded,
time_spent,
total_messages,
total_streamed_requests,
total_non_streamed_requests,
models_used,
..
} = *self;
// Compute time statistics
let time_spent: Vec<usize> = time_spent.into_sorted_vec();
let (max_time, min_time, avg_time) = if time_spent.is_empty() {
(0, 0, 0)
} else {
let max_time = time_spent.last().unwrap_or(&0);
let min_time = time_spent.first().unwrap_or(&0);
let sum: usize = time_spent.iter().sum();
let avg_time = sum / time_spent.len();
(*max_time, *min_time, avg_time)
};
// Compute average messages per request
let avg_messages_per_request =
if total_received > 0 { total_messages as f64 / total_received as f64 } else { 0.0 };
// Compute streaming vs non-streaming proportions
let streaming_ratio = if total_received > 0 {
total_streamed_requests as f64 / total_received as f64
} else {
0.0
};
json!({
"total_received": total_received,
"total_succeeded": total_succeeded,
"time_spent": {
"max": max_time,
"min": min_time,
"avg": avg_time
},
"total_messages": total_messages,
"avg_messages_per_request": avg_messages_per_request,
"total_streamed_requests": total_streamed_requests,
"total_non_streamed_requests": total_non_streamed_requests,
"streaming_ratio": streaming_ratio,
"models_used": models_used,
})
}
}

View File

@ -36,6 +36,7 @@ use serde_json::json;
use tokio::runtime::Handle;
use tokio::sync::mpsc::error::SendError;
use super::chat_completion_analytics::ChatCompletionAggregator;
use super::config::Config;
use super::errors::{MistralError, OpenAiOutsideError, StreamErrorEvent};
use super::utils::format_documents;
@ -43,6 +44,7 @@ use super::{
ChatsParam, MEILI_APPEND_CONVERSATION_MESSAGE_NAME, MEILI_SEARCH_IN_INDEX_FUNCTION_NAME,
MEILI_SEARCH_PROGRESS_NAME, MEILI_SEARCH_SOURCES_NAME,
};
use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _};
@ -64,6 +66,7 @@ async fn chat(
req: HttpRequest,
search_queue: web::Data<SearchQueue>,
web::Json(chat_completion): web::Json<CreateChatCompletionRequest>,
analytics: web::Data<Analytics>,
) -> impl Responder {
let ChatsParam { workspace_uid } = chats_param.into_inner();
@ -76,6 +79,7 @@ async fn chat(
&workspace_uid,
req,
chat_completion,
analytics,
)
.await,
)
@ -88,6 +92,7 @@ async fn chat(
&workspace_uid,
req,
chat_completion,
analytics,
)
.await,
)
@ -315,9 +320,18 @@ async fn non_streamed_chat(
workspace_uid: &str,
req: HttpRequest,
chat_completion: CreateChatCompletionRequest,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_chat_completions("using the /chats chat completions route")?;
// Create analytics aggregator
let aggregate = ChatCompletionAggregator::from_request(
&chat_completion.model,
chat_completion.messages.len(),
false, // non_streamed_chat is not streaming
);
let start_time = std::time::Instant::now();
if let Some(n) = chat_completion.n.filter(|&n| n != 1) {
return Err(ResponseError::from_msg(
format!("You tried to specify n = {n} but only single choices are supported (n = 1)."),
@ -414,6 +428,11 @@ async fn non_streamed_chat(
}
}
// Record success in analytics
let mut aggregate = aggregate;
aggregate.succeed(start_time.elapsed());
analytics.publish(aggregate, &req);
Ok(HttpResponse::Ok().json(response))
}
@ -424,6 +443,7 @@ async fn streamed_chat(
workspace_uid: &str,
req: HttpRequest,
mut chat_completion: CreateChatCompletionRequest,
analytics: web::Data<Analytics>,
) -> Result<impl Responder, ResponseError> {
index_scheduler.features().check_chat_completions("using the /chats chat completions route")?;
let filters = index_scheduler.filters();
@ -445,6 +465,14 @@ async fn streamed_chat(
}
};
// Create analytics aggregator
let mut aggregate = ChatCompletionAggregator::from_request(
&chat_completion.model,
chat_completion.messages.len(),
true, // streamed_chat is always streaming
);
let start_time = std::time::Instant::now();
let config = Config::new(&chat_settings);
let auth_token = extract_token_from_request(&req)?.unwrap().to_string();
let system_role = chat_settings.source.system_role(&chat_completion.model);
@ -490,6 +518,10 @@ async fn streamed_chat(
let _ = tx.stop().await;
});
// Record success in analytics after the stream is set up
aggregate.succeed(start_time.elapsed());
analytics.publish(aggregate, &req);
Ok(Sse::from_infallible_receiver(rx).with_retry_duration(Duration::from_secs(10)))
}

View File

@ -19,6 +19,7 @@ use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::GuardedData;
use crate::routes::PAGINATION_DEFAULT_LIMIT;
mod chat_completion_analytics;
pub mod chat_completions;
mod config;
mod errors;