mod actor; mod handle_impl; mod message; mod v1; mod v2; use std::{fs::File, path::Path, sync::Arc}; use anyhow::bail; use heed::EnvOpenOptions; use log::{error, info}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; #[cfg(test)] use mockall::automock; use serde::{Deserialize, Serialize}; use serde_json::json; use tempfile::TempDir; use thiserror::Error; use super::IndexMetadata; use crate::helpers::compression; use crate::index::Index; use crate::index_controller::{uuid_resolver, UpdateStatus}; pub use actor::DumpActor; pub use handle_impl::*; pub use message::DumpMsg; pub type DumpResult = std::result::Result; #[derive(Error, Debug)] pub enum DumpError { #[error("error with index: {0}")] Error(#[from] anyhow::Error), #[error("Heed error: {0}")] HeedError(#[from] heed::Error), #[error("dump already running")] DumpAlreadyRunning, #[error("dump `{0}` does not exist")] DumpDoesNotExist(String), } #[derive(Debug, Serialize, Deserialize, Copy, Clone)] enum DumpVersion { V1, V2, } impl DumpVersion { const CURRENT: Self = Self::V2; /// Select the good importation function from the `DumpVersion` of metadata pub fn import_index( self, size: usize, dump_path: &Path, index_path: &Path, primary_key: Option<&str>, ) -> anyhow::Result<()> { match self { Self::V1 => v1::import_index(size, dump_path, index_path, primary_key), Self::V2 => v2::import_index(size, dump_path, index_path, primary_key), } } } #[async_trait::async_trait] #[cfg_attr(test, automock)] pub trait DumpActorHandle { /// Start the creation of a dump /// Implementation: [handle_impl::DumpActorHandleImpl::create_dump] async fn create_dump(&self) -> DumpResult; /// Return the status of an already created dump /// Implementation: [handle_impl::DumpActorHandleImpl::dump_status] async fn dump_info(&self, uid: String) -> DumpResult; } #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Metadata { indexes: Vec, db_version: String, dump_version: DumpVersion, } impl Metadata { /// Create a Metadata with the current dump version of meilisearch. pub fn new(indexes: Vec, db_version: String) -> Self { Metadata { indexes, db_version, dump_version: DumpVersion::CURRENT, } } /// Extract Metadata from `metadata.json` file present at provided `dir_path` fn from_path(dir_path: &Path) -> anyhow::Result { let path = dir_path.join("metadata.json"); let file = File::open(path)?; let reader = std::io::BufReader::new(file); let metadata = serde_json::from_reader(reader)?; Ok(metadata) } /// Write Metadata in `metadata.json` file at provided `dir_path` pub async fn to_path(&self, dir_path: &Path) -> anyhow::Result<()> { let path = dir_path.join("metadata.json"); tokio::fs::write(path, serde_json::to_string(self)?).await?; Ok(()) } } #[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] #[serde(rename_all = "snake_case")] pub enum DumpStatus { Done, InProgress, Failed, } #[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct DumpInfo { pub uid: String, pub status: DumpStatus, #[serde(skip_serializing_if = "Option::is_none", flatten)] pub error: Option, } impl DumpInfo { pub fn new(uid: String, status: DumpStatus) -> Self { Self { uid, status, error: None, } } pub fn with_error(&mut self, error: String) { self.status = DumpStatus::Failed; self.error = Some(json!(error)); } pub fn done(&mut self) { self.status = DumpStatus::Done; } pub fn dump_already_in_progress(&self) -> bool { self.status == DumpStatus::InProgress } } pub fn load_dump( db_path: impl AsRef, dump_path: impl AsRef, size: usize, ) -> anyhow::Result<()> { info!("Importing dump from {}...", dump_path.as_ref().display()); let db_path = db_path.as_ref(); let dump_path = dump_path.as_ref(); let uuid_resolver = uuid_resolver::HeedUuidStore::new(&db_path)?; // extract the dump in a temporary directory let tmp_dir = TempDir::new_in(db_path)?; let tmp_dir_path = tmp_dir.path(); compression::from_tar_gz(dump_path, tmp_dir_path)?; // read dump metadata let metadata = Metadata::from_path(&tmp_dir_path)?; // remove indexes which have same `uuid` than indexes to import and create empty indexes let existing_index_uids = uuid_resolver.list()?; info!("Deleting indexes already present in the db and provided in the dump..."); for idx in &metadata.indexes { if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) { // if we find the index in the `uuid_resolver` it's supposed to exist on the file system // and we want to delete it let path = db_path.join(&format!("indexes/index-{}", uuid)); info!("Deleting {}", path.display()); use std::io::ErrorKind::*; match std::fs::remove_dir_all(path) { Ok(()) => (), // if an index was present in the metadata but missing of the fs we can ignore the // problem because we are going to create it later Err(e) if e.kind() == NotFound => (), Err(e) => bail!(e), } } else { // if the index does not exist in the `uuid_resolver` we create it uuid_resolver.create_uuid(idx.uid.clone(), false)?; } } // import each indexes content for idx in metadata.indexes { let dump_path = tmp_dir_path.join(&idx.uid); // this cannot fail since we created all the missing uuid in the previous loop let uuid = uuid_resolver.get_uuid(idx.uid)?.unwrap(); let index_path = db_path.join(&format!("indexes/index-{}", uuid)); // let update_path = db_path.join(&format!("updates")); info!( "Importing dump from {} into {}...", dump_path.display(), index_path.display() ); metadata .dump_version .import_index( size, &dump_path, &index_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref()), ) .unwrap(); info!("Dump importation from {} succeed", dump_path.display()); info!("importing the updates"); use crate::index_controller::update_actor::UpdateStore; use std::io::BufRead; let update_path = db_path.join("updates"); let options = EnvOpenOptions::new(); // create an UpdateStore to import the updates std::fs::create_dir_all(&update_path)?; let (update_store, _) = UpdateStore::create(options, &update_path)?; let file = File::open(&dump_path.join("updates.jsonl"))?; let reader = std::io::BufReader::new(file); let mut wtxn = update_store.env.write_txn()?; for update in reader.lines() { let mut update: UpdateStatus = serde_json::from_str(&update?)?; if let Some(path) = update.content_path_mut() { *path = update_path.join("update_files").join(&path).into(); } update_store.register_raw_updates(&mut wtxn, update, uuid)?; } wtxn.commit()?; } // finally we can move all the unprocessed update file into our new DB let update_path = tmp_dir_path.join("update_files"); let db_update_path = db_path.join("updates/update_files"); eprintln!("path {:?} exists: {:?}", update_path, update_path.exists()); eprintln!( "path {:?} exists: {:?}", db_update_path, db_update_path.exists() ); let _ = std::fs::remove_dir_all(db_update_path); std::fs::rename( tmp_dir_path.join("update_files"), db_path.join("updates/update_files"), ) .unwrap(); info!("Dump importation from {} succeed", dump_path.display()); Ok(()) }