move the import of the updates in the v2 and ignore the v1 for now

This commit is contained in:
tamo 2021-05-11 00:20:55 +02:00
parent 7d748fa384
commit 8b7735c20a
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
3 changed files with 41 additions and 33 deletions

View File

@ -16,11 +16,12 @@ use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use tempfile::TempDir; use tempfile::TempDir;
use thiserror::Error; use thiserror::Error;
use uuid::Uuid;
use super::IndexMetadata; use super::IndexMetadata;
use crate::helpers::compression; use crate::helpers::compression;
use crate::index::Index; use crate::index::Index;
use crate::index_controller::{uuid_resolver, UpdateStatus}; use crate::index_controller::uuid_resolver;
pub use actor::DumpActor; pub use actor::DumpActor;
pub use handle_impl::*; pub use handle_impl::*;
@ -53,13 +54,14 @@ impl DumpVersion {
pub fn import_index( pub fn import_index(
self, self,
size: usize, size: usize,
uuid: Uuid,
dump_path: &Path, dump_path: &Path,
index_path: &Path, db_path: &Path,
primary_key: Option<&str>, primary_key: Option<&str>,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
match self { match self {
Self::V1 => v1::import_index(size, dump_path, index_path, primary_key), Self::V1 => v1::import_index(size, uuid, dump_path, db_path, primary_key),
Self::V2 => v2::import_index(size, dump_path, index_path, primary_key), Self::V2 => v2::import_index(size, uuid, dump_path, db_path, primary_key),
} }
} }
} }
@ -200,46 +202,23 @@ pub fn load_dump(
let dump_path = tmp_dir_path.join(&idx.uid); let dump_path = tmp_dir_path.join(&idx.uid);
// this cannot fail since we created all the missing uuid in the previous loop // this cannot fail since we created all the missing uuid in the previous loop
let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap(); let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap();
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
// let update_path = db_path.join(&format!("updates"));
info!( info!(
"Importing dump from {} into {}...", "Importing dump from {} into {}...",
dump_path.display(), dump_path.display(),
index_path.display() db_path.display()
); );
metadata metadata
.dump_version .dump_version
.import_index( .import_index(
size, size,
uuid,
&dump_path, &dump_path,
&index_path, &db_path,
idx.meta.primary_key.as_ref().map(|s| s.as_ref()), idx.meta.primary_key.as_ref().map(|s| s.as_ref()),
) )
.unwrap(); .unwrap();
info!("Dump importation from {} succeed", dump_path.display()); info!("Dump importation from {} succeed", dump_path.display());
info!("importing the updates");
use crate::index_controller::update_actor::UpdateStore;
use std::io::BufRead;
let update_path = db_path.join("updates");
let options = EnvOpenOptions::new();
// create an UpdateStore to import the updates
std::fs::create_dir_all(&update_path)?;
let (update_store, _) = UpdateStore::create(options, &update_path)?;
let file = File::open(&dump_path.join("updates.jsonl"))?;
let reader = std::io::BufReader::new(file);
let mut wtxn = update_store.env.write_txn()?;
for update in reader.lines() {
let mut update: UpdateStatus = serde_json::from_str(&update?)?;
if let Some(path) = update.content_path_mut() {
*path = update_path.join("update_files").join(&path).into();
}
update_store.register_raw_updates(&mut wtxn, update, uuid)?;
}
wtxn.commit()?;
} }
// finally we can move all the unprocessed update file into our new DB // finally we can move all the unprocessed update file into our new DB

View File

@ -79,7 +79,8 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
} }
pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
info!("Importing a dump from an old version of meilisearch with dump version 1"); info!("Importing a dump from an old version of meilisearch with dump version 1");
std::fs::create_dir_all(&index_path)?; std::fs::create_dir_all(&index_path)?;

View File

@ -1,4 +1,8 @@
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::info;
use uuid::Uuid;
use crate::index_controller::{UpdateStatus, update_actor::UpdateStore};
use std::io::BufRead;
use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}}; use milli::{update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}};
use crate::index::{Checked, Index}; use crate::index::{Checked, Index};
use crate::index_controller::Settings; use crate::index_controller::Settings;
@ -14,13 +18,15 @@ fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
Ok(metadata) Ok(metadata)
} }
pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> { pub fn import_index(size: usize, uuid: Uuid, dump_path: &Path, db_path: &Path, primary_key: Option<&str>) -> anyhow::Result<()> {
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
std::fs::create_dir_all(&index_path)?; std::fs::create_dir_all(&index_path)?;
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(size); options.map_size(size);
let index = milli::Index::new(options, index_path)?; let index = milli::Index::new(options, index_path)?;
let index = Index(Arc::new(index)); let index = Index(Arc::new(index));
info!("importing the settings...");
// extract `settings.json` file and import content // extract `settings.json` file and import content
let settings = import_settings(&dump_path)?; let settings = import_settings(&dump_path)?;
let update_builder = UpdateBuilder::new(0); let update_builder = UpdateBuilder::new(0);
@ -31,6 +37,7 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_ke
let file = File::open(&dump_path.join("documents.jsonl"))?; let file = File::open(&dump_path.join("documents.jsonl"))?;
let reader = std::io::BufReader::new(file); let reader = std::io::BufReader::new(file);
info!("importing the documents...");
// TODO: TAMO: currently we ignore any error caused by the importation of the documents because // TODO: TAMO: currently we ignore any error caused by the importation of the documents because
// if there is no documents nor primary key it'll throw an anyhow error, but we must remove // if there is no documents nor primary key it'll throw an anyhow error, but we must remove
// this before the merge on main // this before the merge on main
@ -49,6 +56,27 @@ pub fn import_index(size: usize, dump_path: &Path, index_path: &Path, primary_ke
.prepare_for_closing() .prepare_for_closing()
.wait(); .wait();
Ok(()) info!("importing the updates...");
import_updates(uuid, dump_path, db_path)
} }
fn import_updates(uuid: Uuid, dump_path: &Path, db_path: &Path) -> anyhow::Result<()> {
let update_path = db_path.join("updates");
let options = EnvOpenOptions::new();
// create an UpdateStore to import the updates
std::fs::create_dir_all(&update_path)?;
let (update_store, _) = UpdateStore::create(options, &update_path)?;
let file = File::open(&dump_path.join("updates.jsonl"))?;
let reader = std::io::BufReader::new(file);
let mut wtxn = update_store.env.write_txn()?;
for update in reader.lines() {
let mut update: UpdateStatus = serde_json::from_str(&update?)?;
if let Some(path) = update.content_path_mut() {
*path = update_path.join("update_files").join(&path).into();
}
update_store.register_raw_updates(&mut wtxn, update, uuid)?;
}
wtxn.commit()?;
Ok(())
}