Convert update files from OBKV to ndjson

This commit is contained in:
ManyTheFish 2024-12-10 15:12:50 +01:00
parent e974be9518
commit 479607e5dd
5 changed files with 81 additions and 7 deletions

View file

@ -73,7 +73,7 @@ enum Command {
///
/// Supported upgrade paths:
///
/// - v1.9.x -> v1.10.x -> v1.11.x
/// - v1.9.x -> v1.10.x -> v1.11.x -> v1.12.x
OfflineUpgrade {
#[arg(long)]
target_version: String,

View file

@ -1,5 +1,6 @@
mod v1_10;
mod v1_11;
mod v1_12;
mod v1_9;
use std::path::{Path, PathBuf};
@ -8,6 +9,7 @@ use anyhow::{bail, Context};
use meilisearch_types::versioning::create_version_file;
use v1_10::v1_9_to_v1_10;
use v1_12::v1_11_to_v1_12;
use crate::upgrade::v1_11::v1_10_to_v1_11;
@ -22,6 +24,7 @@ impl OfflineUpgrade {
let upgrade_list = [
(v1_9_to_v1_10 as fn(&Path) -> Result<(), anyhow::Error>, "1", "10", "0"),
(v1_10_to_v1_11, "1", "11", "0"),
(v1_11_to_v1_12, "1", "12", "0"),
];
let (current_major, current_minor, current_patch) = &self.current_version;
@ -33,6 +36,7 @@ impl OfflineUpgrade {
) {
("1", "9", _) => 0,
("1", "10", _) => 1,
("1", "11", _) => 2,
_ => {
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
}
@ -43,6 +47,7 @@ impl OfflineUpgrade {
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
("1", "10", _) => 0,
("1", "11", _) => 1,
("1", "12", _) => 2,
(major, _, _) if major.starts_with('v') => {
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
}

View file

@ -0,0 +1,63 @@
//! The breaking changes that happened between the v1.11 and the v1.12 are:
//! - The new indexer changed the update files format from OBKV to ndjson. https://github.com/meilisearch/meilisearch/pull/4900
use std::{io::BufWriter, path::Path};
use anyhow::Context;
use file_store::FileStore;
use indexmap::IndexMap;
use meilisearch_types::milli::documents::DocumentsBatchReader;
use serde_json::value::RawValue;
use tempfile::NamedTempFile;
pub fn v1_11_to_v1_12(db_path: &Path) -> anyhow::Result<()> {
println!("Upgrading from v1.11.0 to v1.12.0");
convert_update_files(db_path)?;
Ok(())
}
/// Convert the update files from OBKV to ndjson format.
///
/// 1) List all the update files using the file store.
/// 2) For each update file, read the update file into a DocumentsBatchReader.
/// 3) For each document in the update file, convert the document to a JSON object.
/// 4) Write the JSON object to a tmp file in the update files directory.
/// 5) Persist the tmp file replacing the old update file.
fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
let update_files_dir_path = db_path.join("update_files");
let file_store = FileStore::new(&update_files_dir_path)?;
for uuid in file_store.all_uuids()? {
let uuid = uuid?;
let update_file_path = file_store.get_update_path(uuid);
let update_file = file_store.get_update(uuid)?;
let mut file = NamedTempFile::new_in(&update_files_dir_path).map(BufWriter::new)?;
let reader = DocumentsBatchReader::from_reader(update_file)?;
let (mut cursor, index) = reader.into_cursor_and_fields_index();
while let Some(document) = cursor.next_document()? {
let mut json_document = IndexMap::new();
for (fid, value) in document {
let field_name = index
.name(fid)
.with_context(|| format!("while getting field name for fid {fid}"))?;
let value: &RawValue = serde_json::from_slice(value)?;
json_document.insert(field_name, value);
}
serde_json::to_writer(&mut file, &json_document)?;
}
let file = file
.into_inner()
.map_err(|e| e.into_error())
.context("while flushing update file bufwriter")?;
let _ = file.persist(update_file_path)?;
}
Ok(())
}