Implement a first version of a streamed chat API

Clément Renault 2025-05-14 11:18:21 +02:00
parent 2cd85c732a
commit 0f05c0eb6f
3 changed files with 180 additions and 17 deletions
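
Note (not part of the commit): the streamed variant introduced below forwards OpenAI chat completion chunks to the caller as Server-Sent Events. As a rough illustration of what a client consuming such a stream looks like, here is a minimal sketch using the same async-openai crate the route relies on; the base URL, model name, and prompt are placeholder assumptions, and async-openai posts to `{api_base}/chat/completions`, which may or may not match the exact path this route is mounted on.

```rust
// Hypothetical client-side sketch: consume a streamed chat completion with
// async-openai. Base URL, model, and prompt are placeholders, not taken from
// the commit; the crate POSTs to `{api_base}/chat/completions`.
use async_openai::config::OpenAIConfig;
use async_openai::types::{ChatCompletionRequestUserMessageArgs, CreateChatCompletionRequestArgs};
use async_openai::Client;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = OpenAIConfig::default()
        .with_api_base("http://localhost:7700") // placeholder base URL
        .with_api_key(std::env::var("MEILI_OPENAI_API_KEY")?);
    let client = Client::with_config(config);

    let request = CreateChatCompletionRequestArgs::default()
        .model("gpt-3.5-turbo") // placeholder model name
        .stream(true)
        .messages([ChatCompletionRequestUserMessageArgs::default()
            .content("Say hello")
            .build()?
            .into()])
        .build()?;

    // `create_stream` yields one chunk per SSE event; each chunk carries a
    // delta holding the next piece of the assistant's answer.
    let mut stream = client.chat().create_stream(request).await?;
    while let Some(chunk) = stream.next().await {
        let chunk = chunk?;
        if let Some(content) = chunk.choices.first().and_then(|c| c.delta.content.as_deref()) {
            print!("{content}");
        }
    }
    Ok(())
}
```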

@@ -1,7 +1,8 @@
 use std::mem;
 
 use actix_web::web::{self, Data};
-use actix_web::HttpResponse;
+use actix_web::{Either, HttpResponse, Responder};
+use actix_web_lab::sse::{self, Event};
 use async_openai::config::OpenAIConfig;
 use async_openai::types::{
     ChatCompletionRequestAssistantMessageArgs, ChatCompletionRequestMessage,
@@ -10,6 +11,7 @@ use async_openai::types::{
     FunctionObjectArgs,
 };
 use async_openai::Client;
+use futures::StreamExt;
 use index_scheduler::IndexScheduler;
 use meilisearch_types::error::ResponseError;
 use meilisearch_types::keys::actions;
@@ -53,10 +55,22 @@ async fn chat(
     index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT_GET }>, Data<IndexScheduler>>,
     search_queue: web::Data<SearchQueue>,
     web::Json(mut chat_completion): web::Json<CreateChatCompletionRequest>,
-) -> Result<HttpResponse, ResponseError> {
+) -> impl Responder {
     // To enable later on, when the feature will be experimental
     // index_scheduler.features().check_chat("Using the /chat route")?;
 
+    if chat_completion.stream.unwrap_or(false) {
+        Either::Right(streamed_chat(index_scheduler, search_queue, chat_completion).await)
+    } else {
+        Either::Left(non_streamed_chat(index_scheduler, search_queue, chat_completion).await)
+    }
+}
+
+async fn non_streamed_chat(
+    index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT_GET }>, Data<IndexScheduler>>,
+    search_queue: web::Data<SearchQueue>,
+    mut chat_completion: CreateChatCompletionRequest,
+) -> Result<HttpResponse, ResponseError> {
     let api_key = std::env::var("MEILI_OPENAI_API_KEY")
         .expect("cannot find OpenAI API Key (MEILI_OPENAI_API_KEY)");
     let config = OpenAIConfig::default().with_api_key(&api_key); // we can also change the API base
@@ -119,7 +133,7 @@ async fn chat(
                 .build()
                 .unwrap(),
         );
 
-        response = dbg!(client.chat().create(chat_completion.clone()).await.unwrap());
+        response = client.chat().create(chat_completion.clone()).await.unwrap();
         let choice = &mut response.choices[0];
         match choice.finish_reason {
@@ -221,6 +235,24 @@ async fn chat(
     Ok(HttpResponse::Ok().json(response))
 }
 
+async fn streamed_chat(
+    index_scheduler: GuardedData<ActionPolicy<{ actions::CHAT_GET }>, Data<IndexScheduler>>,
+    search_queue: web::Data<SearchQueue>,
+    mut chat_completion: CreateChatCompletionRequest,
+) -> impl Responder {
+    assert!(chat_completion.stream.unwrap_or(false));
+
+    let api_key = std::env::var("MEILI_OPENAI_API_KEY")
+        .expect("cannot find OpenAI API Key (MEILI_OPENAI_API_KEY)");
+    let config = OpenAIConfig::default().with_api_key(&api_key); // we can also change the API base
+    let client = Client::with_config(config);
+    let response = client.chat().create_stream(chat_completion).await.unwrap();
+    actix_web_lab::sse::Sse::from_stream(response.map(|response| {
+        response
+            .map(|mut r| Event::Data(sse::Data::new_json(r.choices.pop().unwrap().delta).unwrap()))
+    }))
+}
+
 #[derive(Deserialize)]
 struct SearchInIndexParameters {
     /// The index uid to search in.