5246: Fix dump import r=Kerollmops a=dureuill

- Fix: handle the change of format of the update files
  - Correctly handle update files as JSON stream rather than obkv when exporting a dump with enqueued tasks
  - Correctly recreate update files as JSON stream rather than obkv when importing a dump
  - As the dump format itself didn't change, all dumps are still compatible
- Temporary workaround for https://github.com/meilisearch/meilisearch/issues/5247: set the batch uid of tasks to `null` at dump export time.
- Changes to meilitool
  - Export dump with update files in new format if DB >= v1.12
  - offline upgrade now supports upgrading from [1.9.0-1.12.5] to [1.10.0-1.12.5].
  - offline upgrade supports no-op upgrades and has better error messages 

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: ManyTheFish <many@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-01-20 13:03:49 +00:00 committed by GitHub
commit 1c78447226
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 115 additions and 39 deletions

View File

@ -29,7 +29,7 @@ use bumpalo::Bump;
use dump::IndexMetadata;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader, PrimaryKey};
use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
@ -819,6 +819,13 @@ impl IndexScheduler {
t.started_at = Some(started_at);
t.finished_at = Some(finished_at);
}
// Patch the task to remove the batch uid, because as of v1.12.5 batches are not persisted.
// This prevent from referencing *future* batches not actually associated with the task.
//
// See <https://github.com/meilisearch/meilisearch/issues/5247> for details.
t.batch_uid = None;
let mut dump_content_file = dump_tasks.push_task(&t.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
@ -829,21 +836,18 @@ impl IndexScheduler {
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(|e| Error::from_milli(e.into(), None))?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor
.next_document()
.map_err(|e| Error::from_milli(e.into(), None))?
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
dump_content_file.push_document(
&obkv_to_object(doc, &documents_batch_index)
.map_err(|e| Error::from_milli(e, None))?,
)?;
let document = document.map_err(|e| {
Error::from_milli(
milli::InternalError::SerdeJson(e).into(),
None,
)
})?;
dump_content_file.push_document(&document)?;
}
dump_content_file.flush()?;
}
}

View File

@ -55,7 +55,6 @@ use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFea
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -2017,14 +2016,19 @@ impl<'a> Dump<'a> {
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
let task_has_no_docs = matches!(task.kind, KindDump::DocumentImport { documents_count, .. } if documents_count == 0);
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
let mut builder = DocumentsBatchBuilder::new(&mut file);
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
let mut writer = io::BufWriter::new(file);
for doc in content_file {
builder.append_json_object(&doc?)?;
let doc = doc?;
serde_json::to_writer(&mut writer, &doc).map_err(|e| {
Error::from_milli(milli::InternalError::SerdeJson(e).into(), None)
})?;
}
builder.into_inner()?;
let file = writer.into_inner().map_err(|e| e.into_error())?;
file.persist()?;
Some(uuid)
@ -2032,6 +2036,12 @@ impl<'a> Dump<'a> {
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
None if task.status == Status::Enqueued && task_has_no_docs => {
let (uuid, file) = self.index_scheduler.create_update_file(false)?;
file.persist()?;
Some(uuid)
}
_ => None,
};

View File

@ -88,7 +88,7 @@ fn main() -> anyhow::Result<()> {
match command {
Command::ClearTaskQueue => clear_task_queue(db_path),
Command::ExportADump { dump_dir, skip_enqueued_tasks } => {
export_a_dump(db_path, dump_dir, skip_enqueued_tasks)
export_a_dump(db_path, dump_dir, skip_enqueued_tasks, detected_version)
}
Command::OfflineUpgrade { target_version } => {
let target_version = parse_version(&target_version).context("While parsing `--target-version`. Make sure `--target-version` is in the format MAJOR.MINOR.PATCH")?;
@ -187,6 +187,7 @@ fn export_a_dump(
db_path: PathBuf,
dump_dir: PathBuf,
skip_enqueued_tasks: bool,
detected_version: (String, String, String),
) -> Result<(), anyhow::Error> {
let started_at = OffsetDateTime::now_utc();
@ -238,9 +239,6 @@ fn export_a_dump(
if skip_enqueued_tasks {
eprintln!("Skip dumping the enqueued tasks...");
} else {
eprintln!("Dumping the enqueued tasks...");
// 3. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
let mut count = 0;
for ret in all_tasks.iter(&rtxn)? {
@ -254,18 +252,39 @@ fn export_a_dump(
if status == Status::Enqueued {
let content_file = file_store.get_update(content_file_uuid)?;
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) = reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
if (
detected_version.0.as_str(),
detected_version.1.as_str(),
detected_version.2.as_str(),
) < ("1", "12", "0")
{
eprintln!("Dumping the enqueued tasks reading them in obkv format...");
let reader =
DocumentsBatchReader::from_reader(content_file).with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().with_context(|| {
format!("While iterating on content file {:?}", content_file_uuid)
})? {
dump_content_file
.push_document(&obkv_to_object(doc, &documents_batch_index)?)?;
}
} else {
eprintln!(
"Dumping the enqueued tasks reading them in JSON stream format..."
);
for document in
serde_json::de::Deserializer::from_reader(content_file).into_iter()
{
let document = document.with_context(|| {
format!("While reading content file {:?}", content_file_uuid)
})?;
dump_content_file.push_document(&document)?;
}
}
dump_content_file.flush()?;
count += 1;
}

View File

@ -20,6 +20,34 @@ pub struct OfflineUpgrade {
impl OfflineUpgrade {
pub fn upgrade(self) -> anyhow::Result<()> {
// Adding a version?
//
// 1. Update the LAST_SUPPORTED_UPGRADE_FROM_VERSION and LAST_SUPPORTED_UPGRADE_TO_VERSION.
// 2. Add new version to the upgrade list if necessary
// 3. Use `no_upgrade` as index for versions that are compatible.
if self.current_version == self.target_version {
println!("Database is already at the target version. Exiting.");
return Ok(());
}
if self.current_version > self.target_version {
bail!(
"Cannot downgrade from {}.{}.{} to {}.{}.{}. Downgrade not supported",
self.current_version.0,
self.current_version.1,
self.current_version.2,
self.target_version.0,
self.target_version.1,
self.target_version.2
);
}
const FIRST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.9.0";
const LAST_SUPPORTED_UPGRADE_FROM_VERSION: &str = "1.12.5";
const FIRST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.10.0";
const LAST_SUPPORTED_UPGRADE_TO_VERSION: &str = "1.12.5";
let upgrade_list = [
(
v1_9_to_v1_10 as fn(&Path, &str, &str, &str) -> Result<(), anyhow::Error>,
@ -32,6 +60,8 @@ impl OfflineUpgrade {
(v1_12_to_v1_12_3, "1", "12", "3"),
];
let no_upgrade: usize = upgrade_list.len();
let (current_major, current_minor, current_patch) = &self.current_version;
let start_at = match (
@ -42,9 +72,12 @@ impl OfflineUpgrade {
("1", "9", _) => 0,
("1", "10", _) => 1,
("1", "11", _) => 2,
("1", "12", x) if x == "0" || x == "1" || x == "2" => 3,
("1", "12", "0" | "1" | "2") => 3,
("1", "12", "3" | "4" | "5") => no_upgrade,
_ => {
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from v1.9 and v1.10")
bail!("Unsupported current version {current_major}.{current_minor}.{current_patch}. Can only upgrade from versions in range [{}-{}]",
FIRST_SUPPORTED_UPGRADE_FROM_VERSION,
LAST_SUPPORTED_UPGRADE_FROM_VERSION);
}
};
@ -53,18 +86,28 @@ impl OfflineUpgrade {
let ends_at = match (target_major.as_str(), target_minor.as_str(), target_patch.as_str()) {
("1", "10", _) => 0,
("1", "11", _) => 1,
("1", "12", x) if x == "0" || x == "1" || x == "2" => 2,
("1", "12", "3") => 3,
("1", "12", "0" | "1" | "2") => 2,
("1", "12", "3" | "4" | "5") => 3,
(major, _, _) if major.starts_with('v') => {
bail!("Target version must not starts with a `v`. Instead of writing `v1.9.0` write `1.9.0` for example.")
}
_ => {
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to v1.10 and v1.11")
bail!("Unsupported target version {target_major}.{target_minor}.{target_patch}. Can only upgrade to versions in range [{}-{}]",
FIRST_SUPPORTED_UPGRADE_TO_VERSION,
LAST_SUPPORTED_UPGRADE_TO_VERSION);
}
};
println!("Starting the upgrade from {current_major}.{current_minor}.{current_patch} to {target_major}.{target_minor}.{target_patch}");
if start_at == no_upgrade {
println!("No upgrade operation to perform, writing VERSION file");
create_version_file(&self.db_path, target_major, target_minor, target_patch)
.context("while writing VERSION file after the upgrade")?;
println!("Success");
return Ok(());
}
#[allow(clippy::needless_range_loop)]
for index in start_at..=ends_at {
let (func, major, minor, patch) = upgrade_list[index];