mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 21:04:27 +01:00
Merge #4912
4912: Allow Meilitool to dumplessly, offline upgrade v1.9 -> v1.10 in some conditions r=Kerollmops a=dureuill
- bail early if the DB contains at least 1 REST embedder, providing the list of detected REST embedders, and without modifying the DB
- Might depend on the feature set that meilitool was compiled with and the feature set that the Meilisearch binary that created the DB was compiled with 💀. In case of a runtime error, try again with a different feature set (passing or not passing `-p meilitool` when building after a `cargo clean`)
Co-authored-by: Louis Dureuil <louis@meilisearch.com>
This commit is contained in:
commit
db0cf3b2ed
9
Cargo.lock
generated
9
Cargo.lock
generated
@ -3520,6 +3520,7 @@ dependencies = [
|
|||||||
"file-store",
|
"file-store",
|
||||||
"meilisearch-auth",
|
"meilisearch-auth",
|
||||||
"meilisearch-types",
|
"meilisearch-types",
|
||||||
|
"serde",
|
||||||
"time",
|
"time",
|
||||||
"uuid",
|
"uuid",
|
||||||
]
|
]
|
||||||
@ -4834,9 +4835,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde"
|
name = "serde"
|
||||||
version = "1.0.204"
|
version = "1.0.209"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "bc76f558e0cbb2a839d37354c575f1dc3fdc6546b5be373ba43d95f231bf7c12"
|
checksum = "99fce0ffe7310761ca6bf9faf5115afbc19688edd00171d81b1bb1b116c63e09"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"serde_derive",
|
"serde_derive",
|
||||||
]
|
]
|
||||||
@ -4852,9 +4853,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "serde_derive"
|
name = "serde_derive"
|
||||||
version = "1.0.204"
|
version = "1.0.209"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222"
|
checksum = "a5831b979fd7b5439637af1752d535ff49f4860c0f341d1baeb6faf0f4242170"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"proc-macro2",
|
"proc-macro2",
|
||||||
"quote",
|
"quote",
|
||||||
|
@ -10,36 +10,50 @@ static VERSION_MINOR: &str = env!("CARGO_PKG_VERSION_MINOR");
|
|||||||
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
|
static VERSION_PATCH: &str = env!("CARGO_PKG_VERSION_PATCH");
|
||||||
|
|
||||||
/// Persists the version of the current Meilisearch binary to a VERSION file
|
/// Persists the version of the current Meilisearch binary to a VERSION file
|
||||||
pub fn create_version_file(db_path: &Path) -> io::Result<()> {
|
pub fn create_current_version_file(db_path: &Path) -> io::Result<()> {
|
||||||
|
create_version_file(db_path, VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn create_version_file(
|
||||||
|
db_path: &Path,
|
||||||
|
major: &str,
|
||||||
|
minor: &str,
|
||||||
|
patch: &str,
|
||||||
|
) -> io::Result<()> {
|
||||||
let version_path = db_path.join(VERSION_FILE_NAME);
|
let version_path = db_path.join(VERSION_FILE_NAME);
|
||||||
fs::write(version_path, format!("{}.{}.{}", VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH))
|
fs::write(version_path, format!("{}.{}.{}", major, minor, patch))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ensures Meilisearch version is compatible with the database, returns an error if the versions mismatch.
|
/// Ensures Meilisearch version is compatible with the database, returns an error if the versions mismatch.
|
||||||
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
|
pub fn check_version_file(db_path: &Path) -> anyhow::Result<()> {
|
||||||
let version_path = db_path.join(VERSION_FILE_NAME);
|
let (major, minor, patch) = get_version(db_path)?;
|
||||||
|
|
||||||
match fs::read_to_string(version_path) {
|
|
||||||
Ok(version) => {
|
|
||||||
let version_components = version.split('.').collect::<Vec<_>>();
|
|
||||||
let (major, minor, patch) = match &version_components[..] {
|
|
||||||
[major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
|
|
||||||
_ => return Err(VersionFileError::MalformedVersionFile.into()),
|
|
||||||
};
|
|
||||||
|
|
||||||
if major != VERSION_MAJOR || minor != VERSION_MINOR {
|
if major != VERSION_MAJOR || minor != VERSION_MINOR {
|
||||||
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
|
return Err(VersionFileError::VersionMismatch { major, minor, patch }.into());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
Err(error) => {
|
|
||||||
return match error.kind() {
|
pub fn get_version(db_path: &Path) -> Result<(String, String, String), VersionFileError> {
|
||||||
ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile.into()),
|
let version_path = db_path.join(VERSION_FILE_NAME);
|
||||||
|
|
||||||
|
match fs::read_to_string(version_path) {
|
||||||
|
Ok(version) => parse_version(&version),
|
||||||
|
Err(error) => match error.kind() {
|
||||||
|
ErrorKind::NotFound => Err(VersionFileError::MissingVersionFile),
|
||||||
_ => Err(error.into()),
|
_ => Err(error.into()),
|
||||||
}
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
pub fn parse_version(version: &str) -> Result<(String, String, String), VersionFileError> {
|
||||||
|
let version_components = version.split('.').collect::<Vec<_>>();
|
||||||
|
let (major, minor, patch) = match &version_components[..] {
|
||||||
|
[major, minor, patch] => (major.to_string(), minor.to_string(), patch.to_string()),
|
||||||
|
_ => return Err(VersionFileError::MalformedVersionFile),
|
||||||
|
};
|
||||||
|
Ok((major, minor, patch))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(thiserror::Error, Debug)]
|
#[derive(thiserror::Error, Debug)]
|
||||||
@ -58,4 +72,7 @@ pub enum VersionFileError {
|
|||||||
env!("CARGO_PKG_VERSION").to_string()
|
env!("CARGO_PKG_VERSION").to_string()
|
||||||
)]
|
)]
|
||||||
VersionMismatch { major: String, minor: String, patch: String },
|
VersionMismatch { major: String, minor: String, patch: String },
|
||||||
|
|
||||||
|
#[error(transparent)]
|
||||||
|
IoError(#[from] std::io::Error),
|
||||||
}
|
}
|
||||||
|
@ -36,7 +36,7 @@ use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchR
|
|||||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
||||||
use meilisearch_types::settings::apply_settings_to_builder;
|
use meilisearch_types::settings::apply_settings_to_builder;
|
||||||
use meilisearch_types::tasks::KindWithContent;
|
use meilisearch_types::tasks::KindWithContent;
|
||||||
use meilisearch_types::versioning::{check_version_file, create_version_file};
|
use meilisearch_types::versioning::{check_version_file, create_current_version_file};
|
||||||
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
|
use meilisearch_types::{compression, milli, VERSION_FILE_NAME};
|
||||||
pub use option::Opt;
|
pub use option::Opt;
|
||||||
use option::ScheduleSnapshot;
|
use option::ScheduleSnapshot;
|
||||||
@ -319,7 +319,7 @@ fn open_or_create_database_unchecked(
|
|||||||
match (
|
match (
|
||||||
index_scheduler_builder(),
|
index_scheduler_builder(),
|
||||||
auth_controller.map_err(anyhow::Error::from),
|
auth_controller.map_err(anyhow::Error::from),
|
||||||
create_version_file(&opt.db_path).map_err(anyhow::Error::from),
|
create_current_version_file(&opt.db_path).map_err(anyhow::Error::from),
|
||||||
) {
|
) {
|
||||||
(Ok(i), Ok(a), Ok(())) => Ok((i, a)),
|
(Ok(i), Ok(a), Ok(())) => Ok((i, a)),
|
||||||
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
|
(Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)) => {
|
||||||
|
@ -15,5 +15,6 @@ dump = { path = "../dump" }
|
|||||||
file-store = { path = "../file-store" }
|
file-store = { path = "../file-store" }
|
||||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||||
meilisearch-types = { path = "../meilisearch-types" }
|
meilisearch-types = { path = "../meilisearch-types" }
|
||||||
|
serde = { version = "1.0.209", features = ["derive"] }
|
||||||
time = { version = "0.3.36", features = ["formatting"] }
|
time = { version = "0.3.36", features = ["formatting"] }
|
||||||
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
uuid = { version = "1.10.0", features = ["v4"], default-features = false }
|
||||||
|
@ -2,7 +2,7 @@ use std::fs::{read_dir, read_to_string, remove_file, File};
|
|||||||
use std::io::BufWriter;
|
use std::io::BufWriter;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::{bail, Context};
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
use dump::{DumpWriter, IndexMetadata};
|
use dump::{DumpWriter, IndexMetadata};
|
||||||
use file_store::FileStore;
|
use file_store::FileStore;
|
||||||
@ -10,9 +10,10 @@ use meilisearch_auth::AuthController;
|
|||||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
|
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
|
||||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||||
|
use meilisearch_types::milli::index::{db_name, main_key};
|
||||||
use meilisearch_types::milli::{obkv_to_json, BEU32};
|
use meilisearch_types::milli::{obkv_to_json, BEU32};
|
||||||
use meilisearch_types::tasks::{Status, Task};
|
use meilisearch_types::tasks::{Status, Task};
|
||||||
use meilisearch_types::versioning::check_version_file;
|
use meilisearch_types::versioning::{create_version_file, get_version, parse_version};
|
||||||
use meilisearch_types::Index;
|
use meilisearch_types::Index;
|
||||||
use time::macros::format_description;
|
use time::macros::format_description;
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
@ -62,20 +63,457 @@ enum Command {
|
|||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
skip_enqueued_tasks: bool,
|
skip_enqueued_tasks: bool,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
/// Attempts to upgrade from one major version to the next without a dump.
|
||||||
|
///
|
||||||
|
/// Make sure to run this command when Meilisearch is not running!
|
||||||
|
/// If Meilisearch is running while executing this command, the database could be corrupted
|
||||||
|
/// (contain data from both the old and the new versions)
|
||||||
|
///
|
||||||
|
/// Supported upgrade paths:
|
||||||
|
///
|
||||||
|
/// - v1.9.0 -> v1.10.0
|
||||||
|
OfflineUpgrade {
|
||||||
|
#[arg(long)]
|
||||||
|
target_version: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> anyhow::Result<()> {
|
fn main() -> anyhow::Result<()> {
|
||||||
let Cli { db_path, command } = Cli::parse();
|
let Cli { db_path, command } = Cli::parse();
|
||||||
|
|
||||||
check_version_file(&db_path).context("While checking the version file")?;
|
let detected_version = get_version(&db_path).context("While checking the version file")?;
|
||||||
|
|
||||||
match command {
|
match command {
|
||||||
Command::ClearTaskQueue => clear_task_queue(db_path),
|
Command::ClearTaskQueue => clear_task_queue(db_path),
|
||||||
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
|
||||||
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
|
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
|
||||||
}
|
}
|
||||||
|
Command::OfflineUpgrade { target_version } => {
|
||||||
|
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
|
||||||
|
OfflineUpgrade { db_path, current_version: detected_version, target_version }.upgrade()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct OfflineUpgrade {
|
||||||
|
db_path: PathBuf,
|
||||||
|
current_version: (String, String, String),
|
||||||
|
target_version: (String, String, String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OfflineUpgrade {
|
||||||
|
fn upgrade(self) -> anyhow::Result<()> {
|
||||||
|
// TODO: if we make this process support more versions, introduce a more flexible way of checking for the version
|
||||||
|
// currently only supports v1.9 to v1.10
|
||||||
|
let (current_major, current_minor, current_patch) = &self.current_version;
|
||||||
|
|
||||||
|
match (current_major.as_str(), current_minor.as_str(), current_patch.as_str()) {
|
||||||
|
("1", "9", _) => {}
|
||||||
|
_ => {
|
||||||
|
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (target_major, target_minor, target_patch) = &self.target_version;
|
||||||
|
|
||||||
|
match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
|
||||||
|
("1", "10", _) => {}
|
||||||
|
_ => {
|
||||||
|
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Upgrading from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
|
||||||
|
|
||||||
|
self.v1_9_to_v1_10()?;
|
||||||
|
|
||||||
|
println!("Writing VERSION file");
|
||||||
|
|
||||||
|
create_version_file(&self.db_path, target_major, target_minor, target_patch)
|
||||||
|
.context("while writing VERSION file after the upgrade")?;
|
||||||
|
|
||||||
|
println!("Success");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn v1_9_to_v1_10(&self) -> anyhow::Result<()> {
|
||||||
|
// 2 changes here
|
||||||
|
|
||||||
|
// 1. date format. needs to be done before opening the Index
|
||||||
|
// 2. REST embedders. We don't support this case right now, so bail
|
||||||
|
|
||||||
|
let index_scheduler_path = self.db_path.join("tasks");
|
||||||
|
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
|
||||||
|
.with_context(|| {
|
||||||
|
format!("While trying to open {:?}", index_scheduler_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut sched_wtxn = env.write_txn()?;
|
||||||
|
|
||||||
|
let index_mapping: Database<Str, UuidCodec> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-mapping")?;
|
||||||
|
|
||||||
|
let index_stats: Database<UuidCodec, Unspecified> =
|
||||||
|
try_opening_database(&env, &sched_wtxn, "index-stats").with_context(|| {
|
||||||
|
format!("While trying to open {:?}", index_scheduler_path.display())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let index_count =
|
||||||
|
index_mapping.len(&sched_wtxn).context("while reading the number of indexes")?;
|
||||||
|
|
||||||
|
// FIXME: not ideal, we have to pre-populate all indexes to prevent double borrow of sched_wtxn
|
||||||
|
// 1. immutably for the iteration
|
||||||
|
// 2. mutably for updating index stats
|
||||||
|
let indexes: Vec<_> = index_mapping
|
||||||
|
.iter(&sched_wtxn)?
|
||||||
|
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let mut rest_embedders = Vec::new();
|
||||||
|
|
||||||
|
let mut unwrapped_indexes = Vec::new();
|
||||||
|
|
||||||
|
// check that update can take place
|
||||||
|
for (index_index, result) in indexes.into_iter().enumerate() {
|
||||||
|
let (uid, uuid) = result?;
|
||||||
|
let index_path = self.db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
|
index_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
let index_env = unsafe {
|
||||||
|
// FIXME: fetch the 25 magic number from the index file
|
||||||
|
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
|
||||||
|
format!("while opening index {uid} at '{}'", index_path.display())
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let index_txn = index_env.read_txn().with_context(|| {
|
||||||
|
format!(
|
||||||
|
"while obtaining a write transaction for index {uid} at {}",
|
||||||
|
index_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
println!("\t- Checking for incompatible embedders (REST embedders)");
|
||||||
|
let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?;
|
||||||
|
|
||||||
|
if rest_embedders_for_index.is_empty() {
|
||||||
|
unwrapped_indexes.push((uid, uuid));
|
||||||
|
} else {
|
||||||
|
// no need to add to unwrapped indexes because we'll exit early
|
||||||
|
rest_embedders.push((uid, rest_embedders_for_index));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rest_embedders.is_empty() {
|
||||||
|
let rest_embedders = rest_embedders
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders))
|
||||||
|
.map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`"))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n");
|
||||||
|
bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\
|
||||||
|
The database has not been modified and is still a valid v1.9 database.");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Update can take place, updating");
|
||||||
|
|
||||||
|
for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() {
|
||||||
|
let index_path = self.db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}/{index_count}]Updating index `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
|
index_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
let index_env = unsafe {
|
||||||
|
// FIXME: fetch the 25 magic number from the index file
|
||||||
|
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
|
||||||
|
format!("while opening index {uid} at '{}'", index_path.display())
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut index_wtxn = index_env.write_txn().with_context(|| {
|
||||||
|
format!(
|
||||||
|
"while obtaining a write transaction for index `{uid}` at `{}`",
|
||||||
|
index_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
println!("\t- Updating index stats");
|
||||||
|
update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
|
||||||
|
println!("\t- Updating date format");
|
||||||
|
update_date_format(&uid, &index_env, &mut index_wtxn)?;
|
||||||
|
|
||||||
|
index_wtxn.commit().with_context(|| {
|
||||||
|
format!(
|
||||||
|
"while committing the write txn for index `{uid}` at {}",
|
||||||
|
index_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
sched_wtxn.commit().context("while committing the write txn for the index-scheduler")?;
|
||||||
|
|
||||||
|
println!("Upgrading database succeeded");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod v1_9 {
|
||||||
|
pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
|
||||||
|
|
||||||
|
/// The statistics that can be computed from an `Index` object.
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||||
|
pub struct IndexStats {
|
||||||
|
/// Number of documents in the index.
|
||||||
|
pub number_of_documents: u64,
|
||||||
|
/// Size taken up by the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
|
||||||
|
/// are not returned to the disk after a deletion, this number is typically larger than
|
||||||
|
/// `used_database_size` that only includes the size of the used pages.
|
||||||
|
pub database_size: u64,
|
||||||
|
/// Size taken by the used pages of the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
|
||||||
|
/// this value is typically smaller than `database_size`.
|
||||||
|
pub used_database_size: u64,
|
||||||
|
/// Association of every field name with the number of times it occurs in the documents.
|
||||||
|
pub field_distribution: FieldDistribution,
|
||||||
|
/// Creation date of the index.
|
||||||
|
pub created_at: time::OffsetDateTime,
|
||||||
|
/// Date of the last update of the index.
|
||||||
|
pub updated_at: time::OffsetDateTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
pub struct IndexEmbeddingConfig {
|
||||||
|
pub name: String,
|
||||||
|
pub config: EmbeddingConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct EmbeddingConfig {
|
||||||
|
/// Options of the embedder, specific to each kind of embedder
|
||||||
|
pub embedder_options: EmbedderOptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Options of an embedder, specific to each kind of embedder.
|
||||||
|
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub enum EmbedderOptions {
|
||||||
|
HuggingFace(hf::EmbedderOptions),
|
||||||
|
OpenAi(openai::EmbedderOptions),
|
||||||
|
Ollama(ollama::EmbedderOptions),
|
||||||
|
UserProvided(manual::EmbedderOptions),
|
||||||
|
Rest(rest::EmbedderOptions),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for EmbedderOptions {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::OpenAi(openai::EmbedderOptions { api_key: None, dimensions: None })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod hf {
|
||||||
|
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct EmbedderOptions {
|
||||||
|
pub model: String,
|
||||||
|
pub revision: Option<String>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mod openai {
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct EmbedderOptions {
|
||||||
|
pub api_key: Option<String>,
|
||||||
|
pub dimensions: Option<usize>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mod ollama {
|
||||||
|
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct EmbedderOptions {
|
||||||
|
pub embedding_model: String,
|
||||||
|
pub url: Option<String>,
|
||||||
|
pub api_key: Option<String>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mod manual {
|
||||||
|
#[derive(Debug, Clone, Hash, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct EmbedderOptions {
|
||||||
|
pub dimensions: usize,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
mod rest {
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize, Hash)]
|
||||||
|
pub struct EmbedderOptions {
|
||||||
|
pub api_key: Option<String>,
|
||||||
|
pub dimensions: Option<usize>,
|
||||||
|
pub url: String,
|
||||||
|
pub input_field: Vec<String>,
|
||||||
|
// path to the array of embeddings
|
||||||
|
pub path_to_embeddings: Vec<String>,
|
||||||
|
// shape of a single embedding
|
||||||
|
pub embedding_object: Vec<String>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type OffsetDateTime = time::OffsetDateTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod v1_10 {
|
||||||
|
use crate::v1_9;
|
||||||
|
|
||||||
|
pub type FieldDistribution = std::collections::BTreeMap<String, u64>;
|
||||||
|
|
||||||
|
/// The statistics that can be computed from an `Index` object.
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize, Debug)]
|
||||||
|
pub struct IndexStats {
|
||||||
|
/// Number of documents in the index.
|
||||||
|
pub number_of_documents: u64,
|
||||||
|
/// Size taken up by the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// This includes the size taken by both the used and free pages of the DB, and as the free pages
|
||||||
|
/// are not returned to the disk after a deletion, this number is typically larger than
|
||||||
|
/// `used_database_size` that only includes the size of the used pages.
|
||||||
|
pub database_size: u64,
|
||||||
|
/// Size taken by the used pages of the index' DB, in bytes.
|
||||||
|
///
|
||||||
|
/// As the DB backend does not return to the disk the pages that are not currently used by the DB,
|
||||||
|
/// this value is typically smaller than `database_size`.
|
||||||
|
pub used_database_size: u64,
|
||||||
|
/// Association of every field name with the number of times it occurs in the documents.
|
||||||
|
pub field_distribution: FieldDistribution,
|
||||||
|
/// Creation date of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub created_at: time::OffsetDateTime,
|
||||||
|
/// Date of the last update of the index.
|
||||||
|
#[serde(with = "time::serde::rfc3339")]
|
||||||
|
pub updated_at: time::OffsetDateTime,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<v1_9::IndexStats> for IndexStats {
|
||||||
|
fn from(
|
||||||
|
v1_9::IndexStats {
|
||||||
|
number_of_documents,
|
||||||
|
database_size,
|
||||||
|
used_database_size,
|
||||||
|
field_distribution,
|
||||||
|
created_at,
|
||||||
|
updated_at,
|
||||||
|
}: v1_9::IndexStats,
|
||||||
|
) -> Self {
|
||||||
|
IndexStats {
|
||||||
|
number_of_documents,
|
||||||
|
database_size,
|
||||||
|
used_database_size,
|
||||||
|
field_distribution,
|
||||||
|
created_at,
|
||||||
|
updated_at,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(serde::Serialize, serde::Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
|
pub struct OffsetDateTime(#[serde(with = "time::serde::rfc3339")] pub time::OffsetDateTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_index_stats(
|
||||||
|
index_stats: Database<UuidCodec, Unspecified>,
|
||||||
|
index_uid: &str,
|
||||||
|
index_uuid: uuid::Uuid,
|
||||||
|
sched_wtxn: &mut RwTxn,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let ctx = || format!("while updating index stats for index `{index_uid}`");
|
||||||
|
|
||||||
|
let stats: Option<v1_9::IndexStats> = index_stats
|
||||||
|
.remap_data_type::<SerdeJson<v1_9::IndexStats>>()
|
||||||
|
.get(sched_wtxn, &index_uuid)
|
||||||
|
.with_context(ctx)?;
|
||||||
|
|
||||||
|
if let Some(stats) = stats {
|
||||||
|
let stats: v1_10::IndexStats = stats.into();
|
||||||
|
|
||||||
|
index_stats
|
||||||
|
.remap_data_type::<SerdeJson<v1_10::IndexStats>>()
|
||||||
|
.put(sched_wtxn, &index_uuid, &stats)
|
||||||
|
.with_context(ctx)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_date_format(
|
||||||
|
index_uid: &str,
|
||||||
|
index_env: &Env,
|
||||||
|
index_wtxn: &mut RwTxn,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
|
||||||
|
.with_context(|| format!("while updating date format for index `{index_uid}`"))?;
|
||||||
|
|
||||||
|
date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
|
||||||
|
date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn find_rest_embedders(
|
||||||
|
index_uid: &str,
|
||||||
|
index_env: &Env,
|
||||||
|
index_txn: &RoTxn,
|
||||||
|
) -> anyhow::Result<Vec<String>> {
|
||||||
|
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
|
||||||
|
.with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?;
|
||||||
|
|
||||||
|
let mut rest_embedders = vec![];
|
||||||
|
|
||||||
|
for config in main
|
||||||
|
.remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
|
||||||
|
.get(index_txn, main_key::EMBEDDING_CONFIGS)?
|
||||||
|
.unwrap_or_default()
|
||||||
|
{
|
||||||
|
if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
|
||||||
|
rest_embedders.push(config.name);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(rest_embedders)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn date_round_trip(
|
||||||
|
wtxn: &mut RwTxn,
|
||||||
|
index_uid: &str,
|
||||||
|
db: Database<Unspecified, Unspecified>,
|
||||||
|
key: &str,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let datetime =
|
||||||
|
db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
|
||||||
|
|| format!("could not read `{key}` while updating date format for index `{index_uid}`"),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
if let Some(datetime) = datetime {
|
||||||
|
db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
|
||||||
|
.put(wtxn, key, &v1_10::OffsetDateTime(datetime))
|
||||||
|
.with_context(|| {
|
||||||
|
format!(
|
||||||
|
"could not write `{key}` while updating date format for index `{index_uid}`"
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Clears the task queue located at `db_path`.
|
/// Clears the task queue located at `db_path`.
|
||||||
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
||||||
|
Loading…
Reference in New Issue
Block a user