Add new analytics to the chat completions route

This commit is contained in:
Clément Renault 2025-06-24 17:05:49 +02:00
parent bd2bd0f33b
commit 5f50fc9464
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
3 changed files with 172 additions and 0 deletions

View file

@ -36,6 +36,7 @@ use serde_json::json;
use tokio::runtime::Handle;
use tokio::sync::mpsc::error::SendError;
use super::chat_completion_analytics::{ChatCompletionAggregator, ChatCompletionPOST};
use super::config::Config;
use super::errors::{MistralError, OpenAiOutsideError, StreamErrorEvent};
use super::utils::format_documents;
@ -43,6 +44,7 @@ use super::{
ChatsParam, MEILI_APPEND_CONVERSATION_MESSAGE_NAME, MEILI_SEARCH_IN_INDEX_FUNCTION_NAME,
MEILI_SEARCH_PROGRESS_NAME, MEILI_SEARCH_SOURCES_NAME,
};
use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::ActionPolicy;
use crate::extractors::authentication::{extract_token_from_request, GuardedData, Policy as _};
@ -64,6 +66,7 @@ async fn chat(
req: HttpRequest,
search_queue: web::Data<SearchQueue>,
web::Json(chat_completion): web::Json<CreateChatCompletionRequest>,
analytics: web::Data<Analytics>,
) -> impl Responder {
let ChatsParam { workspace_uid } = chats_param.into_inner();
@ -76,6 +79,7 @@ async fn chat(
&workspace_uid,
req,
chat_completion,
analytics,
)
.await,
)
@ -88,6 +92,7 @@ async fn chat(
&workspace_uid,
req,
chat_completion,
analytics,
)
.await,
)
@ -315,9 +320,18 @@ async fn non_streamed_chat(
workspace_uid: &str,
req: HttpRequest,
chat_completion: CreateChatCompletionRequest,
analytics: web::Data<Analytics>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_chat_completions("using the /chats chat completions route")?;
// Create analytics aggregator
let aggregate = ChatCompletionAggregator::<ChatCompletionPOST>::from_request(
&chat_completion.model,
chat_completion.messages.len(),
false, // non_streamed_chat is not streaming
);
let start_time = std::time::Instant::now();
if let Some(n) = chat_completion.n.filter(|&n| n != 1) {
return Err(ResponseError::from_msg(
format!("You tried to specify n = {n} but only single choices are supported (n = 1)."),
@ -414,6 +428,11 @@ async fn non_streamed_chat(
}
}
// Record success in analytics
let mut aggregate = aggregate;
aggregate.succeed(start_time.elapsed());
analytics.publish(aggregate, &req);
Ok(HttpResponse::Ok().json(response))
}
@ -424,6 +443,7 @@ async fn streamed_chat(
workspace_uid: &str,
req: HttpRequest,
mut chat_completion: CreateChatCompletionRequest,
analytics: web::Data<Analytics>,
) -> Result<impl Responder, ResponseError> {
index_scheduler.features().check_chat_completions("using the /chats chat completions route")?;
let filters = index_scheduler.filters();
@ -445,6 +465,14 @@ async fn streamed_chat(
}
};
// Create analytics aggregator
let mut aggregate = ChatCompletionAggregator::<ChatCompletionPOST>::from_request(
&chat_completion.model,
chat_completion.messages.len(),
true, // streamed_chat is always streaming
);
let start_time = std::time::Instant::now();
let config = Config::new(&chat_settings);
let auth_token = extract_token_from_request(&req)?.unwrap().to_string();
let system_role = chat_settings.source.system_role(&chat_completion.model);
@ -490,6 +518,10 @@ async fn streamed_chat(
let _ = tx.stop().await;
});
// Record success in analytics after the stream is set up
aggregate.succeed(start_time.elapsed());
analytics.publish(aggregate, &req);
Ok(Sse::from_infallible_receiver(rx).with_retry_duration(Duration::from_secs(10)))
}