diff --git a/crates/dump/src/reader/mod.rs b/crates/dump/src/reader/mod.rs
index 2b4440ab7..23e7eec9e 100644
--- a/crates/dump/src/reader/mod.rs
+++ b/crates/dump/src/reader/mod.rs
@@ -116,6 +116,15 @@ impl DumpReader {
         }
     }
 
+    pub fn chat_completions_settings(
+        &mut self,
+    ) -> Result<Box<dyn Iterator<Item = Result<(String, v6::ChatCompletionSettings)>> + '_>> {
+        match self {
+            DumpReader::Current(current) => current.chat_completions_settings(),
+            DumpReader::Compat(_compat) => Ok(Box::new(std::iter::empty())),
+        }
+    }
+
     pub fn features(&self) -> Result<Option<v6::RuntimeTogglableFeatures>> {
         match self {
             DumpReader::Current(current) => Ok(current.features()),
diff --git a/crates/dump/src/reader/v6/mod.rs b/crates/dump/src/reader/v6/mod.rs
index 0b4ba5bdd..449a7e5fe 100644
--- a/crates/dump/src/reader/v6/mod.rs
+++ b/crates/dump/src/reader/v6/mod.rs
@@ -1,3 +1,4 @@
+use std::ffi::OsStr;
 use std::fs::{self, File};
 use std::io::{BufRead, BufReader, ErrorKind};
 use std::path::Path;
@@ -21,6 +22,7 @@ pub type Unchecked = meilisearch_types::settings::Unchecked;
 pub type Task = crate::TaskDump;
 pub type Batch = meilisearch_types::batches::Batch;
 pub type Key = meilisearch_types::keys::Key;
+pub type ChatCompletionSettings = meilisearch_types::features::ChatCompletionSettings;
 pub type RuntimeTogglableFeatures = meilisearch_types::features::RuntimeTogglableFeatures;
 pub type Network = meilisearch_types::features::Network;
 
@@ -192,6 +194,34 @@ impl V6Reader {
         )
     }
 
+    pub fn chat_completions_settings(
+        &mut self,
+    ) -> Result<Box<dyn Iterator<Item = Result<(String, ChatCompletionSettings)>> + '_>> {
+        let entries = match fs::read_dir(self.dump.path().join("chat-completions-settings")) {
+            Ok(entries) => entries,
+            Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Box::new(std::iter::empty())),
+            Err(e) => return Err(e.into()),
+        };
+        Ok(Box::new(
+            entries
+                .map(|entry| -> Result<Option<(String, ChatCompletionSettings)>> {
+                    let entry = entry?;
+                    let file_name = entry.file_name();
+                    let path = Path::new(&file_name);
+                    if entry.file_type()?.is_file() && path.extension() == Some(OsStr::new("json"))
+                    {
+                        let name = path.file_stem().unwrap().to_str().unwrap().to_string();
+                        let file = File::open(entry.path())?;
+                        let settings = serde_json::from_reader(file)?;
+                        Ok(Some((name, settings)))
+                    } else {
+                        Ok(None)
+                    }
+                })
+                .filter_map(|entry| entry.transpose()),
+        ))
+    }
+
     pub fn features(&self) -> Option<RuntimeTogglableFeatures> {
         self.features
     }
diff --git a/crates/dump/src/writer.rs b/crates/dump/src/writer.rs
index 63b006b5c..9f828595a 100644
--- a/crates/dump/src/writer.rs
+++ b/crates/dump/src/writer.rs
@@ -5,7 +5,7 @@ use std::path::PathBuf;
 use flate2::write::GzEncoder;
 use flate2::Compression;
 use meilisearch_types::batches::Batch;
-use meilisearch_types::features::{Network, RuntimeTogglableFeatures};
+use meilisearch_types::features::{ChatCompletionSettings, Network, RuntimeTogglableFeatures};
 use meilisearch_types::keys::Key;
 use meilisearch_types::settings::{Checked, Settings};
 use serde_json::{Map, Value};
@@ -51,6 +51,10 @@ impl DumpWriter {
         KeyWriter::new(self.dir.path().to_path_buf())
     }
 
+    pub fn create_chat_completions_settings(&self) -> Result<ChatCompletionsSettingsWriter> {
+        ChatCompletionsSettingsWriter::new(self.dir.path().join("chat-completions-settings"))
+    }
+
     pub fn create_tasks_queue(&self) -> Result<TaskWriter> {
         TaskWriter::new(self.dir.path().join("tasks"))
     }
@@ -104,6 +108,24 @@ impl KeyWriter {
     }
 }
 
+pub struct ChatCompletionsSettingsWriter {
+    path: PathBuf,
+}
+
+impl ChatCompletionsSettingsWriter {
+    pub(crate) fn new(path: PathBuf) -> Result<ChatCompletionsSettingsWriter> {
+        std::fs::create_dir(&path)?;
+        Ok(ChatCompletionsSettingsWriter { path })
+    }
+
+    pub fn push_settings(&mut self, name: &str, settings: &ChatCompletionSettings) -> Result<()> {
+        let mut settings_file = File::create(self.path.join(name).with_extension("json"))?;
+        serde_json::to_writer(&mut settings_file, &settings)?;
+        settings_file.flush()?;
+        Ok(())
+    }
+}
+
 pub struct TaskWriter {
     queue: BufWriter<File>,
     update_files: PathBuf,
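Side note on the format the reader and writer above agree on: every chat-completions workspace is written to its own `chat-completions-settings/<name>.json` file at the root of the dump, and on import the file stem doubles as the workspace name. Below is a minimal, self-contained sketch of the write side, using `serde_json::Value` as a placeholder for the real `ChatCompletionSettings` payload and a hypothetical `SettingsWriter` in place of the dump's temporary-directory plumbing:

    use std::fs::{self, File};
    use std::io::Write;
    use std::path::PathBuf;

    struct SettingsWriter {
        path: PathBuf,
    }

    impl SettingsWriter {
        // Mirrors ChatCompletionsSettingsWriter::new: the directory is created up front,
        // so this fails if it already exists.
        fn new(path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
            fs::create_dir(&path)?;
            Ok(SettingsWriter { path })
        }

        // Mirrors push_settings: one `<name>.json` file per workspace, flushed immediately.
        fn push_settings(
            &mut self,
            name: &str,
            settings: &serde_json::Value,
        ) -> Result<(), Box<dyn std::error::Error>> {
            let mut file = File::create(self.path.join(name).with_extension("json"))?;
            serde_json::to_writer(&mut file, settings)?;
            file.flush()?;
            Ok(())
        }
    }

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut writer = SettingsWriter::new(PathBuf::from("chat-completions-settings"))?;
        writer.push_settings("default", &serde_json::json!({ "example": true }))?;
        Ok(())
    }

The writer in the patch behaves the same way, except that the directory lives inside the dump's temporary directory and the payload is the strongly typed settings struct.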
diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs
index 2aa7cf859..fdd8e42ef 100644
--- a/crates/index-scheduler/src/processing.rs
+++ b/crates/index-scheduler/src/processing.rs
@@ -103,6 +103,7 @@ make_enum_progress! {
     pub enum DumpCreationProgress {
         StartTheDumpCreation,
         DumpTheApiKeys,
+        DumpTheChatCompletionSettings,
         DumpTheTasks,
         DumpTheBatches,
         DumpTheIndexes,
diff --git a/crates/index-scheduler/src/scheduler/process_dump_creation.rs b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
index ec1be0e93..b8d100415 100644
--- a/crates/index-scheduler/src/scheduler/process_dump_creation.rs
+++ b/crates/index-scheduler/src/scheduler/process_dump_creation.rs
@@ -43,7 +43,16 @@ impl IndexScheduler {
 
         let rtxn = self.env.read_txn()?;
 
-        // 2. dump the tasks
+        // 2. dump the chat completion settings
+        // TODO should I skip the export if the chat completion has been disabled?
+        progress.update_progress(DumpCreationProgress::DumpTheChatCompletionSettings);
+        let mut dump_chat_completion_settings = dump.create_chat_completions_settings()?;
+        for result in self.chat_settings.iter(&rtxn)? {
+            let (name, chat_settings) = result?;
+            dump_chat_completion_settings.push_settings(name, &chat_settings)?;
+        }
+
+        // 3. dump the tasks
         progress.update_progress(DumpCreationProgress::DumpTheTasks);
         let mut dump_tasks = dump.create_tasks_queue()?;
 
@@ -81,7 +90,7 @@ impl IndexScheduler {
             let mut dump_content_file = dump_tasks.push_task(&t.into())?;
 
-            // 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
+            // 3.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
             if let Some(content_file) = content_file {
                 if self.scheduler.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
                 }
@@ -105,7 +114,7 @@
         }
         dump_tasks.flush()?;
 
-        // 3. dump the batches
+        // 4. dump the batches
         progress.update_progress(DumpCreationProgress::DumpTheBatches);
         let mut dump_batches = dump.create_batches_queue()?;
 
@@ -138,7 +147,7 @@
         }
         dump_batches.flush()?;
 
-        // 4. Dump the indexes
+        // 5. Dump the indexes
         progress.update_progress(DumpCreationProgress::DumpTheIndexes);
         let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
         let mut count = 0;
@@ -175,7 +184,7 @@
            let documents = index
                .all_documents(&rtxn)
                .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
-            // 4.1. Dump the documents
+            // 5.1. Dump the documents
            for ret in documents {
                if self.scheduler.must_stop_processing.get() {
                    return Err(Error::AbortedTask);
                }
@@ -233,7 +242,7 @@
                atomic.fetch_add(1, Ordering::Relaxed);
            }
 
-            // 4.2. Dump the settings
+            // 5.2. Dump the settings
            let settings = meilisearch_types::settings::settings(
                index,
                &rtxn,
@@ -244,7 +253,7 @@
            Ok(())
        })?;
 
-        // 5. Dump experimental feature settings
+        // 6. Dump experimental feature settings
        progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
        let features = self.features().runtime_features();
        dump.create_experimental_features(features)?;
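Before the import side below, two details of the reader half (`V6Reader::chat_completions_settings` above) are worth spelling out: a missing `chat-completions-settings/` directory, i.e. any dump taken before this change, maps to an empty iterator rather than an error, and the per-entry closure returns `Result<Option<_>>` so non-JSON entries can be skipped while I/O and parse errors are still surfaced, which is what the final `filter_map(|entry| entry.transpose())` achieves. A stripped-down sketch of that plumbing, with plain `String` contents standing in for the deserialized settings and a hypothetical `json_entries` helper:

    use std::ffi::OsStr;
    use std::fs::{self, File};
    use std::io::{self, ErrorKind, Read};
    use std::path::Path;

    // One Result per `*.json` file in `dir`; a missing directory yields no items at all.
    fn json_entries(
        dir: &Path,
    ) -> io::Result<Box<dyn Iterator<Item = io::Result<(String, String)>>>> {
        let entries = match fs::read_dir(dir) {
            Ok(entries) => entries,
            Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Box::new(std::iter::empty())),
            Err(e) => return Err(e),
        };
        Ok(Box::new(
            entries
                // Result<Option<..>>: Err = real failure, Ok(None) = entry to skip, Ok(Some) = hit.
                .map(|entry| -> io::Result<Option<(String, String)>> {
                    let entry = entry?;
                    let path = entry.path();
                    if entry.file_type()?.is_file() && path.extension() == Some(OsStr::new("json")) {
                        let name = path.file_stem().unwrap().to_string_lossy().into_owned();
                        let mut contents = String::new();
                        File::open(&path)?.read_to_string(&mut contents)?;
                        Ok(Some((name, contents)))
                    } else {
                        Ok(None)
                    }
                })
                // transpose turns Result<Option<T>> into Option<Result<T>>, so Ok(None) vanishes
                // while both Err(_) and Ok(Some(_)) survive as items of the final iterator.
                .filter_map(|entry| entry.transpose()),
        ))
    }

    fn main() -> io::Result<()> {
        for entry in json_entries(Path::new("chat-completions-settings"))? {
            let (name, contents) = entry?;
            println!("{name}: {contents}");
        }
        Ok(())
    }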
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index e1acef2ce..43d7afe0e 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -498,14 +498,20 @@ fn import_dump(
         keys.push(key);
     }
 
-    // 3. Import the runtime features and network
+    // 3. Import the `ChatCompletionSettings`s.
+    for result in dump_reader.chat_completions_settings()? {
+        let (name, settings) = result?;
+        index_scheduler.put_chat_settings(&name, &settings)?;
+    }
+
+    // 4. Import the runtime features and network
     let features = dump_reader.features()?.unwrap_or_default();
     index_scheduler.put_runtime_features(features)?;
 
     let network = dump_reader.network()?.cloned().unwrap_or_default();
     index_scheduler.put_network(network)?;
 
-    // 3.1 Use all cpus to process dump if `max_indexing_threads` not configured
+    // 4.1 Use all cpus to process dump if `max_indexing_threads` not configured
     let backup_config;
     let base_config = index_scheduler.indexer_config();
@@ -522,7 +528,7 @@ fn import_dump(
     // /!\ The tasks must be imported AFTER importing the indexes or else the scheduler might
     // try to process tasks while we're trying to import the indexes.
 
-    // 4. Import the indexes.
+    // 5. Import the indexes.
     for index_reader in dump_reader.indexes()? {
         let mut index_reader = index_reader?;
         let metadata = index_reader.metadata();
@@ -535,20 +541,20 @@ fn import_dump(
         let mut wtxn = index.write_txn()?;
 
         let mut builder = milli::update::Settings::new(&mut wtxn, &index, indexer_config);
-        // 4.1 Import the primary key if there is one.
+        // 5.1 Import the primary key if there is one.
         if let Some(ref primary_key) = metadata.primary_key {
             builder.set_primary_key(primary_key.to_string());
         }
 
-        // 4.2 Import the settings.
+        // 5.2 Import the settings.
         tracing::info!("Importing the settings.");
         let settings = index_reader.settings()?;
         apply_settings_to_builder(&settings, &mut builder);
         let embedder_stats: Arc<EmbedderStats> = Default::default();
         builder.execute(&|| false, &progress, embedder_stats.clone())?;
 
-        // 4.3 Import the documents.
-        // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
+        // 5.3 Import the documents.
+        // 5.3.1 We need to recreate the grenad+obkv format accepted by the index.
         tracing::info!("Importing the documents.");
         let file = tempfile::tempfile()?;
         let mut builder = DocumentsBatchBuilder::new(BufWriter::new(file));
@@ -559,7 +565,7 @@ fn import_dump(
 
         // This flush the content of the batch builder.
         let file = builder.into_inner()?.into_inner()?;
 
-        // 4.3.2 We feed it to the milli index.
+        // 5.3.2 We feed it to the milli index.
         let reader = BufReader::new(file);
         let reader = DocumentsBatchReader::from_reader(reader)?;
@@ -591,15 +597,15 @@ fn import_dump(
         index_scheduler.refresh_index_stats(&uid)?;
     }
 
-    // 5. Import the queue
+    // 6. Import the queue
    let mut index_scheduler_dump = index_scheduler.register_dumped_task()?;
-    // 5.1. Import the batches
+    // 6.1. Import the batches
    for ret in dump_reader.batches()? {
        let batch = ret?;
        index_scheduler_dump.register_dumped_batch(batch)?;
    }
 
-    // 5.2. Import the tasks
+    // 6.2. Import the tasks
    for ret in dump_reader.tasks()? {
        let (task, file) = ret?;
        index_scheduler_dump.register_dumped_task(task, file)?;
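Finally, a note on how the import loop added to `import_dump` behaves: each item of the iterator is unwrapped with `?`, so a single unreadable or unparsable settings file aborts the whole dump import rather than being skipped. A toy sketch of that consumption pattern, with a `HashMap`-backed stand-in for the index scheduler (the real `put_chat_settings` presumably persists into the scheduler's own store instead):

    use std::collections::HashMap;
    use std::io;

    // Stand-in for the index scheduler side of `put_chat_settings`.
    #[derive(Default)]
    struct ToyScheduler {
        chat_settings: HashMap<String, serde_json::Value>,
    }

    impl ToyScheduler {
        fn put_chat_settings(&mut self, name: &str, settings: &serde_json::Value) {
            self.chat_settings.insert(name.to_string(), settings.clone());
        }
    }

    // Mirrors the loop in import_dump: stop at the first broken entry, keep everything before it.
    fn import(
        entries: impl Iterator<Item = io::Result<(String, serde_json::Value)>>,
        scheduler: &mut ToyScheduler,
    ) -> io::Result<()> {
        for result in entries {
            let (name, settings) = result?;
            scheduler.put_chat_settings(&name, &settings);
        }
        Ok(())
    }

    fn main() -> io::Result<()> {
        let mut scheduler = ToyScheduler::default();
        let entries = vec![Ok(("default".to_string(), serde_json::json!({ "example": true })))];
        import(entries.into_iter(), &mut scheduler)?;
        assert!(scheduler.chat_settings.contains_key("default"));
        Ok(())
    }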