mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-22 12:54:26 +01:00
Check REST embedders before touching the DB
This commit is contained in:
parent
28da759f11
commit
f6abf01d2c
@ -170,12 +170,66 @@ impl OfflineUpgrade {
|
|||||||
.iter(&sched_wtxn)?
|
.iter(&sched_wtxn)?
|
||||||
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
.map(|res| res.map(|(uid, uuid)| (uid.to_owned(), uuid)))
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
|
let mut rest_embedders = Vec::new();
|
||||||
|
|
||||||
|
let mut unwrapped_indexes = Vec::new();
|
||||||
|
|
||||||
|
// check that update can take place
|
||||||
for (index_index, result) in indexes.into_iter().enumerate() {
|
for (index_index, result) in indexes.into_iter().enumerate() {
|
||||||
let (uid, uuid) = result?;
|
let (uid, uuid) = result?;
|
||||||
let index_path = self.db_path.join("indexes").join(uuid.to_string());
|
let index_path = self.db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
println!(
|
println!(
|
||||||
"[{index_index}/{index_count}]Updating index {uid} at '{}'",
|
"[{}/{index_count}]Checking that update can take place for `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
|
index_path.display()
|
||||||
|
);
|
||||||
|
|
||||||
|
let index_env = unsafe {
|
||||||
|
// FIXME: fetch the 25 magic number from the index file
|
||||||
|
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
|
||||||
|
format!("while opening index {uid} at '{}'", index_path.display())
|
||||||
|
})?
|
||||||
|
};
|
||||||
|
|
||||||
|
let index_txn = index_env.read_txn().with_context(|| {
|
||||||
|
format!(
|
||||||
|
"while obtaining a write transaction for index {uid} at {}",
|
||||||
|
index_path.display()
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
println!("\t- Checking for incompatible embedders (REST embedders)");
|
||||||
|
let rest_embedders_for_index = find_rest_embedders(&uid, &index_env, &index_txn)?;
|
||||||
|
|
||||||
|
if rest_embedders_for_index.is_empty() {
|
||||||
|
unwrapped_indexes.push((uid, uuid));
|
||||||
|
} else {
|
||||||
|
// no need to add to unwrapped indexes because we'll exit early
|
||||||
|
rest_embedders.push((uid, rest_embedders_for_index));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !rest_embedders.is_empty() {
|
||||||
|
let rest_embedders = rest_embedders
|
||||||
|
.into_iter()
|
||||||
|
.flat_map(|(index, embedders)| std::iter::repeat(index.clone()).zip(embedders))
|
||||||
|
.map(|(index, embedder)| format!("\t- embedder `{embedder}` in index `{index}`"))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n");
|
||||||
|
bail!("The update cannot take place because there are REST embedder(s). Remove them before proceeding with the update:\n{rest_embedders}\n\n\
|
||||||
|
The database has not been modified and is still a valid v1.9 database.");
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("Update can take place, updating");
|
||||||
|
|
||||||
|
for (index_index, (uid, uuid)) in unwrapped_indexes.into_iter().enumerate() {
|
||||||
|
let index_path = self.db_path.join("indexes").join(uuid.to_string());
|
||||||
|
|
||||||
|
println!(
|
||||||
|
"[{}/{index_count}]Updating index `{uid}` at `{}`",
|
||||||
|
index_index + 1,
|
||||||
index_path.display()
|
index_path.display()
|
||||||
);
|
);
|
||||||
|
|
||||||
@ -188,22 +242,19 @@ impl OfflineUpgrade {
|
|||||||
|
|
||||||
let mut index_wtxn = index_env.write_txn().with_context(|| {
|
let mut index_wtxn = index_env.write_txn().with_context(|| {
|
||||||
format!(
|
format!(
|
||||||
"while obtaining a write transaction for index {uid} at {}",
|
"while obtaining a write transaction for index `{uid}` at `{}`",
|
||||||
index_path.display()
|
index_path.display()
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
println!("\tUpdating index stats");
|
println!("\t- Updating index stats");
|
||||||
update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
|
update_index_stats(index_stats, &uid, uuid, &mut sched_wtxn)?;
|
||||||
println!("\tUpdating date format");
|
println!("\t- Updating date format");
|
||||||
update_date_format(&uid, &index_env, &mut index_wtxn)?;
|
update_date_format(&uid, &index_env, &mut index_wtxn)?;
|
||||||
|
|
||||||
println!("\tChecking for incompatible embedders (REST embedders)");
|
|
||||||
check_rest_embedder(&uid, &index_env, &index_wtxn)?;
|
|
||||||
|
|
||||||
index_wtxn.commit().with_context(|| {
|
index_wtxn.commit().with_context(|| {
|
||||||
format!(
|
format!(
|
||||||
"while committing the write txn for index {uid} at {}",
|
"while committing the write txn for index `{uid}` at {}",
|
||||||
index_path.display()
|
index_path.display()
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
@ -384,7 +435,7 @@ fn update_index_stats(
|
|||||||
index_uuid: uuid::Uuid,
|
index_uuid: uuid::Uuid,
|
||||||
sched_wtxn: &mut RwTxn,
|
sched_wtxn: &mut RwTxn,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let ctx = || format!("while updating index stats for index {index_uid}");
|
let ctx = || format!("while updating index stats for index `{index_uid}`");
|
||||||
|
|
||||||
let stats: Option<v1_9::IndexStats> = index_stats
|
let stats: Option<v1_9::IndexStats> = index_stats
|
||||||
.remap_data_type::<SerdeJson<v1_9::IndexStats>>()
|
.remap_data_type::<SerdeJson<v1_9::IndexStats>>()
|
||||||
@ -409,7 +460,7 @@ fn update_date_format(
|
|||||||
index_wtxn: &mut RwTxn,
|
index_wtxn: &mut RwTxn,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
|
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
|
||||||
.with_context(|| format!("while updating date format for index {index_uid}"))?;
|
.with_context(|| format!("while updating date format for index `{index_uid}`"))?;
|
||||||
|
|
||||||
date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
|
date_round_trip(index_wtxn, index_uid, main, main_key::CREATED_AT_KEY)?;
|
||||||
date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
|
date_round_trip(index_wtxn, index_uid, main, main_key::UPDATED_AT_KEY)?;
|
||||||
@ -417,9 +468,15 @@ fn update_date_format(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn check_rest_embedder(index_uid: &str, index_env: &Env, index_txn: &RoTxn) -> anyhow::Result<()> {
|
fn find_rest_embedders(
|
||||||
|
index_uid: &str,
|
||||||
|
index_env: &Env,
|
||||||
|
index_txn: &RoTxn,
|
||||||
|
) -> anyhow::Result<Vec<String>> {
|
||||||
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
|
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
|
||||||
.with_context(|| format!("while checking REST embedders for index {index_uid}"))?;
|
.with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?;
|
||||||
|
|
||||||
|
let mut rest_embedders = vec![];
|
||||||
|
|
||||||
for config in main
|
for config in main
|
||||||
.remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
|
.remap_types::<Str, SerdeJson<Vec<v1_9::IndexEmbeddingConfig>>>()
|
||||||
@ -427,16 +484,11 @@ fn check_rest_embedder(index_uid: &str, index_env: &Env, index_txn: &RoTxn) -> a
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
{
|
{
|
||||||
if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
|
if let v1_9::EmbedderOptions::Rest(_) = config.config.embedder_options {
|
||||||
bail!(
|
rest_embedders.push(config.name);
|
||||||
"index {index_uid} has a REST embedder: {}. \
|
|
||||||
REST embedder are unsupported for upgrade. \
|
|
||||||
Remove the embedder and retry.",
|
|
||||||
config.name
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(rest_embedders)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn date_round_trip(
|
fn date_round_trip(
|
||||||
@ -447,14 +499,16 @@ fn date_round_trip(
|
|||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let datetime =
|
let datetime =
|
||||||
db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
|
db.remap_types::<Str, SerdeJson<v1_9::OffsetDateTime>>().get(wtxn, key).with_context(
|
||||||
|| format!("could not read `{key}` while updating date format for index {index_uid}"),
|
|| format!("could not read `{key}` while updating date format for index `{index_uid}`"),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
if let Some(datetime) = datetime {
|
if let Some(datetime) = datetime {
|
||||||
db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
|
db.remap_types::<Str, SerdeJson<v1_10::OffsetDateTime>>()
|
||||||
.put(wtxn, key, &v1_10::OffsetDateTime(datetime))
|
.put(wtxn, key, &v1_10::OffsetDateTime(datetime))
|
||||||
.with_context(|| {
|
.with_context(|| {
|
||||||
format!("could not write `{key}` while updating date format for index {index_uid}")
|
format!(
|
||||||
|
"could not write `{key}` while updating date format for index `{index_uid}`"
|
||||||
|
)
|
||||||
})?;
|
})?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user