From 6a1964f146e217f9d8dfa3365ce8169f75c82463 Mon Sep 17 00:00:00 2001
From: mpostma
Date: Tue, 28 Sep 2021 11:59:55 +0200
Subject: [PATCH] restore dumps

---
 meilisearch-lib/src/compression.rs            |   6 +-
 meilisearch-lib/src/document_formats.rs       |  52 ++++++
 meilisearch-lib/src/index/dump.rs             | 174 ++++++++++--------
 meilisearch-lib/src/index/updates.rs          | 110 +++++------
 .../index_controller/dump_actor/loaders/v2.rs |  16 +-
 .../src/index_controller/dump_actor/mod.rs    |   3 +-
 .../index_controller/index_resolver/mod.rs    |  36 +++-
 .../src/index_controller/update_file_store.rs |  71 +++++--
 .../src/index_controller/updates/message.rs   |   7 +-
 .../src/index_controller/updates/mod.rs       |  12 +-
 .../index_controller/updates/store/dump.rs    | 164 +++++++----------
 .../src/index_controller/updates/store/mod.rs |  44 ++---
 meilisearch-lib/src/lib.rs                    |   1 +
 13 files changed, 395 insertions(+), 301 deletions(-)
 create mode 100644 meilisearch-lib/src/document_formats.rs

diff --git a/meilisearch-lib/src/compression.rs b/meilisearch-lib/src/compression.rs
index f9620eb2a..cd60854c6 100644
--- a/meilisearch-lib/src/compression.rs
+++ b/meilisearch-lib/src/compression.rs
@@ -1,9 +1,9 @@
-use std::fs::{create_dir_all, File};
+use std::fs::File;
 use std::io::Write;
 use std::path::Path;
 
-use flate2::{read::GzDecoder, write::GzEncoder, Compression};
-use tar::{Archive, Builder};
+use flate2::{write::GzEncoder, Compression};
+use tar::Builder;
 
 pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
     let mut f = File::create(dest)?;
diff --git a/meilisearch-lib/src/document_formats.rs b/meilisearch-lib/src/document_formats.rs
new file mode 100644
index 000000000..8540ce4b2
--- /dev/null
+++ b/meilisearch-lib/src/document_formats.rs
@@ -0,0 +1,52 @@
+use std::{fmt, io::{Read, Seek, Write}};
+
+use milli::documents::DocumentBatchBuilder;
+use serde_json::{Deserializer, Map, Value};
+
+type Result<T> = std::result::Result<T, DocumentFormatError>;
+
+#[derive(Debug)]
+pub enum PayloadType {
+    Jsonl,
+}
+
+impl fmt::Display for PayloadType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            PayloadType::Jsonl => write!(f, "ndjson"),
+        }
+    }
+}
+
+#[derive(thiserror::Error, Debug)]
+pub enum DocumentFormatError {
+    #[error("Internal error: {0}")]
+    Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
+    #[error("{0}. The {1} payload provided is malformed.")]
+    MalformedPayload(Box<dyn std::error::Error + Send + Sync + 'static>, PayloadType),
+}
+
+internal_error!(
+    DocumentFormatError: milli::documents::Error
+);
+
+macro_rules! malformed {
+    ($type:path, $e:expr) => {
+        $e.map_err(|e| DocumentFormatError::MalformedPayload(Box::new(e), $type))
+    };
+}
+
+/// Reads JSONL from the input and writes an obkv batch to the writer.
+pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> {
+    let mut builder = DocumentBatchBuilder::new(writer)?;
+    let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>();
+
+    for value in stream {
+        let value = malformed!(PayloadType::Jsonl, value)?;
+        builder.add_documents(&value)?;
+    }
+
+    builder.finish()?;
+
+    Ok(())
+}
diff --git a/meilisearch-lib/src/index/dump.rs b/meilisearch-lib/src/index/dump.rs
index 018ae6d2f..8049df500 100644
--- a/meilisearch-lib/src/index/dump.rs
+++ b/meilisearch-lib/src/index/dump.rs
@@ -1,12 +1,18 @@
-use std::fs::File;
-use std::io::Write;
+use std::fs::{create_dir_all, File};
+use std::io::{BufReader, Seek, SeekFrom, Write};
 use std::path::Path;
 
-use heed::RoTxn;
+use anyhow::Context;
+use heed::{EnvOpenOptions, RoTxn};
 use indexmap::IndexMap;
+use milli::documents::DocumentBatchReader;
 use serde::{Deserialize, Serialize};
+use serde_json::Value;
 
-use crate::options::IndexerOpts;
+use crate::document_formats::read_jsonl;
+use crate::index::update_handler::UpdateHandler;
+use crate::index::updates::apply_settings_to_builder;
+use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
 
 use super::error::Result;
 use super::{Index, Settings, Unchecked};
@@ -24,6 +30,11 @@ impl Index {
     pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
         // acquire a write txn to make sure any ongoing write is finished before we start.
         let txn = self.env.write_txn()?;
+        let path = path
+            .as_ref()
+            .join(format!("indexes/{}", self.uuid.to_string()));
+
+        create_dir_all(&path)?;
 
         self.dump_documents(&txn, &path)?;
         self.dump_meta(&txn, &path)?;
@@ -75,92 +86,101 @@ impl Index {
     }
 
     pub fn load_dump(
-        _src: impl AsRef<Path>,
-        _dst: impl AsRef<Path>,
-        _size: usize,
-        _indexing_options: &IndexerOpts,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        size: usize,
+        update_handler: &UpdateHandler,
     ) -> anyhow::Result<()> {
-        //let dir_name = src
-        //.as_ref()
-        //.file_name()
-        //.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
+        let dir_name = src
+            .as_ref()
+            .file_name()
+            .with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
 
-        //let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
-        //create_dir_all(&dst_dir_path)?;
+        let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
+        create_dir_all(&dst_dir_path)?;
 
-        //let meta_path = src.as_ref().join(META_FILE_NAME);
-        //let mut meta_file = File::open(meta_path)?;
+        let meta_path = src.as_ref().join(META_FILE_NAME);
+        let mut meta_file = File::open(meta_path)?;
 
-        //// We first deserialize the dump meta into a serde_json::Value and change
-        //// the custom ranking rules settings from the old format to the new format.
-        //let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
-        //if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
-        //convert_custom_ranking_rules(ranking_rules);
-        //}
+        // We first deserialize the dump meta into a serde_json::Value and change
+        // the custom ranking rules settings from the old format to the new format.
+        let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
+        if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
+            convert_custom_ranking_rules(ranking_rules);
+        }
 
-        //// Then we serialize it back into a vec to deserialize it
-        //// into a `DumpMeta` struct with the newly patched `rankingRules` format.
-        //let patched_meta = serde_json::to_vec(&meta)?;
+        // Then we serialize it back into a vec to deserialize it
+        // into a `DumpMeta` struct with the newly patched `rankingRules` format.
+        let patched_meta = serde_json::to_vec(&meta)?;
 
-        //let DumpMeta {
-        //settings,
-        //primary_key,
-        //} = serde_json::from_slice(&patched_meta)?;
-        //let settings = settings.check();
-        //let index = Self::open(&dst_dir_path, size)?;
-        //let mut txn = index.write_txn()?;
+        let DumpMeta {
+            settings,
+            primary_key,
+        } = serde_json::from_slice(&patched_meta)?;
+        let settings = settings.check();
 
-        //let handler = UpdateHandler::new(indexing_options)?;
+        let mut options = EnvOpenOptions::new();
+        options.map_size(size);
+        let index = milli::Index::new(options, &dst_dir_path)?;
 
-        //index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
+        let mut txn = index.write_txn()?;
 
-        //let document_file_path = src.as_ref().join(DATA_FILE_NAME);
-        //let reader = File::open(&document_file_path)?;
-        //let mut reader = BufReader::new(reader);
-        //reader.fill_buf()?;
+        // Apply settings first
+        let builder = update_handler.update_builder(0);
+        let mut builder = builder.settings(&mut txn, &index);
 
-        // If the document file is empty, we don't perform the document addition, to prevent
-        // a primary key error to be thrown.
-        todo!("fix obk document dumps")
-        //if !reader.buffer().is_empty() {
-        //index.update_documents_txn(
-        //&mut txn,
-        //IndexDocumentsMethod::UpdateDocuments,
-        //Some(reader),
-        //handler.update_builder(0),
-        //primary_key.as_deref(),
-        //)?;
-        //}
+        if let Some(primary_key) = primary_key {
+            builder.set_primary_key(primary_key);
+        }
 
-        //txn.commit()?;
+        apply_settings_to_builder(&settings, &mut builder);
 
-        //match Arc::try_unwrap(index.0) {
-        //Ok(inner) => inner.prepare_for_closing().wait(),
-        //Err(_) => bail!("Could not close index properly."),
-        //}
+        builder.execute(|_, _| ())?;
 
-        //Ok(())
+        let document_file_path = src.as_ref().join(DATA_FILE_NAME);
+        let reader = BufReader::new(File::open(&document_file_path)?);
+
+        let mut tmp_doc_file = tempfile::tempfile()?;
+
+        read_jsonl(reader, &mut tmp_doc_file)?;
+
+        tmp_doc_file.seek(SeekFrom::Start(0))?;
+
+        let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
+
+        // If the document file is empty, we don't perform the document addition, to prevent
+        // a primary key error from being thrown.
+        if !documents_reader.is_empty() {
+            let builder = update_handler.update_builder(0).index_documents(&mut txn, &index);
+            builder.execute(documents_reader, |_, _| ())?;
+        }
+
+        txn.commit()?;
+
+        index.prepare_for_closing().wait();
+
+        Ok(())
     }
 }
 
-// /// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
-// ///
-// /// This is done for compatibility reasons, and to avoid a new dump version,
-// /// since the new syntax was introduced soon after the new dump version.
-//fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
-    //*ranking_rules = match ranking_rules.take() {
-    //Value::Array(values) => values
-    //.into_iter()
-    //.filter_map(|value| match value {
-    //Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
-    //.map(|f| format!("{}:asc", f))
-    //.map(Value::String),
-    //Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
-    //.map(|f| format!("{}:desc", f))
-    //.map(Value::String),
-    //otherwise => Some(otherwise),
-    //})
-    //.collect(),
-    //otherwise => otherwise,
-    //}
-//}
+/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
+///
+/// This is done for compatibility reasons, and to avoid a new dump version,
+/// since the new syntax was introduced soon after the new dump version.
+fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
+    *ranking_rules = match ranking_rules.take() {
+        Value::Array(values) => values
+            .into_iter()
+            .filter_map(|value| match value {
+                Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
+                    .map(|f| format!("{}:asc", f))
+                    .map(Value::String),
+                Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
+                    .map(|f| format!("{}:desc", f))
+                    .map(Value::String),
+                otherwise => Some(otherwise),
+            })
+            .collect(),
+        otherwise => otherwise,
+    }
+}
diff --git a/meilisearch-lib/src/index/updates.rs b/meilisearch-lib/src/index/updates.rs
index 28d2734f0..44558fdae 100644
--- a/meilisearch-lib/src/index/updates.rs
+++ b/meilisearch-lib/src/index/updates.rs
@@ -266,59 +266,7 @@ impl Index {
         // We must use the write transaction of the update here.
         let mut builder = update_builder.settings(txn, self);
 
-        match settings.searchable_attributes {
-            Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
-            Setting::Reset => builder.reset_searchable_fields(),
-            Setting::NotSet => (),
-        }
-
-        match settings.displayed_attributes {
-            Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
-            Setting::Reset => builder.reset_displayed_fields(),
-            Setting::NotSet => (),
-        }
-
-        match settings.filterable_attributes {
-            Setting::Set(ref facets) => {
-                builder.set_filterable_fields(facets.clone().into_iter().collect())
-            }
-            Setting::Reset => builder.reset_filterable_fields(),
-            Setting::NotSet => (),
-        }
-
-        match settings.sortable_attributes {
-            Setting::Set(ref fields) => {
-                builder.set_sortable_fields(fields.iter().cloned().collect())
-            }
-            Setting::Reset => builder.reset_sortable_fields(),
-            Setting::NotSet => (),
-        }
-
-        match settings.ranking_rules {
-            Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
-            Setting::Reset => builder.reset_criteria(),
-            Setting::NotSet => (),
-        }
-
-        match settings.stop_words {
-            Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
-            Setting::Reset => builder.reset_stop_words(),
-            Setting::NotSet => (),
-        }
-
-        match settings.synonyms {
-            Setting::Set(ref synonyms) => {
-                builder.set_synonyms(synonyms.clone().into_iter().collect())
-            }
-            Setting::Reset => builder.reset_synonyms(),
-            Setting::NotSet => (),
-        }
-
-        match settings.distinct_attribute {
-            Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
-            Setting::Reset => builder.reset_distinct_field(),
-            Setting::NotSet => (),
-        }
+        apply_settings_to_builder(settings, &mut builder);
 
         builder.execute(|indexing_step, update_id| {
             debug!("update {}: {:?}", update_id, indexing_step)
@@ -328,6 +276,62 @@ impl Index {
     }
 }
 
+pub fn apply_settings_to_builder(settings: &Settings<Checked>, builder: &mut milli::update::Settings) {
+    match settings.searchable_attributes {
+        Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
+        Setting::Reset => builder.reset_searchable_fields(),
+        Setting::NotSet => (),
+    }
+
+    match settings.displayed_attributes {
+        Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
+        Setting::Reset => builder.reset_displayed_fields(),
+        Setting::NotSet => (),
+    }
+
+    match settings.filterable_attributes {
+        Setting::Set(ref facets) => {
+            builder.set_filterable_fields(facets.clone().into_iter().collect())
+        }
+        Setting::Reset => builder.reset_filterable_fields(),
+        Setting::NotSet => (),
+    }
+
+    match settings.sortable_attributes {
+        Setting::Set(ref fields) => {
+            builder.set_sortable_fields(fields.iter().cloned().collect())
+        }
+        Setting::Reset => builder.reset_sortable_fields(),
+        Setting::NotSet => (),
+    }
+
+    match settings.ranking_rules {
+        Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
+        Setting::Reset => builder.reset_criteria(),
+        Setting::NotSet => (),
+    }
+
+    match settings.stop_words {
+        Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
+        Setting::Reset => builder.reset_stop_words(),
+        Setting::NotSet => (),
+    }
+
+    match settings.synonyms {
+        Setting::Set(ref synonyms) => {
+            builder.set_synonyms(synonyms.clone().into_iter().collect())
+        }
+        Setting::Reset => builder.reset_synonyms(),
+        Setting::NotSet => (),
+    }
+
+    match settings.distinct_attribute {
+        Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
+        Setting::Reset => builder.reset_distinct_field(),
+        Setting::NotSet => (),
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
diff --git a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs
index 94b7321ae..8280e9613 100644
--- a/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs
+++ b/meilisearch-lib/src/index_controller/dump_actor/loaders/v2.rs
@@ -4,8 +4,8 @@ use chrono::{DateTime, Utc};
 use log::info;
 use serde::{Deserialize, Serialize};
 
-use crate::index::Index;
-use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
+use crate::index_controller::index_resolver::IndexResolver;
+use crate::index_controller::update_file_store::UpdateFileStore;
 use crate::index_controller::updates::store::UpdateStore;
 use crate::options::IndexerOpts;
 
@@ -41,19 +41,11 @@ impl MetadataV2 {
             self.dump_date, self.db_version
         );
 
-        info!("Loading index database.");
-        HeedUuidStore::load_dump(src.as_ref(), &dst)?;
-
-        info!("Loading updates.");
+        IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
+        UpdateFileStore::load_dump(src.as_ref(), &dst)?;
         UpdateStore::load_dump(&src, &dst, update_db_size)?;
 
         info!("Loading indexes.");
-        let indexes_path = src.as_ref().join("indexes");
-        let indexes = indexes_path.read_dir()?;
-        for index in indexes {
-            let index = index?;
-            Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
-        }
 
         Ok(())
     }
diff --git a/meilisearch-lib/src/index_controller/dump_actor/mod.rs b/meilisearch-lib/src/index_controller/dump_actor/mod.rs
index b7c61f568..c2410107d 100644
--- a/meilisearch-lib/src/index_controller/dump_actor/mod.rs
+++ b/meilisearch-lib/src/index_controller/dump_actor/mod.rs
@@ -115,6 +115,7 @@ pub fn load_dump(
     let tmp_src = tempfile::tempdir_in(".")?;
     let tmp_src_path = tmp_src.path();
 
+    println!("importing to {}", dst_path.as_ref().display());
     crate::from_tar_gz(&src_path, tmp_src_path)?;
 
     let meta_path = tmp_src_path.join(META_FILE_NAME);
@@ -179,7 +180,7 @@ impl DumpTask {
 
         let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
 
-        UpdateMsg::dump(&self.update_handle, uuids.into_iter().collect(), temp_dump_path.clone()).await?;
+        UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
 
         let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
             let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;
diff --git a/meilisearch-lib/src/index_controller/index_resolver/mod.rs b/meilisearch-lib/src/index_controller/index_resolver/mod.rs
index 5721fce8a..dcd1ed512 100644
--- a/meilisearch-lib/src/index_controller/index_resolver/mod.rs
+++ b/meilisearch-lib/src/index_controller/index_resolver/mod.rs
@@ -1,6 +1,5 @@
 pub mod uuid_store;
 mod index_store;
-//mod message;
 pub mod error;
 
 use std::path::Path;
@@ -10,7 +9,7 @@ use uuid_store::{UuidStore, HeedUuidStore};
 use index_store::{IndexStore, MapIndexStore};
 use error::{Result, IndexResolverError};
 
-use crate::{index::Index, options::IndexerOpts};
+use crate::{index::{Index, update_handler::UpdateHandler}, options::IndexerOpts};
 
 pub type HardStateIndexResolver = IndexResolver<HeedUuidStore, MapIndexStore>;
 
@@ -25,6 +24,28 @@ pub struct IndexResolver<U, I> {
     index_store: I,
 }
 
+impl IndexResolver<HeedUuidStore, MapIndexStore> {
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        index_db_size: usize,
+        indexer_opts: &IndexerOpts,
+    ) -> anyhow::Result<()> {
+        HeedUuidStore::load_dump(&src, &dst)?;
+
+        let indexes_path = src.as_ref().join("indexes");
+        let indexes = indexes_path.read_dir()?;
+
+        let update_handler = UpdateHandler::new(indexer_opts).unwrap();
+        for index in indexes {
+            let index = index?;
+            Index::load_dump(&index.path(), &dst, index_db_size, &update_handler)?;
+        }
+
+        Ok(())
+    }
+}
+
 impl<U, I> IndexResolver<U, I>
 where U: UuidStore,
       I: IndexStore,
@@ -39,8 +60,14 @@ where U: UuidStore,
         }
     }
 
-    pub async fn dump(&self, _path: impl AsRef<Path>) -> Result<Vec<Uuid>> {
-        todo!()
+    pub async fn dump(&self, path: impl AsRef<Path>) -> Result<Vec<Index>> {
+        let uuids = self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
+        let mut indexes = Vec::new();
+        for uuid in uuids {
+            indexes.push(self.get_index_by_uuid(uuid).await?);
+        }
+
+        Ok(indexes)
     }
 
     pub async fn get_size(&self) -> Result<u64> {
@@ -51,7 +78,6 @@ where U: UuidStore,
     pub async fn snapshot(&self, path: impl AsRef<Path>) -> Result<Vec<Index>> {
         let uuids = self.index_uuid_store.snapshot(path.as_ref().to_owned()).await?;
         let mut indexes = Vec::new();
-
         for uuid in uuids {
             indexes.push(self.get_index_by_uuid(uuid).await?);
         }
diff --git a/meilisearch-lib/src/index_controller/update_file_store.rs b/meilisearch-lib/src/index_controller/update_file_store.rs
index f21560f73..d7b3e2560 100644
--- a/meilisearch-lib/src/index_controller/update_file_store.rs
+++ b/meilisearch-lib/src/index_controller/update_file_store.rs
@@ -1,14 +1,17 @@
-use std::fs::File;
+use std::fs::{File, create_dir_all};
+use std::io::{BufReader, BufWriter, Write};
 use std::path::{Path, PathBuf};
 use std::ops::{Deref, DerefMut};
 
-//use milli::documents::DocumentBatchReader;
-//use serde_json::Map;
+use milli::documents::DocumentBatchReader;
+use serde_json::Map;
 use tempfile::NamedTempFile;
 use uuid::Uuid;
 
 const UPDATE_FILES_PATH: &str = "updates/updates_files";
 
+use crate::document_formats::read_jsonl;
+
 use super::error::Result;
 
 pub struct UpdateFile {
@@ -42,6 +45,27 @@ pub struct UpdateFileStore {
 }
 
 impl UpdateFileStore {
+    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
+        let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
+        let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
+
+        create_dir_all(&dst_update_files_path).unwrap();
+
+        let entries = std::fs::read_dir(src_update_files_path).unwrap();
+
+        for entry in entries {
+            let entry = entry.unwrap();
+            let update_file = BufReader::new(File::open(entry.path()).unwrap());
+            let file_uuid = entry.file_name();
+            let file_uuid = file_uuid.to_str().ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
+            let dst_path = dst_update_files_path.join(file_uuid);
+            let dst_file = BufWriter::new(File::create(dst_path)?);
+            read_jsonl(update_file, dst_file)?;
+        }
+
+        Ok(())
+    }
+
     pub fn new(path: impl AsRef<Path>) -> Result<Self> {
         let path = path.as_ref().join(UPDATE_FILES_PATH);
         std::fs::create_dir_all(&path).unwrap();
@@ -78,27 +102,34 @@ impl UpdateFileStore {
     }
 
     /// Performs a dump of the given update file uuid into the provided dump path.
-    pub fn dump(&self, _uuid: Uuid, _snapshot_path: impl AsRef<Path>) -> Result<()> {
-        todo!()
-        //let update_file_path = self.path.join(uuid.to_string());
-        //let snapshot_file_path: snapshot_path.as_ref().join(format!("update_files/uuid", uuid));
+    pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
+        let uuid_string = uuid.to_string();
+        let update_file_path = self.path.join(&uuid_string);
+        let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
+        std::fs::create_dir_all(&dst).unwrap();
+        dst.push(&uuid_string);
 
-        //let update_file = File::open(update_file_path).unwrap();
+        let update_file = File::open(update_file_path).unwrap();
+        let mut dst_file = NamedTempFile::new().unwrap();
+        let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
+        let mut document_buffer = Map::new();
+        // TODO: we need to find a way to do this more efficiently. (create a custom serializer to
+        // jsonl for example...)
+        while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
+            for (field_id, content) in document.iter() {
+                let field_name = index.get_by_left(&field_id).unwrap();
+                let content = serde_json::from_slice(content).unwrap();
+                document_buffer.insert(field_name.to_string(), content);
+            }
 
-        //let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
+            serde_json::to_writer(&mut dst_file, &document_buffer).unwrap();
+            dst_file.write(b"\n").unwrap();
+            document_buffer.clear();
+        }
 
-        //let mut document_buffer = Map::new();
-        //// TODO: we need to find a way to do this more efficiently. (create a custom serializer to
-        //// jsonl for example...)
-        //while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
-        //for (field_id, content) in document.iter() {
-        //let field_name = index.get_by_left(&field_id).unwrap();
-        //let content = serde_json::from_slice(content).unwrap();
-        //document_buffer.insert(field_name.to_string(), content);
-        //}
+        dst_file.persist(dst).unwrap();
 
-        //}
-        //Ok(())
+        Ok(())
     }
 }
diff --git a/meilisearch-lib/src/index_controller/updates/message.rs b/meilisearch-lib/src/index_controller/updates/message.rs
index f96c707fd..22a920e12 100644
--- a/meilisearch-lib/src/index_controller/updates/message.rs
+++ b/meilisearch-lib/src/index_controller/updates/message.rs
@@ -1,4 +1,3 @@
-use std::collections::HashSet;
 use std::path::PathBuf;
 
 use tokio::sync::{mpsc, oneshot};
@@ -35,7 +34,7 @@ pub enum UpdateMsg {
         ret: oneshot::Sender<Result<()>>,
     },
     Dump {
-        uuids: HashSet<Uuid>,
+        indexes: Vec<Index>,
         path: PathBuf,
         ret: oneshot::Sender<Result<()>>,
     },
@@ -54,11 +53,11 @@ impl UpdateMsg {
 
     pub async fn dump(
         sender: &mpsc::Sender<Self>,
-        uuids: HashSet<Uuid>,
+        indexes: Vec<Index>,
         path: PathBuf,
     ) -> Result<()> {
         let (ret, rcv) = oneshot::channel();
-        let msg = Self::Dump { path, uuids, ret };
+        let msg = Self::Dump { path, indexes, ret };
         sender.send(msg).await?;
         rcv.await?
     }
diff --git a/meilisearch-lib/src/index_controller/updates/mod.rs b/meilisearch-lib/src/index_controller/updates/mod.rs
index 63716928f..733bda8e6 100644
--- a/meilisearch-lib/src/index_controller/updates/mod.rs
+++ b/meilisearch-lib/src/index_controller/updates/mod.rs
@@ -3,7 +3,6 @@ mod message;
 pub mod status;
 pub mod store;
 
-use std::collections::HashSet;
 use std::io;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicBool;
@@ -104,7 +103,6 @@ pub struct UpdateLoop {
     store: Arc<UpdateStore>,
     inbox: Option<mpsc::Receiver<UpdateMsg>>,
     update_file_store: UpdateFileStore,
-    index_resolver: Arc<HardStateIndexResolver>,
     must_exit: Arc<AtomicBool>,
 }
 
@@ -133,7 +131,6 @@ impl UpdateLoop {
             inbox,
             must_exit,
             update_file_store,
-            index_resolver,
         })
     }
 
@@ -184,8 +181,8 @@ impl UpdateLoop {
             GetInfo { ret } => {
                 let _ = ret.send(self.handle_get_info().await);
             }
-            Dump { uuids, path, ret } => {
-                let _ = ret.send(self.handle_dump(uuids, path).await);
+            Dump { indexes, path, ret } => {
+                let _ = ret.send(self.handle_dump(indexes, path).await);
             }
         }
     })
@@ -278,12 +275,11 @@ impl UpdateLoop {
         Ok(())
     }
 
-    async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
-        let index_handle = self.index_resolver.clone();
+    async fn handle_dump(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
         let update_store = self.store.clone();
 
         tokio::task::spawn_blocking(move || -> Result<()> {
-            update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
+            update_store.dump(&indexes, path.to_path_buf())?;
             Ok(())
         })
         .await??;
diff --git a/meilisearch-lib/src/index_controller/updates/store/dump.rs b/meilisearch-lib/src/index_controller/updates/store/dump.rs
index 996bc3432..68380a9d4 100644
--- a/meilisearch-lib/src/index_controller/updates/store/dump.rs
+++ b/meilisearch-lib/src/index_controller/updates/store/dump.rs
@@ -1,11 +1,17 @@
-use std::{collections::HashSet, fs::{create_dir_all, File}, io::Write, path::{Path, PathBuf}, sync::Arc};
+use std::collections::HashSet;
+use std::path::{Path, PathBuf};
+use std::io::{BufReader, Write};
+use std::fs::{File, create_dir_all};
 
-use heed::RoTxn;
+use heed::{EnvOpenOptions, RoTxn};
+use rayon::prelude::*;
 use serde::{Deserialize, Serialize};
+use serde_json::Deserializer;
+use tempfile::{NamedTempFile, TempDir};
 use uuid::Uuid;
 
 use super::{Result, State, UpdateStore};
-use crate::index_controller::{index_resolver::HardStateIndexResolver, updates::status::UpdateStatus};
+use crate::{RegisterUpdate, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}};
 
 #[derive(Serialize, Deserialize)]
 struct UpdateEntry {
@@ -16,9 +22,8 @@ struct UpdateEntry {
 impl UpdateStore {
     pub fn dump(
         &self,
-        uuids: &HashSet<Uuid>,
+        indexes: &[Index],
         path: PathBuf,
-        handle: Arc<HardStateIndexResolver>,
     ) -> Result<()> {
         let state_lock = self.state.write();
         state_lock.swap(State::Dumping);
@@ -26,15 +31,11 @@ impl UpdateStore {
         // txn must *always* be acquired after the state lock, or it will deadlock.
         let txn = self.env.write_txn()?;
 
-        let dump_path = path.join("updates");
-        create_dir_all(&dump_path)?;
+        let uuids = indexes.iter().map(|i| i.uuid).collect();
 
-        self.dump_updates(&txn, uuids, &dump_path)?;
+        self.dump_updates(&txn, &uuids, &path)?;
 
-        let fut = dump_indexes(uuids, handle, &path);
-        tokio::runtime::Handle::current().block_on(fut)?;
-
-        state_lock.swap(State::Idle);
+        indexes.par_iter().try_for_each(|index| index.dump(&path)).unwrap();
 
         Ok(())
     }
@@ -45,58 +46,59 @@ impl UpdateStore {
         uuids: &HashSet<Uuid>,
         path: impl AsRef<Path>,
     ) -> Result<()> {
-        //let dump_data_path = path.as_ref().join("data.jsonl");
-        //let mut dump_data_file = File::create(dump_data_path)?;
+        let mut dump_data_file = NamedTempFile::new()?;
 
-        //let update_files_path = path.as_ref().join(super::UPDATE_DIR);
-        //create_dir_all(&update_files_path)?;
+        self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
+        self.dump_completed(txn, uuids, &mut dump_data_file)?;
 
-        //self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
-        //self.dump_completed(txn, uuids, &mut dump_data_file)?;
+        let mut dst_path = path.as_ref().join("updates");
+        create_dir_all(&dst_path)?;
+        dst_path.push("data.jsonl");
+        dump_data_file.persist(dst_path).unwrap();
 
-        //Ok(())
-        todo!()
+        Ok(())
     }
 
     fn dump_pending(
         &self,
-        _txn: &RoTxn,
-        _uuids: &HashSet<Uuid>,
-        _file: &mut File,
-        _dst_path: impl AsRef<Path>,
+        txn: &RoTxn,
+        uuids: &HashSet<Uuid>,
+        mut file: impl Write,
+        dst_path: impl AsRef<Path>,
     ) -> Result<()> {
-        todo!()
-        //let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
+        let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
 
-        //for pending in pendings {
-        //let ((_, uuid, _), data) = pending?;
-        //if uuids.contains(&uuid) {
-        //let update = data.decode()?;
+        for pending in pendings {
+            let ((_, uuid, _), data) = pending?;
+            if uuids.contains(&uuid) {
+                let update = data.decode()?;
 
-        //if let Some(ref update_uuid) = update.content {
-        //let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
-        //let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
-        //std::fs::copy(src, dst)?;
-        //}
+                if let Enqueued {
+                    meta: RegisterUpdate::DocumentAddition {
+                        content_uuid, ..
+                    }, ..
+                } = update {
+                    self.update_file_store.dump(content_uuid, &dst_path).unwrap();
+                }
 
-        //let update_json = UpdateEntry {
-        //uuid,
-        //update: update.into(),
-        //};
+                let update_json = UpdateEntry {
+                    uuid,
+                    update: update.into(),
+                };
 
-        //serde_json::to_writer(&mut file, &update_json)?;
-        //file.write_all(b"\n")?;
-        //}
-        //}
+                serde_json::to_writer(&mut file, &update_json)?;
+                file.write_all(b"\n")?;
+            }
+        }
 
-        //Ok(())
+        Ok(())
     }
 
     fn dump_completed(
         &self,
         txn: &RoTxn,
         uuids: &HashSet<Uuid>,
-        mut file: &mut File,
+        mut file: impl Write,
     ) -> Result<()> {
         let updates = self.updates.iter(txn)?.lazily_decode_data();
@@ -116,65 +118,35 @@ impl UpdateStore {
     }
 
     pub fn load_dump(
-        _src: impl AsRef<Path>,
-        _dst: impl AsRef<Path>,
-        _db_size: usize,
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        db_size: usize,
     ) -> anyhow::Result<()> {
-        todo!()
-        //let dst_update_path = dst.as_ref().join("updates/");
-        //create_dir_all(&dst_update_path)?;
+        println!("target path: {}", dst.as_ref().display());
 
-        //let mut options = EnvOpenOptions::new();
-        //options.map_size(db_size as usize);
-        //let (store, _) = UpdateStore::new(options, &dst_update_path)?;
+        let mut options = EnvOpenOptions::new();
+        options.map_size(db_size as usize);
 
-        //let src_update_path = src.as_ref().join("updates");
-        //let update_data = File::open(&src_update_path.join("data.jsonl"))?;
-        //let mut update_data = BufReader::new(update_data);
+        // create a dummy update file store, since it is not needed right now.
+        let tmp = TempDir::new().unwrap();
+        let update_file_store = UpdateFileStore::new(tmp.path()).unwrap();
+        let (store, _) = UpdateStore::new(options, &dst, update_file_store)?;
 
-        //std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
+        let src_update_path = src.as_ref().join("updates");
+        let update_data = File::open(&src_update_path.join("data.jsonl"))?;
+        let update_data = BufReader::new(update_data);
 
-        //let mut wtxn = store.env.write_txn()?;
-        //let mut line = String::new();
-        //loop {
-        //match update_data.read_line(&mut line) {
-        //Ok(0) => break,
-        //Ok(_) => {
-        //let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
-        //store.register_raw_updates(&mut wtxn, &update, uuid)?;
+        let stream = Deserializer::from_reader(update_data).into_iter::<UpdateEntry>();
+        let mut wtxn = store.env.write_txn()?;
 
-        //// Copy ascociated update path if it exists
-        //if let UpdateStatus::Enqueued(Enqueued {
-        //content: Some(uuid),
-        //..
-        //}) = update
-        //{
-        //let src = update_uuid_to_file_path(&src_update_path, uuid);
-        //let dst = update_uuid_to_file_path(&dst_update_path, uuid);
-        //std::fs::copy(src, dst)?;
-        //}
-        //}
-        //_ => break,
-        //}
+        for entry in stream {
+            let UpdateEntry { uuid, update } = entry?;
+            store.register_raw_updates(&mut wtxn, &update, uuid)?;
+        }
 
-        //line.clear();
-        //}
+        wtxn.commit()?;
 
-        //wtxn.commit()?;
-
-        //Ok(())
+        Ok(())
     }
 }
-
-async fn dump_indexes(
-    _uuids: &HashSet<Uuid>,
-    _handle: Arc<HardStateIndexResolver>,
-    _path: impl AsRef<Path>,
-) -> Result<()> {
-    todo!()
-    //for uuid in uuids {
-    //IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?;
-    //}
-
-    //Ok(())
-}
diff --git a/meilisearch-lib/src/index_controller/updates/store/mod.rs b/meilisearch-lib/src/index_controller/updates/store/mod.rs
index b7bf1b457..01e7fd989 100644
--- a/meilisearch-lib/src/index_controller/updates/store/mod.rs
+++ b/meilisearch-lib/src/index_controller/updates/store/mod.rs
@@ -262,28 +262,28 @@ impl UpdateStore {
 
     // /// Push already processed update in the UpdateStore without triggering the notification
     // /// process. This is useful for the dumps.
-    //pub fn register_raw_updates(
-    //&self,
-    //wtxn: &mut heed::RwTxn,
-    //update: &UpdateStatus,
-    //index_uuid: Uuid,
-    //) -> heed::Result<()> {
-    //match update {
-    //UpdateStatus::Enqueued(enqueued) => {
-    //let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
-    //self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
-    //wtxn,
-    //&(global_id, index_uuid, enqueued.id()),
-    //enqueued,
-    //)?;
-    //}
-    //_ => {
-    //let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
-    //self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
-    //}
-    //}
-    //Ok(())
-    //}
+    pub fn register_raw_updates(
+        &self,
+        wtxn: &mut heed::RwTxn,
+        update: &UpdateStatus,
+        index_uuid: Uuid,
+    ) -> heed::Result<()> {
+        match update {
+            UpdateStatus::Enqueued(enqueued) => {
+                let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
+                self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
+                    wtxn,
+                    &(global_id, index_uuid, enqueued.id()),
+                    enqueued,
+                )?;
+            }
+            _ => {
+                let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
+                self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
+            }
+        }
+        Ok(())
+    }
 
     /// Executes the user provided function on the next pending update (the one with the lowest id).
     /// This is asynchronous as it let the user process the update with a read-only txn and
diff --git a/meilisearch-lib/src/lib.rs b/meilisearch-lib/src/lib.rs
index 23538099c..93fd2f094 100644
--- a/meilisearch-lib/src/lib.rs
+++ b/meilisearch-lib/src/lib.rs
@@ -8,6 +8,7 @@ pub mod index_controller;
 pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate};
 
 mod compression;
+mod document_formats;
 
 use walkdir::WalkDir;
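Reviewer note (not part of the patch): after this change a dump contains per-index data under `indexes/<index-uuid>/`, the update entries in `updates/data.jsonl`, and one JSONL file per enqueued document addition under `updates/updates_files/`. The sketch below illustrates the JSONL-to-obkv round-trip the restore path relies on: `read_jsonl` converts NDJSON into an obkv batch on load, and `DocumentBatchReader` turns a batch back into JSON on dump. It is a minimal, hypothetical unit test, assuming it lives inside `meilisearch-lib` (e.g. at the bottom of `src/document_formats.rs`, since the module is declared private in `lib.rs`); the test name and sample documents are made up, and only APIs that appear in this diff are used.

```rust
// Hypothetical round-trip test for `read_jsonl`; a sketch, not part of the commit.
#[cfg(test)]
mod tests {
    use std::io::{Cursor, Seek, SeekFrom};

    use milli::documents::DocumentBatchReader;
    use serde_json::Value;

    use super::read_jsonl;

    #[test]
    fn ndjson_round_trips_through_obkv() {
        // Two newline-delimited JSON documents, like the entries kept
        // under a dump's `updates/updates_files/` directory.
        let ndjson = b"{\"id\": 1, \"title\": \"foo\"}\n{\"id\": 2, \"title\": \"bar\"}\n";

        // `read_jsonl` needs a `Write + Seek` target, so an in-memory cursor works.
        let mut batch = Cursor::new(Vec::new());
        read_jsonl(&ndjson[..], &mut batch).unwrap();
        batch.seek(SeekFrom::Start(0)).unwrap();

        // Read the batch back the way `UpdateFileStore::dump` does:
        // resolve each field id to its name through the batch index.
        let mut reader = DocumentBatchReader::from_reader(batch).unwrap();
        let mut documents = 0;
        while let Some((index, document)) = reader.next_document_with_index().unwrap() {
            for (field_id, content) in document.iter() {
                let field_name = index.get_by_left(&field_id).unwrap();
                let value: Value = serde_json::from_slice(content).unwrap();
                println!("{}: {}", field_name, value);
            }
            documents += 1;
        }
        assert_eq!(documents, 2);
    }
}
```

Storing update files on disk as obkv batches while dumping them back out as JSONL keeps dumps human-readable and independent of the internal batch format, which is presumably why `UpdateFileStore::dump` re-serializes every document rather than copying the files verbatim.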