Remove the write txn method from the index scheduler

This commit is contained in:
Kerollmops 2025-06-10 14:03:05 +02:00
parent b32e30ad27
commit bbe802c656
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
4 changed files with 23 additions and 35 deletions

View File

@ -56,7 +56,7 @@ use meilisearch_types::features::{
}; };
use meilisearch_types::heed::byteorder::BE; use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::{DecodeIgnore, SerdeJson, Str, I128}; use meilisearch_types::heed::types::{DecodeIgnore, SerdeJson, Str, I128};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::heed::{self, Database, Env, RoTxn, WithoutTls};
use meilisearch_types::milli::index::IndexEmbeddingConfig; use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -311,11 +311,7 @@ impl IndexScheduler {
Ok(this) Ok(this)
} }
pub fn write_txn(&self) -> Result<RwTxn> { fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.write_txn().map_err(|e| e.into())
}
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into()) self.env.read_txn().map_err(|e| e.into())
} }
@ -901,8 +897,9 @@ impl IndexScheduler {
res.map(EmbeddingConfigs::new) res.map(EmbeddingConfigs::new)
} }
pub fn chat_settings(&self, rtxn: &RoTxn, uid: &str) -> Result<Option<ChatCompletionSettings>> { pub fn chat_settings(&self, uid: &str) -> Result<Option<ChatCompletionSettings>> {
self.chat_settings.get(rtxn, uid).map_err(Into::into) let rtxn = self.env.read_txn()?;
self.chat_settings.get(&rtxn, uid).map_err(Into::into)
} }
/// Return true if chat workspace exists. /// Return true if chat workspace exists.
@ -911,17 +908,18 @@ impl IndexScheduler {
Ok(self.chat_settings.remap_data_type::<DecodeIgnore>().get(&rtxn, name)?.is_some()) Ok(self.chat_settings.remap_data_type::<DecodeIgnore>().get(&rtxn, name)?.is_some())
} }
pub fn put_chat_settings( pub fn put_chat_settings(&self, uid: &str, settings: &ChatCompletionSettings) -> Result<()> {
&self, let mut wtxn = self.env.write_txn()?;
wtxn: &mut RwTxn, self.chat_settings.put(&mut wtxn, uid, settings)?;
uid: &str, wtxn.commit()?;
settings: &ChatCompletionSettings, Ok(())
) -> Result<()> {
self.chat_settings.put(wtxn, uid, settings).map_err(Into::into)
} }
pub fn delete_chat_settings(&self, wtxn: &mut RwTxn, uid: &str) -> Result<bool> { pub fn delete_chat_settings(&self, uid: &str) -> Result<bool> {
self.chat_settings.delete(wtxn, uid).map_err(Into::into) let mut wtxn = self.env.write_txn()?;
let deleted = self.chat_settings.delete(&mut wtxn, uid)?;
wtxn.commit()?;
Ok(deleted)
} }
} }

View File

@ -315,8 +315,7 @@ async fn non_streamed_chat(
)); ));
let filters = index_scheduler.filters(); let filters = index_scheduler.filters();
let rtxn = index_scheduler.read_txn()?; let chat_settings = match index_scheduler.chat_settings(workspace_uid).unwrap() {
let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid).unwrap() {
Some(settings) => settings, Some(settings) => settings,
None => { None => {
return Err(ResponseError::from_msg( return Err(ResponseError::from_msg(
@ -413,8 +412,7 @@ async fn streamed_chat(
index_scheduler.features().check_chat_completions("using the /chats chat completions route")?; index_scheduler.features().check_chat_completions("using the /chats chat completions route")?;
let filters = index_scheduler.filters(); let filters = index_scheduler.filters();
let rtxn = index_scheduler.read_txn()?; let chat_settings = match index_scheduler.chat_settings(workspace_uid)? {
let chat_settings = match index_scheduler.chat_settings(&rtxn, workspace_uid)? {
Some(settings) => settings, Some(settings) => settings,
None => { None => {
return Err(ResponseError::from_msg( return Err(ResponseError::from_msg(
@ -423,7 +421,6 @@ async fn streamed_chat(
)) ))
} }
}; };
drop(rtxn);
let config = Config::new(&chat_settings); let config = Config::new(&chat_settings);
let auth_token = extract_token_from_request(&req)?.unwrap().to_string(); let auth_token = extract_token_from_request(&req)?.unwrap().to_string();

View File

@ -72,10 +72,8 @@ pub async fn delete_chat(
) -> Result<HttpResponse, ResponseError> { ) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_chat_completions("deleting a chat")?; index_scheduler.features().check_chat_completions("deleting a chat")?;
let mut wtxn = index_scheduler.write_txn()?;
let workspace_uid = workspace_uid.into_inner(); let workspace_uid = workspace_uid.into_inner();
if index_scheduler.delete_chat_settings(&mut wtxn, &workspace_uid)? { if index_scheduler.delete_chat_settings(&workspace_uid)? {
wtxn.commit()?;
Ok(HttpResponse::NoContent().finish()) Ok(HttpResponse::NoContent().finish())
} else { } else {
Err(ResponseError::from_msg(format!("chat {workspace_uid} not found"), Code::ChatNotFound)) Err(ResponseError::from_msg(format!("chat {workspace_uid} not found"), Code::ChatNotFound))

View File

@ -42,8 +42,7 @@ async fn get_settings(
let ChatsParam { workspace_uid } = chats_param.into_inner(); let ChatsParam { workspace_uid } = chats_param.into_inner();
// TODO do a spawn_blocking here ??? // TODO do a spawn_blocking here ???
let rtxn = index_scheduler.read_txn()?; let mut settings = match index_scheduler.chat_settings(&workspace_uid)? {
let mut settings = match index_scheduler.chat_settings(&rtxn, &workspace_uid)? {
Some(settings) => settings, Some(settings) => settings,
None => { None => {
return Err(ResponseError::from_msg( return Err(ResponseError::from_msg(
@ -68,8 +67,7 @@ async fn patch_settings(
let ChatsParam { workspace_uid } = chats_param.into_inner(); let ChatsParam { workspace_uid } = chats_param.into_inner();
// TODO do a spawn_blocking here // TODO do a spawn_blocking here
let mut wtxn = index_scheduler.write_txn()?; let old_settings = index_scheduler.chat_settings(&workspace_uid)?.unwrap_or_default();
let old_settings = index_scheduler.chat_settings(&wtxn, &workspace_uid)?.unwrap_or_default();
let prompts = match new.prompts { let prompts = match new.prompts {
Setting::Set(new_prompts) => DbChatCompletionPrompts { Setting::Set(new_prompts) => DbChatCompletionPrompts {
@ -147,8 +145,7 @@ async fn patch_settings(
// ); // );
settings.validate()?; settings.validate()?;
index_scheduler.put_chat_settings(&mut wtxn, &workspace_uid, &settings)?; index_scheduler.put_chat_settings(&workspace_uid, &settings)?;
wtxn.commit()?;
settings.hide_secrets(); settings.hide_secrets();
@ -165,11 +162,9 @@ async fn reset_settings(
index_scheduler.features().check_chat_completions("using the /chats/settings route")?; index_scheduler.features().check_chat_completions("using the /chats/settings route")?;
let ChatsParam { workspace_uid } = chats_param.into_inner(); let ChatsParam { workspace_uid } = chats_param.into_inner();
let mut wtxn = index_scheduler.write_txn()?; if index_scheduler.chat_settings(&workspace_uid)?.is_some() {
if index_scheduler.chat_settings(&wtxn, &workspace_uid)?.is_some() {
let settings = Default::default(); let settings = Default::default();
index_scheduler.put_chat_settings(&mut wtxn, &workspace_uid, &settings)?; index_scheduler.put_chat_settings(&workspace_uid, &settings)?;
wtxn.commit()?;
Ok(HttpResponse::Ok().json(settings)) Ok(HttpResponse::Ok().json(settings))
} else { } else {
Err(ResponseError::from_msg( Err(ResponseError::from_msg(