From 362836efb7d5924a485fa3e15171257f40214509 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 28 Oct 2024 11:57:02 +0100 Subject: [PATCH] make an upgrade module where we'll be able to shove each version instead of putting everything in the same file --- crates/meilitool/src/main.rs | 428 +-------------------------------- meilitool/src/upgrade/mod.rs | 46 ++++ meilitool/src/upgrade/v1_10.rs | 279 +++++++++++++++++++++ meilitool/src/upgrade/v1_9.rs | 100 ++++++++ 4 files changed, 430 insertions(+), 423 deletions(-) create mode 100644 meilitool/src/upgrade/mod.rs create mode 100644 meilitool/src/upgrade/v1_10.rs create mode 100644 meilitool/src/upgrade/v1_9.rs diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 9dbff2486..ef137f746 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File}; use std::io::BufWriter; use std::path::PathBuf; -use anyhow::{bail, Context}; +use anyhow::Context; use clap::{Parser, Subcommand}; use dump::{DumpWriter, IndexMetadata}; use file_store::FileStore; @@ -10,15 +10,16 @@ use meilisearch_auth::AuthController; use meilisearch_types::heed::types::{SerdeJson, Str}; use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; -use meilisearch_types::milli::index::{db_name, main_key}; use meilisearch_types::milli::{obkv_to_json, BEU32}; use meilisearch_types::tasks::{Status, Task}; -use meilisearch_types::versioning::{create_version_file, get_version, parse_version}; +use meilisearch_types::versioning::{get_version, parse_version}; use meilisearch_types::Index; use time::macros::format_description; use time::OffsetDateTime; +use upgrade::OfflineUpgrade; use uuid_codec::UuidCodec; +mod upgrade; mod uuid_codec; #[derive(Parser)] @@ -72,7 +73,7 @@ enum Command { /// /// Supported upgrade paths: /// - /// - v1.9.0 -> v1.10.0 + /// - v1.9.0 -> v1.10.0 -> v1.11.0 OfflineUpgrade { #[arg(long)] target_version: String, @@ -96,425 +97,6 @@ fn main() -> anyhow::Result<()> { } } -struct OfflineUpgrade { - db_path: PathBuf, - current_version: (String, String, String), - target_version: (String, String, String), -} - -impl OfflineUpgrade { - fn upgrade(self) -> anyhow::Result<()> { - // TODO: if we make this process support more versions, introduce a more flexible way of checking for the version - // currently only supports v1.9 to v1.10 - let (current_major, current_minor, current_patch) = &self.current_version; - - match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) { - ("1", "9", _) => {} - _ => { - bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9") - } - } - - let (target_major, target_minor, target_patch) = &self.target_version; - - match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) { - ("1", "10", _) => {} - _ => { - bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10") - } - } - - println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); - - self.v1_9_to_v1_10()?; - - println!("Writing VERSION file"); - - create_version_file(&self.db_path, target_major, target_minor, target_patch) - .context("while writing VERSION file after the upgrade")?; - - println!("Success"); - - Ok(()) - } - - fn v1_9_to_v1_10(&self) -> anyhow::Result<()> { - // 2 changes here - - // 1. date format. needs to be done before opening the Index - // 2. REST embedders. We don't support this case right now, so bail - - let index_scheduler_path = self.db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| { - format!("While trying to open {:?}", index_scheduler_path.display()) - })?; - - let mut sched_wtxn = env.write_txn()?; - - let index_mapping: Database = - try_opening_database(&env, &sched_wtxn, "index-mapping")?; - - let index_stats: Database = - try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| { - format!("While trying to open {:?}", index_scheduler_path.display()) - })?; - - let index_count = - index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?; - - // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn - // 1. immutably for the iteration - // 2. mutably for updating index stats - let indexes: Vec<_> = index_mapping - .iter(&sched_wtxn)? - .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) - .collect(); - - let mut rest_embedders = Vec::new(); - - let mut unwrapped_indexes = Vec::new(); - - // check that update can take place - for (index_index, result) in indexes.into_iter().enumerate() { - let (uid, uuid) = result?; - let index_path = self.db_path.join("indexes").join(uuid.to_string()); - - println!( - "[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`", - index_index + 1, - index_path.display() - ); - - let index_env = unsafe { - // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? - }; - - let index_txn = index_env.read_txn().with_context(|| { - format!( - "while obtaining a write transaction for index {uid} at {}", - index_path.display() - ) - })?; - - println!("\t- Checking for incompatible embedders (REST embedders)"); - let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?; - - if rest_embedders_for_index.is_empty() { - unwrapped_indexes.push((uid, uuid)); - } else { - // no need to add to unwrapped indexes because we'll exit early - rest_embedders.push((uid, rest_embedders_for_index)); - } - } - - if !rest_embedders.is_empty() { - let rest_embedders = rest_embedders - .into_iter() - .flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders)) - .map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`")) - .collect::>() - .join("\n"); - bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\ - The database has not been modified and is still a valid v1.9 database."); - } - - println!("Update can take place, updating"); - - for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() { - let index_path = self.db_path.join("indexes").join(uuid.to_string()); - - println!( - "[{}/{index_count}]Updating index `{uid}` at `{}`", - index_index + 1, - index_path.display() - ); - - let index_env = unsafe { - // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? - }; - - let mut index_wtxn = index_env.write_txn().with_context(|| { - format!( - "while obtaining a write transaction for index `{uid}` at `{}`", - index_path.display() - ) - })?; - - println!("\t- Updating index stats"); - update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?; - println!("\t- Updating date format"); - update_date_format(&uid, &index_env, &mut index_wtxn)?; - - index_wtxn.commit().with_context(|| { - format!( - "while committing the write txn for index `{uid}` at {}", - index_path.display() - ) - })?; - } - - sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?; - - println!("Upgrading database succeeded"); - - Ok(()) - } -} - -pub mod v1_9 { - pub type FieldDistribution = std::collections::BTreeMap; - - /// The statistics that can be computed from an `Index` object. - #[derive(serde::Serialize, serde::Deserialize, Debug)] - pub struct IndexStats { - /// Number of documents in the index. - pub number_of_documents: u64, - /// Size taken up by the index' DB, in bytes. - /// - /// This includes the size taken by both the used and free pages of the DB, and as the free pages - /// are not returned to the disk after a deletion, this number is typically larger than - /// `used_database_size` that only includes the size of the used pages. - pub database_size: u64, - /// Size taken by the used pages of the index' DB, in bytes. - /// - /// As the DB backend does not return to the disk the pages that are not currently used by the DB, - /// this value is typically smaller than `database_size`. - pub used_database_size: u64, - /// Association of every field name with the number of times it occurs in the documents. - pub field_distribution: FieldDistribution, - /// Creation date of the index. - pub created_at: time::OffsetDateTime, - /// Date of the last update of the index. - pub updated_at: time::OffsetDateTime, - } - - use serde::{Deserialize, Serialize}; - - #[derive(Debug, Deserialize, Serialize)] - pub struct IndexEmbeddingConfig { - pub name: String, - pub config: EmbeddingConfig, - } - - #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] - pub struct EmbeddingConfig { - /// Options of the embedder, specific to each kind of embedder - pub embedder_options: EmbedderOptions, - } - - /// Options of an embedder, specific to each kind of embedder. - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub enum EmbedderOptions { - HuggingFace(hf::EmbedderOptions), - OpenAi(openai::EmbedderOptions), - Ollama(ollama::EmbedderOptions), - UserProvided(manual::EmbedderOptions), - Rest(rest::EmbedderOptions), - } - - impl Default for EmbedderOptions { - fn default() -> Self { - Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None }) - } - } - - mod hf { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub model: String, - pub revision: Option, - } - } - mod openai { - - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub api_key: Option, - pub dimensions: Option, - } - } - mod ollama { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub embedding_model: String, - pub url: Option, - pub api_key: Option, - } - } - mod manual { - #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] - pub struct EmbedderOptions { - pub dimensions: usize, - } - } - mod rest { - #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)] - pub struct EmbedderOptions { - pub api_key: Option, - pub dimensions: Option, - pub url: String, - pub input_field: Vec, - // path to the array of embeddings - pub path_to_embeddings: Vec, - // shape of a single embedding - pub embedding_object: Vec, - } - } - - pub type OffsetDateTime = time::OffsetDateTime; -} - -pub mod v1_10 { - use crate::v1_9; - - pub type FieldDistribution = std::collections::BTreeMap; - - /// The statistics that can be computed from an `Index` object. - #[derive(serde::Serialize, serde::Deserialize, Debug)] - pub struct IndexStats { - /// Number of documents in the index. - pub number_of_documents: u64, - /// Size taken up by the index' DB, in bytes. - /// - /// This includes the size taken by both the used and free pages of the DB, and as the free pages - /// are not returned to the disk after a deletion, this number is typically larger than - /// `used_database_size` that only includes the size of the used pages. - pub database_size: u64, - /// Size taken by the used pages of the index' DB, in bytes. - /// - /// As the DB backend does not return to the disk the pages that are not currently used by the DB, - /// this value is typically smaller than `database_size`. - pub used_database_size: u64, - /// Association of every field name with the number of times it occurs in the documents. - pub field_distribution: FieldDistribution, - /// Creation date of the index. - #[serde(with = "time::serde::rfc3339")] - pub created_at: time::OffsetDateTime, - /// Date of the last update of the index. - #[serde(with = "time::serde::rfc3339")] - pub updated_at: time::OffsetDateTime, - } - - impl From for IndexStats { - fn from( - v1_9::IndexStats { - number_of_documents, - database_size, - used_database_size, - field_distribution, - created_at, - updated_at, - }: v1_9::IndexStats, - ) -> Self { - IndexStats { - number_of_documents, - database_size, - used_database_size, - field_distribution, - created_at, - updated_at, - } - } - } - - #[derive(serde::Serialize, serde::Deserialize)] - #[serde(transparent)] - pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime); -} - -fn update_index_stats( - index_stats: Database, - index_uid: &str, - index_uuid: uuid::Uuid, - sched_wtxn: &mut RwTxn, -) -> anyhow::Result<()> { - let ctx = || format!("while updating index stats for index `{index_uid}`"); - - let stats: Option = index_stats - .remap_data_type::>() - .get(sched_wtxn, &index_uuid) - .with_context(ctx)?; - - if let Some(stats) = stats { - let stats: v1_10::IndexStats = stats.into(); - - index_stats - .remap_data_type::>() - .put(sched_wtxn, &index_uuid, &stats) - .with_context(ctx)?; - } - - Ok(()) -} - -fn update_date_format( - index_uid: &str, - index_env: &Env, - index_wtxn: &mut RwTxn, -) -> anyhow::Result<()> { - let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) - .with_context(|| format!("while updating date format for index `{index_uid}`"))?; - - date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?; - date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?; - - Ok(()) -} - -fn find_rest_embedders( - index_uid: &str, - index_env: &Env, - index_txn: &RoTxn, -) -> anyhow::Result> { - let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN) - .with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?; - - let mut rest_embedders = vec![]; - - for config in main - .remap_types::>>() - .get(index_txn, main_key::EMBEDDING_CONFIGS)? - .unwrap_or_default() - { - if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options { - rest_embedders.push(config.name); - } - } - - Ok(rest_embedders) -} - -fn date_round_trip( - wtxn: &mut RwTxn, - index_uid: &str, - db: Database, - key: &str, -) -> anyhow::Result<()> { - let datetime = - db.remap_types::>().get(wtxn, key).with_context( - || format!("could not read `{key}` while updating date format for index `{index_uid}`"), - )?; - - if let Some(datetime) = datetime { - db.remap_types::>() - .put(wtxn, key, &v1_10::OffsetDateTime(datetime)) - .with_context(|| { - format!( - "could not write `{key}` while updating date format for index `{index_uid}`" - ) - })?; - } - - Ok(()) -} - /// Clears the task queue located at `db_path`. fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { let path = db_path.join("tasks"); diff --git a/meilitool/src/upgrade/mod.rs b/meilitool/src/upgrade/mod.rs new file mode 100644 index 000000000..053c61c14 --- /dev/null +++ b/meilitool/src/upgrade/mod.rs @@ -0,0 +1,46 @@ +mod v1_10; +mod v1_9; + +use std::path::PathBuf; + +use anyhow::{bail, Context}; +use meilisearch_types::versioning::create_version_file; + +use v1_10::v1_9_to_v1_10; + +pub struct OfflineUpgrade { + pub db_path: PathBuf, + pub current_version: (String, String, String), + pub target_version: (String, String, String), +} + +impl OfflineUpgrade { + pub fn upgrade(self) -> anyhow::Result<()> { + let (current_major, current_minor, current_patch) = &self.current_version; + let (target_major, target_minor, target_patch) = &self.target_version; + + println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}"); + + match ( + (current_major.as_str(), current_minor.as_str(), current_patch.as_str()), + (target_major.as_str(), target_minor.as_str(), target_patch.as_str()), + ) { + (("1", "9", _), ("1", "10", _)) => v1_9_to_v1_10(&self.db_path)?, + ((major, minor, _), _) if major != "1" && minor != "9" => + bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9"), + (_, (major, minor, _)) if major != "1" && minor != "10" => + bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10"), + _ => + bail!("Unsupported upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}. Can only upgrade from v1.9 to v1.10"), + } + + println!("Writing VERSION file"); + + create_version_file(&self.db_path, target_major, target_minor, target_patch) + .context("while writing VERSION file after the upgrade")?; + + println!("Success"); + + Ok(()) + } +} diff --git a/meilitool/src/upgrade/v1_10.rs b/meilitool/src/upgrade/v1_10.rs new file mode 100644 index 000000000..96af99c39 --- /dev/null +++ b/meilitool/src/upgrade/v1_10.rs @@ -0,0 +1,279 @@ +use anyhow::bail; +use std::path::Path; + +use anyhow::Context; +use meilisearch_types::{ + heed::{ + types::{SerdeJson, Str}, + Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, + }, + milli::index::{db_name, main_key}, +}; + +use crate::{try_opening_database, try_opening_poly_database, uuid_codec::UuidCodec}; + +use super::v1_9; + +pub type FieldDistribution = std::collections::BTreeMap; + +/// The statistics that can be computed from an `Index` object. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct IndexStats { + /// Number of documents in the index. + pub number_of_documents: u64, + /// Size taken up by the index' DB, in bytes. + /// + /// This includes the size taken by both the used and free pages of the DB, and as the free pages + /// are not returned to the disk after a deletion, this number is typically larger than + /// `used_database_size` that only includes the size of the used pages. + pub database_size: u64, + /// Size taken by the used pages of the index' DB, in bytes. + /// + /// As the DB backend does not return to the disk the pages that are not currently used by the DB, + /// this value is typically smaller than `database_size`. + pub used_database_size: u64, + /// Association of every field name with the number of times it occurs in the documents. + pub field_distribution: FieldDistribution, + /// Creation date of the index. + #[serde(with = "time::serde::rfc3339")] + pub created_at: time::OffsetDateTime, + /// Date of the last update of the index. + #[serde(with = "time::serde::rfc3339")] + pub updated_at: time::OffsetDateTime, +} + +impl From for IndexStats { + fn from( + v1_9::IndexStats { + number_of_documents, + database_size, + used_database_size, + field_distribution, + created_at, + updated_at, + }: v1_9::IndexStats, + ) -> Self { + IndexStats { + number_of_documents, + database_size, + used_database_size, + field_distribution, + created_at, + updated_at, + } + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(transparent)] +pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime); + +fn update_index_stats( + index_stats: Database, + index_uid: &str, + index_uuid: uuid::Uuid, + sched_wtxn: &mut RwTxn, +) -> anyhow::Result<()> { + let ctx = || format!("while updating index stats for index `{index_uid}`"); + + let stats: Option = index_stats + .remap_data_type::>() + .get(sched_wtxn, &index_uuid) + .with_context(ctx)?; + + if let Some(stats) = stats { + let stats: self::IndexStats = stats.into(); + + index_stats + .remap_data_type::>() + .put(sched_wtxn, &index_uuid, &stats) + .with_context(ctx)?; + } + + Ok(()) +} + +fn update_date_format( + index_uid: &str, + index_env: &Env, + index_wtxn: &mut RwTxn, +) -> anyhow::Result<()> { + let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) + .with_context(|| format!("while updating date format for index `{index_uid}`"))?; + + date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?; + date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?; + + Ok(()) +} + +fn find_rest_embedders( + index_uid: &str, + index_env: &Env, + index_txn: &RoTxn, +) -> anyhow::Result> { + let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN) + .with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?; + + let mut rest_embedders = vec![]; + + for config in main + .remap_types::>>() + .get(index_txn, main_key::EMBEDDING_CONFIGS)? + .unwrap_or_default() + { + if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options { + rest_embedders.push(config.name); + } + } + + Ok(rest_embedders) +} + +fn date_round_trip( + wtxn: &mut RwTxn, + index_uid: &str, + db: Database, + key: &str, +) -> anyhow::Result<()> { + let datetime = + db.remap_types::>().get(wtxn, key).with_context( + || format!("could not read `{key}` while updating date format for index `{index_uid}`"), + )?; + + if let Some(datetime) = datetime { + db.remap_types::>() + .put(wtxn, key, &self::OffsetDateTime(datetime)) + .with_context(|| { + format!( + "could not write `{key}` while updating date format for index `{index_uid}`" + ) + })?; + } + + Ok(()) +} + +pub fn v1_9_to_v1_10(db_path: &Path) -> anyhow::Result<()> { + // 2 changes here + + // 1. date format. needs to be done before opening the Index + // 2. REST embedders. We don't support this case right now, so bail + + let index_scheduler_path = db_path.join("tasks"); + let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + + let mut sched_wtxn = env.write_txn()?; + + let index_mapping: Database = + try_opening_database(&env, &sched_wtxn, "index-mapping")?; + + let index_stats: Database = + try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| { + format!("While trying to open {:?}", index_scheduler_path.display()) + })?; + + let index_count = + index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?; + + // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn + // 1. immutably for the iteration + // 2. mutably for updating index stats + let indexes: Vec<_> = index_mapping + .iter(&sched_wtxn)? + .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid))) + .collect(); + + let mut rest_embedders = Vec::new(); + + let mut unwrapped_indexes = Vec::new(); + + // check that update can take place + for (index_index, result) in indexes.into_iter().enumerate() { + let (uid, uuid) = result?; + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + let index_env = unsafe { + // FIXME: fetch the 25 magic number from the index file + EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? + }; + + let index_txn = index_env.read_txn().with_context(|| { + format!( + "while obtaining a write transaction for index {uid} at {}", + index_path.display() + ) + })?; + + println!("\t- Checking for incompatible embedders (REST embedders)"); + let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?; + + if rest_embedders_for_index.is_empty() { + unwrapped_indexes.push((uid, uuid)); + } else { + // no need to add to unwrapped indexes because we'll exit early + rest_embedders.push((uid, rest_embedders_for_index)); + } + } + + if !rest_embedders.is_empty() { + let rest_embedders = rest_embedders + .into_iter() + .flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders)) + .map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`")) + .collect::>() + .join("\n"); + bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\ + The database has not been modified and is still a valid v1.9 database."); + } + + println!("Update can take place, updating"); + + for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() { + let index_path = db_path.join("indexes").join(uuid.to_string()); + + println!( + "[{}/{index_count}]Updating index `{uid}` at `{}`", + index_index + 1, + index_path.display() + ); + + let index_env = unsafe { + // FIXME: fetch the 25 magic number from the index file + EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? + }; + + let mut index_wtxn = index_env.write_txn().with_context(|| { + format!( + "while obtaining a write transaction for index `{uid}` at `{}`", + index_path.display() + ) + })?; + + println!("\t- Updating index stats"); + update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?; + println!("\t- Updating date format"); + update_date_format(&uid, &index_env, &mut index_wtxn)?; + + index_wtxn.commit().with_context(|| { + format!("while committing the write txn for index `{uid}` at {}", index_path.display()) + })?; + } + + sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?; + + println!("Upgrading database succeeded"); + + Ok(()) +} diff --git a/meilitool/src/upgrade/v1_9.rs b/meilitool/src/upgrade/v1_9.rs new file mode 100644 index 000000000..faa2d9814 --- /dev/null +++ b/meilitool/src/upgrade/v1_9.rs @@ -0,0 +1,100 @@ +use serde::{Deserialize, Serialize}; + +pub type FieldDistribution = std::collections::BTreeMap; + +/// The statistics that can be computed from an `Index` object. +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct IndexStats { + /// Number of documents in the index. + pub number_of_documents: u64, + /// Size taken up by the index' DB, in bytes. + /// + /// This includes the size taken by both the used and free pages of the DB, and as the free pages + /// are not returned to the disk after a deletion, this number is typically larger than + /// `used_database_size` that only includes the size of the used pages. + pub database_size: u64, + /// Size taken by the used pages of the index' DB, in bytes. + /// + /// As the DB backend does not return to the disk the pages that are not currently used by the DB, + /// this value is typically smaller than `database_size`. + pub used_database_size: u64, + /// Association of every field name with the number of times it occurs in the documents. + pub field_distribution: FieldDistribution, + /// Creation date of the index. + pub created_at: time::OffsetDateTime, + /// Date of the last update of the index. + pub updated_at: time::OffsetDateTime, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct IndexEmbeddingConfig { + pub name: String, + pub config: EmbeddingConfig, +} + +#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)] +pub struct EmbeddingConfig { + /// Options of the embedder, specific to each kind of embedder + pub embedder_options: EmbedderOptions, +} + +/// Options of an embedder, specific to each kind of embedder. +#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] +pub enum EmbedderOptions { + HuggingFace(hf::EmbedderOptions), + OpenAi(openai::EmbedderOptions), + Ollama(ollama::EmbedderOptions), + UserProvided(manual::EmbedderOptions), + Rest(rest::EmbedderOptions), +} + +impl Default for EmbedderOptions { + fn default() -> Self { + Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None }) + } +} + +mod hf { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub model: String, + pub revision: Option, + } +} +mod openai { + + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub api_key: Option, + pub dimensions: Option, + } +} +mod ollama { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub embedding_model: String, + pub url: Option, + pub api_key: Option, + } +} +mod manual { + #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)] + pub struct EmbedderOptions { + pub dimensions: usize, + } +} +mod rest { + #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)] + pub struct EmbedderOptions { + pub api_key: Option, + pub dimensions: Option, + pub url: String, + pub input_field: Vec, + // path to the array of embeddings + pub path_to_embeddings: Vec, + // shape of a single embedding + pub embedding_object: Vec, + } +} + +pub type OffsetDateTime = time::OffsetDateTime;