diff --git a/Cargo.lock b/Cargo.lock
index cb413bc53..d0ff33438 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3520,6 +3520,7 @@ dependencies = [
  "file-store",
  "meilisearch-auth",
  "meilisearch-types",
+ "serde",
  "time",
  "uuid",
 ]
@@ -4834,9 +4835,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
 
 [[package]]
 name = "serde"
-version = "1.0.204"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12"
+checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
 dependencies = [
  "serde_derive",
 ]
@@ -4852,9 +4853,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.204"
+version = "1.0.209"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
+checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/meilisearch-types/src/versioning.rs b/meilisearch-types/src/versioning.rs
index 3c4726403..2ec9d9b0c 100644
--- a/meilisearch-types/src/versioning.rs
+++ b/meilisearch-types/src/versioning.rs
@@ -10,38 +10,52 @@ static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
 static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
 
 /// Persists the version of the current Meilisearch binary to a VERSION file
-pub fn create_version_file(db_path: &Path) -> io::Result<()> {
+pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
+    create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
+}
+
+pub fn create_version_file(
+    db_path: &Path,
+    major: &str,
+    minor: &str,
+    patch: &str,
+) -> io::Result<()> {
     let version_path = db_path.join(VERSION_FILE_NAME);
-    fs::write(version_path, format!("{}.{}.{}", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH))
+    fs::write(version_path, format!("{}.{}.{}", major, minor, patch))
 }
 
 /// Ensures the Meilisearch version is compatible with the database, returns an error if the versions mismatch.
 pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
-    let version_path = db_path.join(VERSION_FILE_NAME);
+    let (major, minor, patch) = get_version(db_path)?;
 
-    match fs::read_to_string(version_path) {
-        Ok(version) => {
-            let version_components = version.split('.').collect::<Vec<&str>>();
-            let (major, minor, patch) = match &version_components[..] {
-                [major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
-                _ => return Err(VersionFileError::MalformedVersionFile.into()),
-            };
-
-            if major != VERSION_MAJOR || minor != VERSION_MINOR {
-                return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
-            }
-        }
-        Err(error) => {
-            return match error.kind() {
-                ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile.into()),
-                _ => Err(error.into()),
-            }
-        }
+    if major != VERSION_MAJOR || minor != VERSION_MINOR {
+        return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
     }
 
     Ok(())
 }
 
+pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> {
+    let version_path = db_path.join(VERSION_FILE_NAME);
+
+    match fs::read_to_string(version_path) {
+        Ok(version) => parse_version(&version),
+        Err(error) => match error.kind() {
+            ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile),
+            _ => Err(error.into()),
+        },
+    }
+}
+
+pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> {
+    let version_components = version.split('.').collect::<Vec<&str>>();
+    let (major, minor, patch) = match &version_components[..] {
+        [major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
+        _ => return Err(VersionFileError::MalformedVersionFile),
+    };
+    Ok((major, minor, patch))
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum VersionFileError {
     #[error(
@@ -58,4 +72,7 @@ pub enum VersionFileError {
         env!("CARGO_PKG_VERSION").to_string()
     )]
     VersionMismatch { major: String, minor: String, patch: String },
+
+    #[error(transparent)]
+    IoError(#[from] std::io::Error),
 }
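For orientation, a minimal sketch of how the refactored helpers compose. The `version_file_round_trip` wrapper is hypothetical; everything else is the API introduced above:

```rust
use std::path::Path;

use meilisearch_types::versioning::{create_version_file, get_version};

/// Hypothetical example: write a VERSION file, then read it back.
fn version_file_round_trip(db_path: &Path) -> anyhow::Result<()> {
    // create_current_version_file() now simply delegates to this, passing the
    // compile-time VERSION_MAJOR/MINOR/PATCH constants.
    create_version_file(db_path, "1", "9", "0")?;

    // get_version() reads the file and parse_version() splits it into components.
    let (major, minor, patch) = get_version(db_path)?;
    assert_eq!((major.as_str(), minor.as_str(), patch.as_str()), ("1", "9", "0"));
    Ok(())
}
```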
diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs
index 6f29ba10c..b24f18fae 100644
--- a/meilisearch/src/lib.rs
+++ b/meilisearch/src/lib.rs
@@ -36,7 +36,7 @@ use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchR
 use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
 use meilisearch_types::settings::apply_settings_to_builder;
 use meilisearch_types::tasks::KindWithContent;
-use meilisearch_types::versioning::{check_version_file, create_version_file};
+use meilisearch_types::versioning::{check_version_file, create_current_version_file};
 use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
 pub use option::Opt;
 use option::ScheduleSnapshot;
@@ -319,7 +319,7 @@ fn open_or_create_database_unchecked(
     match (
         index_scheduler_builder(),
         auth_controller.map_err(anyhow::Error::from),
-        create_version_file(&opt.db_path).map_err(anyhow::Error::from),
+        create_current_version_file(&opt.db_path).map_err(anyhow::Error::from),
     ) {
         (Ok(i), Ok(a), Ok(())) => Ok((i, a)),
         (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
diff --git a/meilitool/Cargo.toml b/meilitool/Cargo.toml
index bf68b219e..ce6c1ad5b 100644
--- a/meilitool/Cargo.toml
+++ b/meilitool/Cargo.toml
@@ -15,5 +15,6 @@ dump = { path = "../dump" }
 file-store = { path = "../file-store" }
 meilisearch-auth = { path = "../meilisearch-auth" }
 meilisearch-types = { path = "../meilisearch-types" }
+serde = { version = "1.0.209", features = ["derive"] }
 time = { version = "0.3.36", features = ["formatting"] }
 uuid = { version = "1.10.0", features = ["v4"], default-features = false }
diff --git a/meilitool/src/main.rs b/meilitool/src/main.rs
index 06c4890a5..9dbff2486 100644
--- a/meilitool/src/main.rs
+++ b/meilitool/src/main.rs
@@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File};
 use std::io::BufWriter;
 use std::path::PathBuf;
 
-use anyhow::Context;
+use anyhow::{bail, Context};
 use clap::{Parser, Subcommand};
 use dump::{DumpWriter, IndexMetadata};
 use file_store::FileStore;
@@ -10,9 +10,10 @@ use meilisearch_auth::AuthController;
 use meilisearch_types::heed::types::{SerdeJson, Str};
 use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
 use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
+use meilisearch_types::milli::index::{db_name, main_key};
 use meilisearch_types::milli::{obkv_to_json, BEU32};
 use meilisearch_types::tasks::{Status, Task};
-use meilisearch_types::versioning::check_version_file;
+use meilisearch_types::versioning::{create_version_file, get_version, parse_version};
 use meilisearch_types::Index;
 use time::macros::format_description;
 use time::OffsetDateTime;
@@ -62,21 +63,458 @@ enum Command {
         #[arg(long)]
         skip_enqueued_tasks: bool,
     },
+
+    /// Attempts to upgrade from one major version to the next without a dump.
+    ///
+    /// Make sure to run this command when Meilisearch is not running!
+    /// If Meilisearch is running while executing this command, the database could become
+    /// corrupted (it may end up containing data from both the old and the new versions).
+    ///
+    /// Supported upgrade paths:
+    ///
+    /// - v1.9.0 -> v1.10.0
+    OfflineUpgrade {
+        #[arg(long)]
+        target_version: String,
+    },
 }
 
 fn main() -> anyhow::Result<()> {
     let Cli { db_path, command } = Cli::parse();
 
-    check_version_file(&db_path).context("While checking the version file")?;
+    let detected_version = get_version(&db_path).context("While checking the version file")?;
 
     match command {
         Command::ClearTaskQueue => clear_task_queue(db_path),
         Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
             export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
         }
+        Command::OfflineUpgrade { target_version } => {
+            let target_version = parse_version(&target_version).context(
+                "While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH",
+            )?;
+            OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
+        }
     }
 }
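Worth noting: `parse_version` only validates the `MAJOR.MINOR.PATCH` shape, not that the components are numeric; the semantic check happens later in `OfflineUpgrade::upgrade()`, defined below. A sketch of the resulting behavior:

```rust
fn parse_version_examples() -> anyhow::Result<()> {
    assert_eq!(
        parse_version("1.10.0")?,
        ("1".to_string(), "10".to_string(), "0".to_string())
    );
    // Only two components: MalformedVersionFile.
    assert!(parse_version("1.10").is_err());
    // The shape is fine, so this parses; upgrade() then bails with
    // "Unsupported target version v1.10.0. Can only upgrade to v1.10".
    assert!(parse_version("v1.10.0").is_ok());
    Ok(())
}
```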
+
+struct OfflineUpgrade {
+    db_path: PathBuf,
+    current_version: (String, String, String),
+    target_version: (String, String, String),
+}
+
+impl OfflineUpgrade {
+    fn upgrade(self) -> anyhow::Result<()> {
+        // TODO: if we make this process support more versions, introduce a more flexible way of checking for the version
+        // currently only supports v1.9 to v1.10
+        let (current_major, current_minor, current_patch) = &self.current_version;
+
+        match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) {
+            ("1", "9", _) => {}
+            _ => {
+                bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9")
+            }
+        }
+
+        let (target_major, target_minor, target_patch) = &self.target_version;
+
+        match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
+            ("1", "10", _) => {}
+            _ => {
+                bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10")
+            }
+        }
+
+        println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
+
+        self.v1_9_to_v1_10()?;
+
+        println!("Writing VERSION file");
+
+        create_version_file(&self.db_path, target_major, target_minor, target_patch)
+            .context("while writing VERSION file after the upgrade")?;
+
+        println!("Success");
+
+        Ok(())
+    }
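The TODO above anticipates more upgrade paths. One possible shape for that, purely a hypothetical sketch and not part of this diff, is a table of single-version hops chained until the target is reached:

```rust
// All names here (Hop, HOPS, run_hops) are invented for illustration.
type Hop = (&'static str, &'static str, fn(&OfflineUpgrade) -> anyhow::Result<()>);

const HOPS: &[Hop] = &[
    // (from "major.minor", to "major.minor", migration)
    ("1.9", "1.10", OfflineUpgrade::v1_9_to_v1_10),
];

fn run_hops(up: &OfflineUpgrade, from: &str, to: &str) -> anyhow::Result<()> {
    let mut current = from.to_owned();
    while current != to {
        let (_, next, migrate) = HOPS
            .iter()
            .find(|(hop_from, _, _)| *hop_from == current)
            .with_context(|| format!("no upgrade path from v{current}"))?;
        migrate(up)?;
        current = next.to_string();
    }
    Ok(())
}
```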
+
+    fn v1_9_to_v1_10(&self) -> anyhow::Result<()> {
+        // 2 changes here
+
+        // 1. date format. needs to be done before opening the Index
+        // 2. REST embedders. We don't support this case right now, so bail
+
+        let index_scheduler_path = self.db_path.join("tasks");
+        let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
+            .with_context(|| {
+                format!("While trying to open {:?}", index_scheduler_path.display())
+            })?;
+
+        let mut sched_wtxn = env.write_txn()?;
+
+        let index_mapping: Database<Str, UuidCodec> =
+            try_opening_database(&env, &sched_wtxn, "index-mapping")?;
+
+        let index_stats: Database<UuidCodec, Unspecified> =
+            try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
+                format!("While trying to open {:?}", index_scheduler_path.display())
+            })?;
+
+        let index_count =
+            index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
+
+        // FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
+        // 1. immutably for the iteration
+        // 2. mutably for updating index stats
+        let indexes: Vec<_> = index_mapping
+            .iter(&sched_wtxn)?
+            .map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
+            .collect();
+
+        let mut rest_embedders = Vec::new();
+
+        let mut unwrapped_indexes = Vec::new();
+
+        // check that update can take place
+        for (index_index, result) in indexes.into_iter().enumerate() {
+            let (uid, uuid) = result?;
+            let index_path = self.db_path.join("indexes").join(uuid.to_string());
+
+            println!(
+                "[{}/{index_count}] Checking that update can take place for `{uid}` at `{}`",
+                index_index + 1,
+                index_path.display()
+            );
+
+            let index_env = unsafe {
+                // FIXME: fetch the 25 magic number from the index file
+                EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
+                    format!("while opening index {uid} at '{}'", index_path.display())
+                })?
+            };
+
+            let index_txn = index_env.read_txn().with_context(|| {
+                format!(
+                    "while obtaining a read transaction for index {uid} at {}",
+                    index_path.display()
+                )
+            })?;
+
+            println!("\t- Checking for incompatible embedders (REST embedders)");
+            let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?;
+
+            if rest_embedders_for_index.is_empty() {
+                unwrapped_indexes.push((uid, uuid));
+            } else {
+                // no need to add to unwrapped indexes because we'll exit early
+                rest_embedders.push((uid, rest_embedders_for_index));
+            }
+        }
+
+        if !rest_embedders.is_empty() {
+            let rest_embedders = rest_embedders
+                .into_iter()
+                .flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders))
+                .map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`"))
+                .collect::<Vec<_>>()
+                .join("\n");
+            bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\
+                The database has not been modified and is still a valid v1.9 database.");
+        }
+
+        println!("Update can take place, updating");
+
+        for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() {
+            let index_path = self.db_path.join("indexes").join(uuid.to_string());
+
+            println!(
+                "[{}/{index_count}] Updating index `{uid}` at `{}`",
+                index_index + 1,
+                index_path.display()
+            );
+
+            let index_env = unsafe {
+                // FIXME: fetch the 25 magic number from the index file
+                EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
+                    format!("while opening index {uid} at '{}'", index_path.display())
+                })?
+            };
+
+            let mut index_wtxn = index_env.write_txn().with_context(|| {
+                format!(
+                    "while obtaining a write transaction for index `{uid}` at `{}`",
+                    index_path.display()
+                )
+            })?;
+
+            println!("\t- Updating index stats");
+            update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
+            println!("\t- Updating date format");
+            update_date_format(&uid, &index_env, &mut index_wtxn)?;
+
+            index_wtxn.commit().with_context(|| {
+                format!(
+                    "while committing the write txn for index `{uid}` at {}",
+                    index_path.display()
+                )
+            })?;
+        }
+
+        sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
+
+        println!("Upgrading database succeeded");
+
+        Ok(())
+    }
+}
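The FIXME about pre-populating `indexes` is a classic borrow-checker conflict: the `index_mapping.iter(&sched_wtxn)` cursor holds an immutable borrow of the transaction, while `update_index_stats` needs `&mut sched_wtxn`. A self-contained illustration of the conflict and the collect-first workaround, using stand-in types (nothing here is from the diff):

```rust
use std::collections::BTreeMap;

// Stand-in for the scheduler transaction: reads borrow it immutably, writes mutably.
struct Txn {
    stats: BTreeMap<String, u64>,
}

impl Txn {
    fn iter_names(&self) -> impl Iterator<Item = String> + '_ {
        self.stats.keys().cloned()
    }
    fn bump(&mut self, name: &str) {
        *self.stats.entry(name.to_owned()).or_default() += 1;
    }
}

fn main() {
    let mut txn = Txn { stats: BTreeMap::from([("movies".to_owned(), 0)]) };

    // Does not compile: the iterator still borrows `txn` immutably while
    // bump() asks for it mutably.
    // for name in txn.iter_names() { txn.bump(&name); }

    // Collecting first ends the immutable borrow before the writes start,
    // the same trick as `let indexes: Vec<_> = ... .collect()` above.
    let names: Vec<String> = txn.iter_names().collect();
    for name in names {
        txn.bump(&name);
    }
    assert_eq!(txn.stats["movies"], 1);
}
```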
+
+pub mod v1_9 {
+    pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
+
+    /// The statistics that can be computed from an `Index` object.
+    #[derive(serde::Serialize, serde::Deserialize, Debug)]
+    pub struct IndexStats {
+        /// Number of documents in the index.
+        pub number_of_documents: u64,
+        /// Size taken up by the index' DB, in bytes.
+        ///
+        /// This includes the size taken by both the used and free pages of the DB, and as the free pages
+        /// are not returned to the disk after a deletion, this number is typically larger than
+        /// `used_database_size` that only includes the size of the used pages.
+        pub database_size: u64,
+        /// Size taken by the used pages of the index' DB, in bytes.
+        ///
+        /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
+        /// this value is typically smaller than `database_size`.
+        pub used_database_size: u64,
+        /// Association of every field name with the number of times it occurs in the documents.
+        pub field_distribution: FieldDistribution,
+        /// Creation date of the index.
+        pub created_at: time::OffsetDateTime,
+        /// Date of the last update of the index.
+        pub updated_at: time::OffsetDateTime,
+    }
+
+    use serde::{Deserialize, Serialize};
+
+    #[derive(Debug, Deserialize, Serialize)]
+    pub struct IndexEmbeddingConfig {
+        pub name: String,
+        pub config: EmbeddingConfig,
+    }
+
+    #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
+    pub struct EmbeddingConfig {
+        /// Options of the embedder, specific to each kind of embedder
+        pub embedder_options: EmbedderOptions,
+    }
+
+    /// Options of an embedder, specific to each kind of embedder.
+    #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+    pub enum EmbedderOptions {
+        HuggingFace(hf::EmbedderOptions),
+        OpenAi(openai::EmbedderOptions),
+        Ollama(ollama::EmbedderOptions),
+        UserProvided(manual::EmbedderOptions),
+        Rest(rest::EmbedderOptions),
+    }
+
+    impl Default for EmbedderOptions {
+        fn default() -> Self {
+            Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None })
+        }
+    }
+
+    mod hf {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub model: String,
+            pub revision: Option<String>,
+        }
+    }
+    mod openai {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub api_key: Option<String>,
+            pub dimensions: Option<usize>,
+        }
+    }
+    mod ollama {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub embedding_model: String,
+            pub url: Option<String>,
+            pub api_key: Option<String>,
+        }
+    }
+    mod manual {
+        #[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
+        pub struct EmbedderOptions {
+            pub dimensions: usize,
+        }
+    }
+    mod rest {
+        #[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)]
+        pub struct EmbedderOptions {
+            pub api_key: Option<String>,
+            pub dimensions: Option<usize>,
+            pub url: String,
+            pub input_field: Vec<String>,
+            // path to the array of embeddings
+            pub path_to_embeddings: Vec<String>,
+            // shape of a single embedding
+            pub embedding_object: Vec<String>,
+        }
+    }
+
+    pub type OffsetDateTime = time::OffsetDateTime;
+}
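Because the `v1_9` types above are plain serde models, the REST-embedder check (`find_rest_embedders`, defined further down) reduces to a filter over the deserialized configs. An in-memory illustration with a made-up config value:

```rust
// Same classification as find_rest_embedders, minus LMDB. Illustration only.
fn classify(configs: Vec<v1_9::IndexEmbeddingConfig>) -> Vec<String> {
    configs
        .into_iter()
        .filter(|c| matches!(c.config.embedder_options, v1_9::EmbedderOptions::Rest(_)))
        .map(|c| c.name)
        .collect()
}

fn main() {
    // EmbedderOptions::default() is OpenAi, so this index would pass the check.
    let configs = vec![v1_9::IndexEmbeddingConfig {
        name: "default".to_owned(),
        config: v1_9::EmbeddingConfig { embedder_options: Default::default() },
    }];
    assert!(classify(configs).is_empty());
}
```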
+
+pub mod v1_10 {
+    use crate::v1_9;
+
+    pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
+
+    /// The statistics that can be computed from an `Index` object.
+    #[derive(serde::Serialize, serde::Deserialize, Debug)]
+    pub struct IndexStats {
+        /// Number of documents in the index.
+        pub number_of_documents: u64,
+        /// Size taken up by the index' DB, in bytes.
+        ///
+        /// This includes the size taken by both the used and free pages of the DB, and as the free pages
+        /// are not returned to the disk after a deletion, this number is typically larger than
+        /// `used_database_size` that only includes the size of the used pages.
+        pub database_size: u64,
+        /// Size taken by the used pages of the index' DB, in bytes.
+        ///
+        /// As the DB backend does not return to the disk the pages that are not currently used by the DB,
+        /// this value is typically smaller than `database_size`.
+        pub used_database_size: u64,
+        /// Association of every field name with the number of times it occurs in the documents.
+        pub field_distribution: FieldDistribution,
+        /// Creation date of the index.
+        #[serde(with = "time::serde::rfc3339")]
+        pub created_at: time::OffsetDateTime,
+        /// Date of the last update of the index.
+        #[serde(with = "time::serde::rfc3339")]
+        pub updated_at: time::OffsetDateTime,
+    }
+
+    impl From<v1_9::IndexStats> for IndexStats {
+        fn from(
+            v1_9::IndexStats {
+                number_of_documents,
+                database_size,
+                used_database_size,
+                field_distribution,
+                created_at,
+                updated_at,
+            }: v1_9::IndexStats,
+        ) -> Self {
+            IndexStats {
+                number_of_documents,
+                database_size,
+                used_database_size,
+                field_distribution,
+                created_at,
+                updated_at,
+            }
+        }
+    }
+
+    #[derive(serde::Serialize, serde::Deserialize)]
+    #[serde(transparent)]
+    pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime);
+}
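The two `OffsetDateTime` wrappers pin down the actual format change: v1.9 values were stored with `time`'s default serde representation, v1.10 values as RFC 3339 strings. A sketch of the difference (assumes `serde_json` is available; the exact legacy output depends on which `time` serde features are enabled):

```rust
fn show_formats() -> anyhow::Result<()> {
    let dt = time::macros::datetime!(2024-08-28 12:00 UTC);

    // v1.9 layout: whatever `time`'s default serde impl emits (not RFC 3339).
    let legacy = serde_json::to_string(&dt)?;

    // v1.10 layout: an RFC 3339 string, e.g. "2024-08-28T12:00:00Z".
    let migrated = serde_json::to_string(&v1_10::OffsetDateTime(dt))?;

    println!("{legacy} -> {migrated}");
    Ok(())
}
```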
+
+fn update_index_stats(
+    index_stats: Database<UuidCodec, Unspecified>,
+    index_uid: &str,
+    index_uuid: uuid::Uuid,
+    sched_wtxn: &mut RwTxn,
+) -> anyhow::Result<()> {
+    let ctx = || format!("while updating index stats for index `{index_uid}`");
+
+    let stats: Option<v1_9::IndexStats> = index_stats
+        .remap_data_type::<SerdeJson<v1_9::IndexStats>>()
+        .get(sched_wtxn, &index_uuid)
+        .with_context(ctx)?;
+
+    if let Some(stats) = stats {
+        let stats: v1_10::IndexStats = stats.into();
+
+        index_stats
+            .remap_data_type::<SerdeJson<v1_10::IndexStats>>()
+            .put(sched_wtxn, &index_uuid, &stats)
+            .with_context(ctx)?;
+    }
+
+    Ok(())
+}
+
+fn update_date_format(
+    index_uid: &str,
+    index_env: &Env,
+    index_wtxn: &mut RwTxn,
+) -> anyhow::Result<()> {
+    let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
+        .with_context(|| format!("while updating date format for index `{index_uid}`"))?;
+
+    date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
+    date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
+
+    Ok(())
+}
+
+fn find_rest_embedders(
+    index_uid: &str,
+    index_env: &Env,
+    index_txn: &RoTxn,
+) -> anyhow::Result<Vec<String>> {
+    let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
+        .with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?;
+
+    let mut rest_embedders = vec![];
+
+    for config in main
+        .remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
+        .get(index_txn, main_key::EMBEDDING_CONFIGS)?
+        .unwrap_or_default()
+    {
+        if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
+            rest_embedders.push(config.name);
+        }
+    }
+
+    Ok(rest_embedders)
+}
+
+fn date_round_trip(
+    wtxn: &mut RwTxn,
+    index_uid: &str,
+    db: Database<Unspecified, Unspecified>,
+    key: &str,
+) -> anyhow::Result<()> {
+    let datetime =
+        db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
+            || format!("could not read `{key}` while updating date format for index `{index_uid}`"),
+        )?;
+
+    if let Some(datetime) = datetime {
+        db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
+            .put(wtxn, key, &v1_10::OffsetDateTime(datetime))
+            .with_context(|| {
+                format!(
+                    "could not write `{key}` while updating date format for index `{index_uid}`"
+                )
+            })?;
+    }
+
+    Ok(())
+}
+
 /// Clears the task queue located at `db_path`.
 fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
     let path = db_path.join("tasks");
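A test-style sketch of `date_round_trip`, under two assumptions: a `tempfile` dev-dependency, and that heed's `create_database` accepts the `Unspecified` placeholder codecs (the helpers in this file only ever open existing databases):

```rust
#[test]
fn date_round_trip_rewrites_to_rfc3339() -> anyhow::Result<()> {
    let dir = tempfile::tempdir()?;
    let env = unsafe { EnvOpenOptions::new().map_size(10 * 1024 * 1024).open(dir.path()) }?;
    let mut wtxn = env.write_txn()?;
    let db: Database<Unspecified, Unspecified> = env.create_database(&mut wtxn, None)?;

    // Store a datetime in the v1.9 (default `time` serde) representation.
    let dt = time::OffsetDateTime::UNIX_EPOCH;
    db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().put(&mut wtxn, "created-at", &dt)?;

    date_round_trip(&mut wtxn, "test-index", db, "created-at")?;

    // The value now deserializes through the v1.10 RFC 3339 wrapper.
    let migrated = db
        .remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
        .get(&wtxn, "created-at")?
        .expect("key should still be present");
    assert_eq!(migrated.0, dt);

    wtxn.commit()?;
    Ok(())
}
```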