MeiliSearch/dump/src/writer.rs
Tamo 7b8641a7af
fix the dump tests
The issue was linked to the fact that the Debug implementation of PhantomData isn't the same between Rust stable and Rust nightly.
This was causing an issue while snapshotting the settings, and this commit fixes it by representing the settings as JSON, which already ignores the PhantomData.
2022-11-23 16:59:20 +01:00
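
In practice the fix boils down to snapshotting the settings through their JSON representation instead of their Debug output, along these lines (a sketch of the idea, not the exact diff from this commit):

    // Before: the Debug output includes the `PhantomData` marker, whose formatting
    // differs between Rust stable and Rust nightly, so the snapshot was unstable.
    // insta::assert_debug_snapshot!(settings);

    // After: snapshot the serialized settings; the JSON representation already
    // ignores the `PhantomData`.
    insta::assert_json_snapshot!(settings);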


use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use flate2::write::GzEncoder;
use flate2::Compression;
use meilisearch_types::keys::Key;
use meilisearch_types::settings::{Checked, Settings};
use serde_json::{Map, Value};
use tempfile::TempDir;
use time::OffsetDateTime;
use uuid::Uuid;

use crate::reader::Document;
use crate::{IndexMetadata, Metadata, Result, TaskDump, CURRENT_DUMP_VERSION};

pub struct DumpWriter {
    dir: TempDir,
}

impl DumpWriter {
    pub fn new(instance_uuid: Option<Uuid>) -> Result<DumpWriter> {
        let dir = TempDir::new()?;

        if let Some(instance_uuid) = instance_uuid {
            fs::write(
                dir.path().join("instance_uid.uuid"),
                &instance_uuid.as_hyphenated().to_string(),
            )?;
        }

        let metadata = Metadata {
            dump_version: CURRENT_DUMP_VERSION,
            db_version: env!("CARGO_PKG_VERSION").to_string(),
            dump_date: OffsetDateTime::now_utc(),
        };
        fs::write(dir.path().join("metadata.json"), serde_json::to_string(&metadata)?)?;

        std::fs::create_dir(&dir.path().join("indexes"))?;

        Ok(DumpWriter { dir })
    }

    pub fn create_index(&self, index_name: &str, metadata: &IndexMetadata) -> Result<IndexWriter> {
        IndexWriter::new(self.dir.path().join("indexes").join(index_name), metadata)
    }

    pub fn create_keys(&self) -> Result<KeyWriter> {
        KeyWriter::new(self.dir.path().to_path_buf())
    }

    pub fn create_tasks_queue(&self) -> Result<TaskWriter> {
        TaskWriter::new(self.dir.path().join("tasks"))
    }

    pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
        // Stream the whole dump directory as a gzipped tarball into `writer`.
        let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
        let mut tar_encoder = tar::Builder::new(gz_encoder);
        tar_encoder.append_dir_all(".", self.dir.path())?;
        let gz_encoder = tar_encoder.into_inner()?;
        gz_encoder.finish()?;
        writer.flush()?;
        Ok(())
    }
}
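
// A minimal sketch of how this writer can be driven end to end. The index name,
// the document, and the `IndexMetadata` field layout are assumptions (the field
// names are inferred from the metadata snapshot in the tests below); keys and
// tasks would be added the same way through `create_keys` and `create_tasks_queue`.
#[allow(dead_code)]
fn write_example_dump(output: File) -> Result<()> {
    let dump = DumpWriter::new(Some(Uuid::new_v4()))?;

    let metadata = IndexMetadata {
        uid: "doggos".to_string(),
        primary_key: None,
        created_at: OffsetDateTime::now_utc(),
        updated_at: OffsetDateTime::now_utc(),
    };
    let mut index = dump.create_index("doggos", &metadata)?;
    index.push_document(serde_json::json!({ "id": 1, "name": "bob" }).as_object().unwrap())?;
    index.flush()?;

    // The whole temporary directory is tar-gzipped into `output`.
    dump.persist_to(output)
}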

pub struct KeyWriter {
    keys: BufWriter<File>,
}

impl KeyWriter {
    pub(crate) fn new(path: PathBuf) -> Result<Self> {
        let keys = File::create(path.join("keys.jsonl"))?;
        Ok(KeyWriter { keys: BufWriter::new(keys) })
    }

    pub fn push_key(&mut self, key: &Key) -> Result<()> {
        self.keys.write_all(&serde_json::to_vec(key)?)?;
        self.keys.write_all(b"\n")?;
        Ok(())
    }

    pub fn flush(mut self) -> Result<()> {
        self.keys.flush()?;
        Ok(())
    }
}
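
// Sketch: every API key becomes one JSON object per line in `keys.jsonl`
// (the slice of keys is assumed to come from the caller).
#[allow(dead_code)]
fn dump_api_keys(mut keys: KeyWriter, api_keys: &[Key]) -> Result<()> {
    for key in api_keys {
        keys.push_key(key)?;
    }
    keys.flush()
}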

pub struct TaskWriter {
    queue: BufWriter<File>,
    update_files: PathBuf,
}

impl TaskWriter {
    pub(crate) fn new(path: PathBuf) -> Result<Self> {
        std::fs::create_dir(&path)?;

        let queue = File::create(path.join("queue.jsonl"))?;
        let update_files = path.join("update_files");
        std::fs::create_dir(&update_files)?;

        Ok(TaskWriter { queue: BufWriter::new(queue), update_files })
    }

    /// Pushes a task into the dump.
    /// If the task has an associated `update_file`, it'll use the `task_id` as its name.
    pub fn push_task(&mut self, task: &TaskDump) -> Result<UpdateFile> {
        self.queue.write_all(&serde_json::to_vec(task)?)?;
        self.queue.write_all(b"\n")?;

        Ok(UpdateFile::new(self.update_files.join(format!("{}.jsonl", task.uid))))
    }

    pub fn flush(mut self) -> Result<()> {
        self.queue.flush()?;
        Ok(())
    }
}

pub struct UpdateFile {
    path: PathBuf,
    writer: Option<BufWriter<File>>,
}

impl UpdateFile {
    pub(crate) fn new(path: PathBuf) -> UpdateFile {
        UpdateFile { path, writer: None }
    }

    pub fn push_document(&mut self, document: &Document) -> Result<()> {
        if let Some(writer) = self.writer.as_mut() {
            writer.write_all(&serde_json::to_vec(document)?)?;
            writer.write_all(b"\n")?;
        } else {
            // Lazily create the update file on the first pushed document.
            let file = File::create(&self.path)?;
            self.writer = Some(BufWriter::new(file));
            self.push_document(document)?;
        }
        Ok(())
    }

    pub fn flush(self) -> Result<()> {
        if let Some(mut writer) = self.writer {
            writer.flush()?;
        }
        Ok(())
    }
}
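
// Sketch of dumping a single task: the task itself goes into `queue.jsonl`, and
// its payload, when there is one, goes into `update_files/<task_id>.jsonl`
// (the `TaskDump` and its documents are assumed to come from the caller).
#[allow(dead_code)]
fn dump_one_task(
    tasks: &mut TaskWriter,
    task: &TaskDump,
    content: Option<&[Document]>,
) -> Result<()> {
    let mut update_file = tasks.push_task(task)?;
    if let Some(documents) = content {
        for document in documents {
            update_file.push_document(document)?;
        }
        // The update file is only created on the first pushed document, so tasks
        // without a payload leave no file behind.
        update_file.flush()?;
    }
    Ok(())
}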

pub struct IndexWriter {
    documents: BufWriter<File>,
    settings: File,
}

impl IndexWriter {
    pub(self) fn new(path: PathBuf, metadata: &IndexMetadata) -> Result<Self> {
        std::fs::create_dir(&path)?;

        let metadata_file = File::create(path.join("metadata.json"))?;
        serde_json::to_writer(metadata_file, metadata)?;

        let documents = File::create(path.join("documents.jsonl"))?;
        let settings = File::create(path.join("settings.json"))?;

        Ok(IndexWriter { documents: BufWriter::new(documents), settings })
    }

    pub fn push_document(&mut self, document: &Map<String, Value>) -> Result<()> {
        serde_json::to_writer(&mut self.documents, document)?;
        self.documents.write_all(b"\n")?;
        Ok(())
    }

    pub fn flush(&mut self) -> Result<()> {
        self.documents.flush()?;
        Ok(())
    }

    pub fn settings(mut self, settings: &Settings<Checked>) -> Result<()> {
        self.settings.write_all(&serde_json::to_vec(&settings)?)?;
        Ok(())
    }
}
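
// Sketch of dumping one index: documents first, one JSON object per line, then
// the settings. `settings` takes `self` by value, so it is necessarily the last
// call on the writer (the documents and settings are assumed to come from the caller).
#[allow(dead_code)]
fn dump_one_index(
    mut index: IndexWriter,
    documents: &[Map<String, Value>],
    settings: &Settings<Checked>,
) -> Result<()> {
    for document in documents {
        index.push_document(document)?;
    }
    index.flush()?;
    index.settings(settings)
}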

#[cfg(test)]
pub(crate) mod test {
    use std::fmt::Write;
    use std::io::BufReader;
    use std::path::Path;
    use std::str::FromStr;

    use flate2::bufread::GzDecoder;
    use meili_snap::insta;
    use meilisearch_types::settings::Unchecked;

    use super::*;
    use crate::reader::Document;
    use crate::test::{
        create_test_api_keys, create_test_documents, create_test_dump, create_test_instance_uid,
        create_test_settings, create_test_tasks,
    };

    fn create_directory_hierarchy(dir: &Path) -> String {
        let mut ret = String::new();
        writeln!(ret, ".").unwrap();
        ret.push_str(&_create_directory_hierarchy(dir, 0));
        ret
    }

    fn _create_directory_hierarchy(dir: &Path, depth: usize) -> String {
        let mut ret = String::new();

        // the entries are not guaranteed to be returned in the same order thus we need to sort them.
        let mut entries =
            fs::read_dir(dir).unwrap().collect::<std::result::Result<Vec<_>, _>>().unwrap();

        // I want the directories first and then sort by name.
        entries.sort_by(|a, b| {
            let (aft, bft) = (a.file_type().unwrap(), b.file_type().unwrap());
            if aft.is_dir() && bft.is_dir() {
                a.file_name().cmp(&b.file_name())
            } else if aft.is_file() && bft.is_dir() {
                std::cmp::Ordering::Greater
            } else if bft.is_file() && aft.is_dir() {
                std::cmp::Ordering::Less
            } else {
                a.file_name().cmp(&b.file_name())
            }
        });

        for (idx, entry) in entries.iter().enumerate() {
            let mut ident = String::new();

            for _ in 0..depth {
                ident.push('│');
                ident.push_str(&"    ".repeat(4));
            }
            if idx == entries.len() - 1 {
                ident.push('└');
            } else {
                ident.push('├');
            }
            ident.push_str(&"-".repeat(4));

            let name = entry.file_name().into_string().unwrap();
            let file_type = entry.file_type().unwrap();
            let is_dir = if file_type.is_dir() { "/" } else { "" };

            assert!(!file_type.is_symlink());
            writeln!(ret, "{ident} {name}{is_dir}").unwrap();
            if file_type.is_dir() {
                ret.push_str(&_create_directory_hierarchy(&entry.path(), depth + 1));
            }
        }
        ret
    }

    #[test]
    fn test_creating_dump() {
        let file = create_test_dump();
        let mut file = BufReader::new(file);

        // ============ ensuring we wrote everything in the correct place.
        let dump = tempfile::tempdir().unwrap();

        let gz = GzDecoder::new(&mut file);
        let mut tar = tar::Archive::new(gz);
        tar.unpack(dump.path()).unwrap();

        let dump_path = dump.path();

        // ==== checking the global file hierarchy (we want to be sure there aren't too many files or too few)
        insta::assert_display_snapshot!(create_directory_hierarchy(dump_path), @r###"
        .
        ├---- indexes/
        │ └---- doggos/
        │ │ ├---- documents.jsonl
        │ │ ├---- metadata.json
        │ │ └---- settings.json
        ├---- tasks/
        │ ├---- update_files/
        │ │ └---- 1.jsonl
        │ └---- queue.jsonl
        ├---- instance_uid.uuid
        ├---- keys.jsonl
        └---- metadata.json
        "###);

        // ==== checking the top-level info
        let metadata = fs::read_to_string(dump_path.join("metadata.json")).unwrap();
        let metadata: Metadata = serde_json::from_str(&metadata).unwrap();
        insta::assert_json_snapshot!(metadata, { ".dumpDate" => "[date]" }, @r###"
        {
          "dumpVersion": "V6",
          "dbVersion": "0.30.0",
          "dumpDate": "[date]"
        }
        "###);

        let instance_uid = fs::read_to_string(dump_path.join("instance_uid.uuid")).unwrap();
        assert_eq!(Uuid::from_str(&instance_uid).unwrap(), create_test_instance_uid());

        // ==== checking the index
        let docs = fs::read_to_string(dump_path.join("indexes/doggos/documents.jsonl")).unwrap();
        for (document, expected) in docs.lines().zip(create_test_documents()) {
            assert_eq!(serde_json::from_str::<Map<String, Value>>(document).unwrap(), expected);
        }

        let test_settings =
            fs::read_to_string(dump_path.join("indexes/doggos/settings.json")).unwrap();
        assert_eq!(
            serde_json::from_str::<Settings<Unchecked>>(&test_settings).unwrap(),
            create_test_settings().into_unchecked()
        );

        let metadata = fs::read_to_string(dump_path.join("indexes/doggos/metadata.json")).unwrap();
        let metadata: IndexMetadata = serde_json::from_str(&metadata).unwrap();
        insta::assert_json_snapshot!(metadata, { ".createdAt" => "[date]", ".updatedAt" => "[date]" }, @r###"
        {
          "uid": "doggo",
          "primaryKey": null,
          "createdAt": "[date]",
          "updatedAt": "[date]"
        }
        "###);

        // ==== checking the task queue
        let tasks_queue = fs::read_to_string(dump_path.join("tasks/queue.jsonl")).unwrap();
        for (task, expected) in tasks_queue.lines().zip(create_test_tasks()) {
            assert_eq!(serde_json::from_str::<TaskDump>(task).unwrap(), expected.0);

            if let Some(expected_update) = expected.1 {
                let path = dump_path.join(format!("tasks/update_files/{}.jsonl", expected.0.uid));
                println!("trying to open {}", path.display());
                let update = fs::read_to_string(path).unwrap();
                let documents: Vec<Document> =
                    update.lines().map(|line| serde_json::from_str(line).unwrap()).collect();
                assert_eq!(documents, expected_update);
            }
        }

        // ==== checking the keys
        let keys = fs::read_to_string(dump_path.join("keys.jsonl")).unwrap();
        for (key, expected) in keys.lines().zip(create_test_api_keys()) {
            assert_eq!(serde_json::from_str::<Key>(key).unwrap(), expected);
        }
    }
}