MeiliSearch/meilisearch-lib/src/index/dump.rs

189 lines
6.1 KiB
Rust
Raw Normal View History

2021-09-28 11:59:55 +02:00
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Seek, SeekFrom, Write};
2021-05-31 16:40:59 +02:00
use std::path::Path;
2021-09-28 11:59:55 +02:00
use anyhow::Context;
use heed::{EnvOpenOptions, RoTxn};
2021-05-26 22:52:06 +02:00
use indexmap::IndexMap;
2021-09-28 11:59:55 +02:00
use milli::documents::DocumentBatchReader;
2021-05-26 22:52:06 +02:00
use serde::{Deserialize, Serialize};
2021-09-28 11:59:55 +02:00
use serde_json::Value;
2021-05-26 22:52:06 +02:00
2021-09-29 10:17:52 +02:00
use crate::document_formats::read_ndjson;
2021-09-28 11:59:55 +02:00
use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder;
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
2021-05-26 22:52:06 +02:00
2021-06-17 14:36:32 +02:00
use super::error::Result;
2021-09-14 18:39:02 +02:00
use super::{Index, Settings, Unchecked};
2021-05-26 22:52:06 +02:00
#[derive(Serialize, Deserialize)]
struct DumpMeta {
2021-05-27 14:30:20 +02:00
settings: Settings<Unchecked>,
2021-05-26 22:52:06 +02:00
primary_key: Option<String>,
}
2021-05-31 16:03:39 +02:00
/// Name of the file, inside an index's dump directory, that holds the serialized [`DumpMeta`].
const META_FILE_NAME: &str = "meta.json";
/// Name of the file, inside an index's dump directory, that holds the documents,
/// one JSON object per line.
const DATA_FILE_NAME: &str = "documents.jsonl";
2021-05-26 22:52:06 +02:00
impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
// acquire write txn make sure any ongoing write is finished before we start.
2021-05-26 22:52:06 +02:00
let txn = self.env.write_txn()?;
2021-09-28 11:59:55 +02:00
let path = path
.as_ref()
.join(format!("indexes/{}", self.uuid.to_string()));
create_dir_all(&path)?;
2021-05-26 22:52:06 +02:00
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
Ok(())
}
fn dump_documents(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
2021-05-26 22:52:06 +02:00
let document_file_path = path.as_ref().join(DATA_FILE_NAME);
let mut document_file = File::create(&document_file_path)?;
2021-06-17 14:36:32 +02:00
let documents = self.all_documents(txn)?;
2021-05-26 22:52:06 +02:00
let fields_ids_map = self.fields_ids_map(txn)?;
// dump documents
let mut json_map = IndexMap::new();
for document in documents {
let (_, reader) = document?;
for (fid, bytes) in reader.iter() {
if let Some(name) = fields_ids_map.name(fid) {
json_map.insert(name, serde_json::from_slice::<serde_json::Value>(bytes)?);
}
}
serde_json::to_writer(&mut document_file, &json_map)?;
2021-05-31 16:40:59 +02:00
document_file.write_all(b"\n")?;
2021-05-26 22:52:06 +02:00
json_map.clear();
}
Ok(())
}
fn dump_meta(&self, txn: &RoTxn, path: impl AsRef<Path>) -> Result<()> {
2021-05-26 22:52:06 +02:00
let meta_file_path = path.as_ref().join(META_FILE_NAME);
let mut meta_file = File::create(&meta_file_path)?;
2021-05-27 14:30:20 +02:00
let settings = self.settings_txn(txn)?.into_unchecked();
2021-05-26 22:52:06 +02:00
let primary_key = self.primary_key(txn)?.map(String::from);
2021-05-31 10:58:51 +02:00
let meta = DumpMeta {
settings,
primary_key,
};
2021-05-26 22:52:06 +02:00
serde_json::to_writer(&mut meta_file, &meta)?;
Ok(())
}
pub fn load_dump(
2021-09-28 11:59:55 +02:00
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
update_handler: &UpdateHandler,
2021-06-15 17:39:07 +02:00
) -> anyhow::Result<()> {
2021-09-28 11:59:55 +02:00
let dir_name = src
.as_ref()
.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?;
// We first deserialize the dump meta into a serde_json::Value and change
// the custom ranking rules settings from the old format to the new format.
let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
convert_custom_ranking_rules(ranking_rules);
}
// Then we serialize it back into a vec to deserialize it
// into a `DumpMeta` struct with the newly patched `rankingRules` format.
let patched_meta = serde_json::to_vec(&meta)?;
let DumpMeta {
settings,
primary_key,
} = serde_json::from_slice(&patched_meta)?;
let settings = settings.check();
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &dst_dir_path)?;
let mut txn = index.write_txn()?;
// Apply settings first
let builder = update_handler.update_builder(0);
let mut builder = builder.settings(&mut txn, &index);
if let Some(primary_key) = primary_key {
builder.set_primary_key(primary_key);
}
apply_settings_to_builder(&settings, &mut builder);
builder.execute(|_, _| ())?;
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = BufReader::new(File::open(&document_file_path)?);
let mut tmp_doc_file = tempfile::tempfile()?;
2021-09-29 10:17:52 +02:00
read_ndjson(reader, &mut tmp_doc_file)?;
2021-09-28 11:59:55 +02:00
tmp_doc_file.seek(SeekFrom::Start(0))?;
let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
//If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown.
if !documents_reader.is_empty() {
2021-09-28 22:22:59 +02:00
let builder = update_handler
.update_builder(0)
.index_documents(&mut txn, &index);
2021-09-28 11:59:55 +02:00
builder.execute(documents_reader, |_, _| ())?;
}
txn.commit()?;
index.prepare_for_closing().wait();
Ok(())
2021-05-26 22:52:06 +02:00
}
}
2021-09-28 11:59:55 +02:00
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
///
/// This is done for compatibility reasons, and to avoid a new dump version,
/// since the new syntax was introduced soon after the new dump version.
///
/// A value that is not an array is left untouched. Inside an array, entries that are not
/// strings or that do not start with `asc`/`desc` are kept as they are. An entry that
/// starts with `asc`/`desc` but from which no field can be extracted is also kept unchanged
/// instead of being removed: it may be a rule that already uses the new syntax on a field
/// whose name starts with one of those prefixes (e.g. `description:desc`).
fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
    *ranking_rules = match ranking_rules.take() {
        Value::Array(values) => values
            .into_iter()
            .map(|value| match value {
                Value::String(s) if s.starts_with("asc") => {
                    let converted = asc_ranking_rule(&s).map(|field| format!("{}:asc", field));
                    // Fall back to the original rule rather than dropping it.
                    Value::String(converted.unwrap_or(s))
                }
                Value::String(s) if s.starts_with("desc") => {
                    let converted = desc_ranking_rule(&s).map(|field| format!("{}:desc", field));
                    // Fall back to the original rule rather than dropping it.
                    Value::String(converted.unwrap_or(s))
                }
                otherwise => otherwise,
            })
            .collect(),
        otherwise => otherwise,
    }
}