From b924e897f1096b0ce799c601ecdc5926e7a0424a Mon Sep 17 00:00:00 2001
From: Marin Postma
Date: Wed, 26 May 2021 22:52:06 +0200
Subject: [PATCH] load index dump

---
 meilisearch-http/src/index/dump.rs             | 120 ++++++++++++
 meilisearch-http/src/index/mod.rs              |  69 ++-----
 .../update_handler.rs                          |   2 +-
 .../index_controller/dump_actor/loaders/v2.rs  | 174 +++---------------
 .../src/index_controller/dump_actor/mod.rs     |  19 +-
 .../src/index_controller/index_actor/actor.rs  |   4 +-
 .../src/index_controller/index_actor/store.rs  |  14 +-
 meilisearch-http/src/index_controller/mod.rs   |   1 -
 .../update_actor/store/dump.rs                 |  53 +++++-
 .../update_actor/store/mod.rs                  |   7 +-
 .../index_controller/uuid_resolver/store.rs    |  77 ++++----
 11 files changed, 261 insertions(+), 279 deletions(-)
 create mode 100644 meilisearch-http/src/index/dump.rs
 rename meilisearch-http/src/{index_controller => index}/update_handler.rs (97%)

diff --git a/meilisearch-http/src/index/dump.rs b/meilisearch-http/src/index/dump.rs
new file mode 100644
index 000000000..35f5159e5
--- /dev/null
+++ b/meilisearch-http/src/index/dump.rs
@@ -0,0 +1,120 @@
+use std::{fs::{create_dir_all, File}, path::Path, sync::Arc};
+
+use anyhow::Context;
+use heed::RoTxn;
+use indexmap::IndexMap;
+use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};
+use serde::{Deserialize, Serialize};
+use anyhow::bail;
+
+use crate::option::IndexerOpts;
+
+use super::update_handler::UpdateHandler;
+use super::{Checked, Index, Settings};
+
+#[derive(Serialize, Deserialize)]
+struct DumpMeta {
+    settings: Settings<Checked>,
+    primary_key: Option<String>,
+}
+
+const META_FILE_NAME: &'static str = "meta.json";
+const DATA_FILE_NAME: &'static str = "documents.jsonl";
+
+impl Index {
+    pub fn dump(&self, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        // acquire write txn to make sure any ongoing write is finished before we start.
+        let txn = self.env.write_txn()?;
+
+        self.dump_documents(&txn, &path)?;
+        self.dump_meta(&txn, &path)?;
+
+        Ok(())
+    }
+
+    fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        println!("dumping documents");
+        let document_file_path = path.as_ref().join(DATA_FILE_NAME);
+        let mut document_file = File::create(&document_file_path)?;
+
+        let documents = self.all_documents(txn)?;
+        let fields_ids_map = self.fields_ids_map(txn)?;
+
+        // dump documents
+        let mut json_map = IndexMap::new();
+        for document in documents {
+            let (_, reader) = document?;
+
+            for (fid, bytes) in reader.iter() {
+                if let Some(name) = fields_ids_map.name(fid) {
+                    json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
+                }
+            }
+
+            serde_json::to_writer(&mut document_file, &json_map)?;
+            std::io::Write::write(&mut document_file, b"\n")?;
+
+            json_map.clear();
+        }
+
+        Ok(())
+    }
+
+    fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
+        println!("dumping settings");
+        let meta_file_path = path.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::create(&meta_file_path)?;
+
+        let settings = self.settings_txn(txn)?;
+        let primary_key = self.primary_key(txn)?.map(String::from);
+        let meta = DumpMeta { settings, primary_key };
+
+        serde_json::to_writer(&mut meta_file, &meta)?;
+
+        Ok(())
+    }
+
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: u64,
+        indexing_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        let dir_name = src
+            .as_ref()
+            .file_name()
+            .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
+        let dst_dir_path = dst.as_ref().join(dir_name);
+        create_dir_all(&dst_dir_path)?;
+
+        let meta_path = src.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::open(meta_path)?;
+        let DumpMeta { settings, primary_key } = serde_json::from_reader(&mut meta_file)?;
+        let index = Self::open(&dst_dir_path, size as usize)?;
+        let mut txn = index.write_txn()?;
+
+        let handler = UpdateHandler::new(&indexing_options)?;
+
+        index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
+
+        let document_file_path = src.as_ref().join(DATA_FILE_NAME);
+        let document_file = File::open(&document_file_path)?;
+        index.update_documents_txn(
+            &mut txn,
+            JsonStream,
+            IndexDocumentsMethod::UpdateDocuments,
+            Some(document_file),
+            handler.update_builder(0),
+            primary_key.as_deref(),
+        )?;
+
+        txn.commit()?;
+
+        match Arc::try_unwrap(index.0) {
+            Ok(inner) => inner.prepare_for_closing().wait(),
+            Err(_) => bail!("Could not close index properly."),
+        }
+
+        Ok(())
+    }
+}
diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs
index c4bf19856..331db07c4 100644
--- a/meilisearch-http/src/index/mod.rs
+++ b/meilisearch-http/src/index/mod.rs
@@ -1,11 +1,9 @@
-use std::{collections::{BTreeSet, HashSet}, io::Write, marker::PhantomData, path::{Path, PathBuf}};
+use std::{collections::{BTreeSet, HashSet}, marker::PhantomData, path::Path};
 use std::ops::Deref;
 use std::sync::Arc;
-use std::fs::File;
 
 use anyhow::{bail, Context};
-use heed::RoTxn;
-use indexmap::IndexMap;
+use heed::{EnvOpenOptions, RoTxn};
 use milli::obkv_to_json;
 use serde_json::{Map, Value};
 
@@ -16,6 +14,8 @@ use serde::{de::Deserializer, Deserialize};
 
 mod search;
 mod updates;
+mod dump;
+pub mod update_handler;
 
 pub type Document = Map<String, Value>;
 
@@ -39,6 +39,14 @@ where
 }
 
 impl Index {
+    pub fn open(path: impl AsRef<Path>, size: usize) -> anyhow::Result<Self> {
+        std::fs::create_dir_all(&path)?;
+        let mut options = EnvOpenOptions::new();
+        options.map_size(size);
+        let index = milli::Index::new(options, &path)?;
+        Ok(Index(Arc::new(index)))
+    }
+
     pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
         let txn = self.read_txn()?;
         self.settings_txn(&txn)
@@ -167,57 +175,4 @@ impl Index {
         displayed_fields_ids.retain(|fid| attributes_to_retrieve_ids.contains(fid));
         Ok(displayed_fields_ids)
     }
-
-    pub fn dump(&self, path: PathBuf) -> anyhow::Result<()> {
-        // acquire write txn make sure any ongoing write is finnished before we start.
-        let txn = self.env.write_txn()?;
-
-        self.dump_documents(&txn, &path)?;
-        self.dump_meta(&txn, &path)?;
-
-        Ok(())
-    }
-
-    fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
-        println!("dumping documents");
-        let document_file_path = path.as_ref().join("documents.jsonl");
-        let mut document_file = File::create(&document_file_path)?;
-
-        let documents = self.all_documents(txn)?;
-        let fields_ids_map = self.fields_ids_map(txn)?;
-
-        // dump documents
-        let mut json_map = IndexMap::new();
-        for document in documents {
-            let (_, reader) = document?;
-
-            for (fid, bytes) in reader.iter() {
-                if let Some(name) = fields_ids_map.name(fid) {
-                    json_map.insert(name, serde_json::from_slice::<Value>(bytes)?);
-                }
-            }
-
-            serde_json::to_writer(&mut document_file, &json_map)?;
-            document_file.write(b"\n")?;
-
-            json_map.clear();
-        }
-
-        Ok(())
-    }
-
-    fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> anyhow::Result<()> {
-        println!("dumping settings");
-        let meta_file_path = path.as_ref().join("meta.json");
-        let mut meta_file = File::create(&meta_file_path)?;
-
-        let settings = self.settings_txn(txn)?;
-        let json = serde_json::json!({
-            "settings": settings,
-        });
-
-        serde_json::to_writer(&mut meta_file, &json)?;
-
-        Ok(())
-    }
 }
diff --git a/meilisearch-http/src/index_controller/update_handler.rs b/meilisearch-http/src/index/update_handler.rs
similarity index 97%
rename from meilisearch-http/src/index_controller/update_handler.rs
rename to meilisearch-http/src/index/update_handler.rs
index d0086aadd..6a303b4ce 100644
--- a/meilisearch-http/src/index_controller/update_handler.rs
+++ b/meilisearch-http/src/index/update_handler.rs
@@ -38,7 +38,7 @@ impl UpdateHandler {
         })
     }
 
-    fn update_builder(&self, update_id: u64) -> UpdateBuilder {
+    pub fn update_builder(&self, update_id: u64) -> UpdateBuilder {
         // We prepare the update by using the update builder.
         let mut update_builder = UpdateBuilder::new(update_id);
         if let Some(max_nb_chunks) = self.max_nb_chunks {
diff --git a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
index ee7044fd1..ab4aa8cff 100644
--- a/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/loaders/v2.rs
@@ -1,25 +1,27 @@
-use std::{fs::File, io::BufReader, marker::PhantomData, path::Path};
+use std::path::Path;
 
 use anyhow::Context;
 use chrono::{DateTime, Utc};
 use log::info;
 use serde::{Deserialize, Serialize};
 
-use crate::index_controller::uuid_resolver::store::UuidStore;
+use crate::{index::Index, index_controller::{update_actor::UpdateStore, uuid_resolver::HeedUuidStore}, option::IndexerOpts};
 
 #[derive(Serialize, Deserialize, Debug)]
-pub struct MetadataV2<U> {
+pub struct MetadataV2 {
     db_version: String,
-    index_db_size: usize,
-    update_db_size: usize,
+    index_db_size: u64,
+    update_db_size: u64,
     dump_date: DateTime<Utc>,
-    _pth: PhantomData<U>,
 }
 
-impl<U> MetadataV2<U>
-where U: UuidStore,
-{
-    pub fn load_dump(self, src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+impl MetadataV2 {
+    pub fn load_dump(
+        self,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        indexing_options: &IndexerOpts,
+    ) -> anyhow::Result<()> {
         info!(
             "Loading dump from {}, dump database version: {}, dump version: V2",
             self.dump_date, self.db_version
@@ -32,148 +34,26 @@ where U: UuidStore,
 
         let tmp_dst = tempfile::tempdir_in(dst_dir)?;
 
-        self.load_index_resolver(&src, tmp_dst.path())?;
-        load_updates(&src, tmp_dst.path())?;
-        load_indexes(&src, tmp_dst.path())?;
-        Ok(())
-    }
-
-    fn load_index_resolver(
-        &self,
-        src: impl AsRef<Path>,
-        dst: impl AsRef<Path>,
-    ) -> anyhow::Result<()> {
         info!("Loading index database.");
         let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
         std::fs::create_dir_all(&uuid_resolver_path)?;
+        HeedUuidStore::load_dump(src.as_ref(), tmp_dst.as_ref())?;
 
-        U::load_dump(src.as_ref(), dst.as_ref())?;
+        info!("Loading updates.");
+        UpdateStore::load_dump(&src, &tmp_dst.as_ref(), self.update_db_size)?;
+
+        info!("Loading indexes");
+        let indexes_path = src.as_ref().join("indexes");
+        let indexes = indexes_path.read_dir()?;
+        for index in indexes {
+            let index = index?;
+            Index::load_dump(&index.path(), &dst, self.index_db_size, indexing_options)?;
+        }
+
+        // Persist and atomically rename the db
+        let persisted_dump = tmp_dst.into_path();
+        std::fs::rename(&persisted_dump, &dst)?;
 
         Ok(())
     }
 }
-
-
-fn load_updates(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
-    info!("Loading updates.");
-    todo!()
-}
-
-fn load_indexes(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
-    info!("Loading indexes");
-    todo!()
-}
-
-// Extract Settings from `settings.json` file present at provided `dir_path`
-//fn import_settings(dir_path: &Path) -> anyhow::Result<Settings<Checked>> {
-//let path = dir_path.join("settings.json");
-//let file = File::open(path)?;
-//let reader = BufReader::new(file);
-//let metadata: Settings<Unchecked> = serde_json::from_reader(reader)?;
-
-//Ok(metadata.check())
-//}
-
-//pub fn import_dump(
-//_db_size: usize,
-//update_db_size: usize,
-//_uuid: Uuid,
-//dump_path: impl AsRef<Path>,
-//db_path: impl AsRef<Path>,
-//_primary_key: Option<&str>,
-//) -> anyhow::Result<()> {
-//info!("Dump import started.");
-//info!("Importing outstanding updates...");
-
-//import_updates(&dump_path, &db_path, update_db_size)?;
-
-//info!("done importing updates");
-
-//Ok(())
-////let index_path = db_path.join(&format!("indexes/index-{}", uuid));
-////std::fs::create_dir_all(&index_path)?;
-////let mut options = EnvOpenOptions::new();
-////options.map_size(size);
-////let index = milli::Index::new(options, index_path)?;
-////let index = Index(Arc::new(index));
-
-////let mut txn = index.write_txn()?;
-
-////info!("importing the settings...");
-////// extract `settings.json` file and import content
-////let settings = import_settings(&dump_path)?;
-////let update_builder = UpdateBuilder::new(0);
-////index.update_settings_txn(&mut txn, &settings, update_builder)?;
-
-////// import the documents in the index
-////let update_builder = UpdateBuilder::new(1);
-////let file = File::open(&dump_path.join("documents.jsonl"))?;
-////let reader = std::io::BufReader::new(file);
-
-////info!("importing the documents...");
-////// TODO: TAMO: currently we ignore any error caused by the importation of the documents because
-////// if there is no documents nor primary key it'll throw an anyhow error, but we must remove
-////// this before the merge on main
-////index.update_documents_txn(
-////&mut txn,
-////UpdateFormat::JsonStream,
-////IndexDocumentsMethod::ReplaceDocuments,
-////Some(reader),
-////update_builder,
-////primary_key,
-////)?;
-
-////txn.commit()?;
-
-////// the last step: we extract the original milli::Index and close it
-////Arc::try_unwrap(index.0)
-////.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
-////.unwrap()
-////.prepare_for_closing()
-////.wait();
-
-////info!("importing the updates...");
-////import_updates(dump_path, db_path)
-//}
-
-//fn import_updates(
-//src_path: impl AsRef<Path>,
-//dst_path: impl AsRef<Path>,
-//_update_db_size: usize
-//) -> anyhow::Result<()> {
-//let dst_update_path = dst_path.as_ref().join("updates");
-//std::fs::create_dir_all(&dst_update_path)?;
-
-//let dst_update_files_path = dst_update_path.join("update_files");
-//std::fs::create_dir_all(&dst_update_files_path)?;
-
-//let options = EnvOpenOptions::new();
-//let (update_store, _) = UpdateStore::create(options, &dst_update_path)?;
-
-//let src_update_path = src_path.as_ref().join("updates");
-//let src_update_files_path = src_update_path.join("update_files");
-//let update_data = File::open(&src_update_path.join("data.jsonl"))?;
-//let mut update_data = BufReader::new(update_data);
-
-//let mut wtxn = update_store.env.write_txn()?;
-//let mut line = String::new();
-//loop {
-//match update_data.read_line(&mut line) {
-//Ok(_) => {
-//let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
-
-//if let Some(path) = update.content_path_mut() {
-//let dst_file_path = dst_update_files_path.join(&path);
-//let src_file_path = src_update_files_path.join(&path);
-//*path = dst_update_files_path.join(&path);
-//std::fs::copy(src_file_path, dst_file_path)?;
-//}
-
-//update_store.register_raw_updates(&mut wtxn, update, uuid)?;
-//}
-//_ => break,
-//}
-//}
-//wtxn.commit()?;
-//Ok(())
-//}
diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs
index f0eeb1be3..6d661d75c 100644
--- a/meilisearch-http/src/index_controller/dump_actor/mod.rs
+++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs
@@ -1,8 +1,3 @@
-mod actor;
-mod handle_impl;
-mod message;
-mod loaders;
-
 use std::{fs::File, path::Path};
 
 use log::error;
@@ -18,6 +13,15 @@
 pub use actor::DumpActor;
 pub use handle_impl::*;
 pub use message::DumpMsg;
+use crate::option::IndexerOpts;
+
+use super::uuid_resolver::store::UuidStore;
+
+mod actor;
+mod handle_impl;
+mod loaders;
+mod message;
+
 pub type DumpResult<T> = std::result::Result<T, DumpError>;
 
 #[derive(Error, Debug)]
@@ -117,11 +121,12 @@ impl DumpInfo {
     }
 }
 
-pub fn load_dump(
+pub fn load_dump(
     dst_path: impl AsRef<Path>,
     src_path: impl AsRef<Path>,
     _index_db_size: u64,
     _update_db_size: u64,
+    indexer_opts: &IndexerOpts,
 ) -> anyhow::Result<()> {
     let meta_path = src_path.as_ref().join("metadat.json");
     let mut meta_file = File::open(&meta_path)?;
@@ -129,7 +134,7 @@
 
     match meta {
         Metadata::V1 { meta } => meta.load_dump(src_path, dst_path)?,
-        Metadata::V2 { meta } => meta.load_dump(src_path, dst_path)?,
+        Metadata::V2 { meta } => meta.load_dump(src_path.as_ref(), dst_path.as_ref(), indexer_opts)?,
     }
 
     Ok(())
diff --git a/meilisearch-http/src/index_controller/index_actor/actor.rs b/meilisearch-http/src/index_controller/index_actor/actor.rs
index f6f7cdc28..2f136c011 100644
--- a/meilisearch-http/src/index_controller/index_actor/actor.rs
+++ b/meilisearch-http/src/index_controller/index_actor/actor.rs
@@ -10,9 +10,9 @@ use tokio::{fs, sync::mpsc};
 use tokio::task::spawn_blocking;
 use uuid::Uuid;
 
-use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
+use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings, update_handler::UpdateHandler};
 use crate::index_controller::{
-    get_arc_ownership_blocking, update_handler::UpdateHandler, Failed, IndexStats, Processed,
+    get_arc_ownership_blocking, Failed, IndexStats, Processed,
     Processing,
 };
 use crate::option::IndexerOpts;
diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs
index 3dee166a9..11791be48 100644
--- a/meilisearch-http/src/index_controller/index_actor/store.rs
+++ b/meilisearch-http/src/index_controller/index_actor/store.rs
@@ -2,7 +2,6 @@ use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
 
-use heed::EnvOpenOptions;
 use tokio::fs;
 use tokio::sync::RwLock;
 use tokio::task::spawn_blocking;
@@ -48,7 +47,7 @@ impl IndexStore for MapIndexStore {
 
         let index_size = self.index_size;
         let index = spawn_blocking(move || -> IndexResult<Index> {
-            let index = open_index(&path, index_size)?;
+            let index = Index::open(path, index_size)?;
             if let Some(primary_key) = primary_key {
                 let mut txn = index.write_txn()?;
                 index.put_primary_key(&mut txn, &primary_key)?;
@@ -76,8 +75,7 @@ impl IndexStore for MapIndexStore {
         }
 
         let index_size = self.index_size;
-        let index = spawn_blocking(move || open_index(path, index_size))
-            .await??;
+        let index = spawn_blocking(move || Index::open(path, index_size)).await??;
         self.index_store.write().await.insert(uuid, index.clone());
         Ok(Some(index))
     }
@@ -91,11 +89,3 @@ impl IndexStore for MapIndexStore {
         Ok(index)
     }
 }
-
-fn open_index(path: impl AsRef<Path>, size: usize) -> IndexResult<Index> {
-    std::fs::create_dir_all(&path)?;
-    let mut options = EnvOpenOptions::new();
-    options.map_size(size);
-    let index = milli::Index::new(options, &path)?;
-    Ok(Index(Arc::new(index)))
-}
diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs
index 4e40a9873..69415a1cd 100644
--- a/meilisearch-http/src/index_controller/mod.rs
+++ b/meilisearch-http/src/index_controller/mod.rs
@@ -29,7 +29,6 @@ mod dump_actor;
 mod index_actor;
 mod snapshot;
 mod update_actor;
-mod update_handler;
 mod updates;
 mod uuid_resolver;
 
diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
index 82b8d0136..1f36931d1 100644
--- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs
+++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs
@@ -1,12 +1,7 @@
-use std::{
-    collections::HashSet,
-    fs::{copy, create_dir_all, File},
-    io::Write,
-    path::{Path, PathBuf},
-};
+use std::{collections::HashSet, fs::{copy, create_dir_all, File}, io::{BufRead, BufReader, Write}, path::{Path, PathBuf}};
 
 use anyhow::Context;
-use heed::RoTxn;
+use heed::{EnvOpenOptions, RoTxn};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
@@ -15,7 +10,7 @@ use super::UpdateStore;
 use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus};
 
 #[derive(Serialize, Deserialize)]
-pub struct UpdateEntry {
+struct UpdateEntry {
     uuid: Uuid,
     update: UpdateStatus,
 }
@@ -121,6 +116,48 @@ impl UpdateStore {
 
         Ok(())
     }
+
+    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>, db_size: u64) -> anyhow::Result<()> {
+        let dst_updates_path = dst.as_ref().join("updates/");
+        create_dir_all(&dst_updates_path)?;
+        let dst_update_files_path = dst_updates_path.join("update_files/");
+        create_dir_all(&dst_update_files_path)?;
+
+        let mut options = EnvOpenOptions::new();
+        options.map_size(db_size as usize);
+        let (store, _) = UpdateStore::new(options, &dst_updates_path)?;
+
+        let src_update_path = src.as_ref().join("updates");
+        let src_update_files_path = src_update_path.join("update_files");
+        let update_data = File::open(&src_update_path.join("data.jsonl"))?;
+        let mut update_data = BufReader::new(update_data);
+
+        let mut wtxn = store.env.write_txn()?;
+        let mut line = String::new();
+        loop {
+            match update_data.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
+
+                    if let Some(path) = update.content_path_mut() {
+                        let dst_file_path = dst_update_files_path.join(&path);
+                        let src_file_path = src_update_files_path.join(&path);
+                        *path = dst_update_files_path.join(&path);
+                        std::fs::copy(src_file_path, dst_file_path)?;
+                    }
+
+                    store.register_raw_updates(&mut wtxn, update, uuid)?;
+                }
+                _ => break,
+            }
+
+            line.clear();
+        }
+        wtxn.commit()?;
+
+        Ok(())
+    }
 }
 
 async fn dump_indexes(uuids: &HashSet<Uuid>, handle: impl IndexActorHandle, path: impl AsRef<Path>)-> anyhow::Result<()> {
diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs
index 58ac24720..661b712ac 100644
--- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs
+++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs
@@ -100,7 +100,7 @@ pub struct UpdateStore {
 }
 
 impl UpdateStore {
-    pub fn create(
+    fn new(
         mut options: EnvOpenOptions,
         path: impl AsRef<Path>,
     ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
@@ -114,7 +114,6 @@ impl UpdateStore {
 
         let state = Arc::new(StateLock::from_state(State::Idle));
         let (notification_sender, notification_receiver) = mpsc::channel(10);
-        // Send a first notification to trigger the process.
 
         Ok((
             Self {
@@ -134,10 +133,10 @@ impl UpdateStore {
         path: impl AsRef<Path>,
         index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static,
     ) -> anyhow::Result<Arc<Self>> {
-        let (update_store, mut notification_receiver) = Self::create(options, path)?;
+        let (update_store, mut notification_receiver) = Self::new(options, path)?;
         let update_store = Arc::new(update_store);
 
-        // trigger the update loop
+        // Send a first notification to trigger the process.
         let _ = update_store.notification_sender.send(());
 
         // Init update loop to perform any pending updates at launch.
diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
index 0c6b66ddf..876c2454c 100644
--- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs
+++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs
@@ -30,7 +30,6 @@ pub trait UuidStore: Sized {
     async fn snapshot(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
     async fn get_size(&self) -> Result<u64>;
     async fn dump(&self, path: PathBuf) -> Result<HashSet<Uuid>>;
-    fn load_dump(src: &Path, dst: &Path) -> Result<()>;
 }
 
 #[derive(Clone)]
@@ -46,14 +45,7 @@ impl HeedUuidStore {
         let mut options = EnvOpenOptions::new();
         options.map_size(UUID_STORE_SIZE); // 1GB
         let env = options.open(path)?;
-        let db = env.create_database(None)?;
-        Ok(Self { env, db })
-    }
-
-    pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> {
-        let env = self.env.clone();
-        let db = self.db;
-        let mut txn = env.write_txn()?;
+        let db = env.create_database(None)?; Ok(Self { env, db }) } pub fn create_uuid(&self, name: String, err: bool) -> Result<Uuid> { let env = self.env.clone(); let db = self.db; let mut txn = env.write_txn()?;
         match db.get(&txn, &name)? {
             Some(uuid) => {
                 if err {
@@ -154,17 +146,51 @@ impl HeedUuidStore {
 
         let txn = self.env.read_txn()?;
         for entry in self.db.iter(&txn)? {
             let (uid, uuid) = entry?;
-            let uuid = Uuid::from_slice(entry.1)?;
-            uuids.insert(uuid);
+            let uid = uid.to_string();
+            let uuid = Uuid::from_slice(uuid)?;
+            let entry = DumpEntry { uuid, uid };
             serde_json::to_writer(&mut dump_file, &entry)?;
             dump_file.write(b"\n").unwrap();
+
+            uuids.insert(uuid);
         }
 
         Ok(uuids)
     }
+
+    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+        let uuid_resolver_path = dst.as_ref().join("uuid_resolver/");
+        std::fs::create_dir_all(&uuid_resolver_path)?;
+
+        let src_indexes = src.as_ref().join("index_uuids/data.jsonl");
+        let indexes = File::open(&src_indexes)?;
+        let mut indexes = BufReader::new(indexes);
+        let mut line = String::new();
+
+        let db = Self::new(dst)?;
+        let mut txn = db.env.write_txn()?;
+
+        loop {
+            match indexes.read_line(&mut line) {
+                Ok(0) => break,
+                Ok(_) => {
+                    let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
+                    db.db.put(&mut txn, &uid, uuid.as_bytes())?;
+                }
+                Err(e) => Err(e)?,
+            }
+
+            line.clear();
+        }
+        txn.commit()?;
+
+        db.env.prepare_for_closing().wait();
+
+        Ok(())
+    }
 }
 
 #[async_trait::async_trait]
@@ -207,33 +233,4 @@ impl UuidStore for HeedUuidStore {
         let this = self.clone();
         tokio::task::spawn_blocking(move || this.dump(path)).await?
     }
-
-    async fn load_dump(src: &Path, dst: &Path) -> Result<()> {
-        let uuid_resolver_path = dst.join("uuid_resolver/");
-        std::fs::create_dir_all(&uuid_resolver_path)?;
-
-        let src_indexes = src.join("index_uuids/data.jsonl");
-        let indexes = File::Open(&src_indexes)?;
-        let mut indexes = BufReader::new(indexes);
-        let mut line = String::new();
-
-        let db = Self::new(dst)?;
-        let mut txn = db.env.write_txn()?;
-
-        loop {
-            match indexes.read_line(&mut line) {
-                Ok(0) => break,
-                Ok(_) => {
-                    let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
-                    db.db.put(&mut txn, &uid, uuid.as_bytes())?;
-                }
-                Err(e) => Err(e)?,
-            }
-
-            line.clear();
-        }
-        txn.commit()?;
-
-        Ok(())
-    }
 }
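
Note on the per-index dump format this patch reads and writes: each index directory holds a meta.json (the serialized DumpMeta, i.e. the settings plus the optional primary_key) next to a documents.jsonl with one flattened JSON document per line. The following standalone sketch only illustrates reading such a documents.jsonl back line by line, roughly what Index::load_dump feeds into update_documents_txn; the file name comes from the patch, while the function names, error handling, and the sole serde_json dependency are illustrative assumptions, not Meilisearch APIs.

// Minimal sketch, assuming only the serde_json crate is available.
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader};

use serde_json::{Map, Value};

// Read a `documents.jsonl` file in the format produced by `Index::dump_documents`:
// one JSON object (field name -> value) per line.
fn read_documents(path: &str) -> Result<Vec<Map<String, Value>>, Box<dyn Error>> {
    let reader = BufReader::new(File::open(path)?);
    let mut documents = Vec::new();
    for line in reader.lines() {
        let line = line?;
        if line.trim().is_empty() {
            continue; // tolerate trailing empty lines
        }
        documents.push(serde_json::from_str(&line)?);
    }
    Ok(documents)
}

fn main() -> Result<(), Box<dyn Error>> {
    for document in read_documents("documents.jsonl")? {
        println!("{}", Value::Object(document));
    }
    Ok(())
}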