implement the dump reader v6

This commit is contained in:
Tamo 2022-10-03 16:12:01 +02:00 committed by Clément Renault
parent 699ae1b190
commit 7bd6f63001
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
8 changed files with 247 additions and 69 deletions

4
Cargo.lock generated
View File

@ -1144,9 +1144,13 @@ checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
name = "dump" name = "dump"
version = "0.29.0" version = "0.29.0"
dependencies = [ dependencies = [
"anyhow",
"flate2", "flate2",
"index", "index",
"index-scheduler",
"insta", "insta",
"log",
"meilisearch-auth",
"serde", "serde",
"serde_json", "serde_json",
"tar", "tar",

View File

@ -15,6 +15,10 @@ flate2 = "1.0.22"
thiserror = "1.0.30" thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }
tar = "0.4.38" tar = "0.4.38"
anyhow = "1.0.65"
log = "0.4.17"
index-scheduler = { path = "../index-scheduler" }
meilisearch-auth = { path = "../meilisearch-auth" }
[dev-dependencies] [dev-dependencies]
insta = { version = "1.19.1", features = ["json", "redactions"] } insta = { version = "1.19.1", features = ["json", "redactions"] }

View File

@ -2,6 +2,11 @@ use thiserror::Error;
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum Error { pub enum Error {
#[error("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")]
DumpV1Unsupported,
#[error("Bad index name")]
BadIndexName,
#[error(transparent)] #[error(transparent)]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error(transparent)] #[error(transparent)]

View File

@ -1,23 +1,32 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use time::OffsetDateTime; use time::OffsetDateTime;
// mod dump;
mod error; mod error;
mod reader;
mod writer; mod writer;
pub use error::Error; pub use error::Error;
pub use writer::DumpWriter; pub use writer::DumpWriter;
const CURRENT_DUMP_VERSION: &str = "V6"; const CURRENT_DUMP_VERSION: Version = Version::V6;
pub struct DumpReader;
type Result<T> = std::result::Result<T, Error>; type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
struct Metadata { struct Metadata {
pub dump_version: String, pub dump_version: Version,
pub db_version: String, pub db_version: String,
#[serde(with = "time::serde::rfc3339")]
pub dump_date: OffsetDateTime, pub dump_date: OffsetDateTime,
} }
/// Version of the dump format, stored in the `dump_version` field of the
/// dump `Metadata`.
///
/// Variants are (de)serialized under their exact names (`"V1"`, ..., `"V6"`)
/// since no serde renaming attribute is applied to this enum.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub enum Version {
    /// Version 1; rejected with `Error::DumpV1Unsupported`.
    V1,
    /// Version 2.
    V2,
    /// Version 3.
    V3,
    /// Version 4.
    V4,
    /// Version 5.
    V5,
    /// Version 6; the value of `CURRENT_DUMP_VERSION`.
    V6,
}

View File

@ -17,18 +17,20 @@ use crate::{Result, Version};
// pub mod error; // pub mod error;
// mod compat; // mod compat;
// mod loaders; // mod loaders;
mod v1; // mod v1;
// mod v6; mod v6;
pub fn open( pub fn open(
dump_path: &Path, dump_path: &Path,
) -> Result< ) -> Result<
impl DumpReader< Box<
Document = serde_json::Value, dyn DumpReader<
Settings = Settings<Unchecked>, Document = serde_json::Map<String, serde_json::Value>,
Task = TaskView, Settings = Settings<Unchecked>,
UpdateFile = (), Task = TaskView,
Key = Key, UpdateFile = File,
Key = Key,
>,
>, >,
> { > {
let path = TempDir::new()?; let path = TempDir::new()?;
@ -54,10 +56,21 @@ pub fn open(
Version::V3 => todo!(), Version::V3 => todo!(),
Version::V4 => todo!(), Version::V4 => todo!(),
Version::V5 => todo!(), Version::V5 => todo!(),
Version::V6 => todo!(), Version::V6 => {
}; let dump_reader = Box::new(v6::V6Reader::open(path)?)
as Box<
dyn DumpReader<
Document = serde_json::Map<String, serde_json::Value>,
Settings = Settings<Unchecked>,
Task = TaskView,
UpdateFile = File,
Key = Key,
>,
>;
todo!() Ok(dump_reader)
}
}
} }
pub trait DumpReader { pub trait DumpReader {
@ -73,7 +86,7 @@ pub trait DumpReader {
fn version(&self) -> Version; fn version(&self) -> Version;
/// Return at which date the index was created. /// Return at which date the index was created.
fn date(&self) -> Result<Option<OffsetDateTime>>; fn date(&self) -> Option<OffsetDateTime>;
/// Return an iterator over each indexes. /// Return an iterator over each indexes.
fn indexes( fn indexes(
@ -81,18 +94,20 @@ pub trait DumpReader {
) -> Result< ) -> Result<
Box< Box<
dyn Iterator< dyn Iterator<
Item = Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>, Item = Result<
Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>,
>,
>, >,
>, >,
>; >;
/// Return all the tasks in the dump with a possible update file. /// Return all the tasks in the dump with a possible update file.
fn tasks( fn tasks(
&self, &mut self,
) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>>; ) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>> + '_>;
/// Return all the keys. /// Return all the keys.
fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>>; fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Self::Key>> + '_>;
} }
pub trait IndexReader { pub trait IndexReader {
@ -100,6 +115,6 @@ pub trait IndexReader {
type Settings; type Settings;
fn name(&self) -> &str; fn name(&self) -> &str;
fn documents(&self) -> Result<Box<dyn Iterator<Item = Self::Document>>>; fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>> + '_>>;
fn settings(&self) -> Result<Self::Settings>; fn settings(&mut self) -> Result<Self::Settings>;
} }

View File

@ -5,7 +5,6 @@ use std::{
path::Path, path::Path,
}; };
use serde::Deserialize;
use tempfile::TempDir; use tempfile::TempDir;
use time::OffsetDateTime; use time::OffsetDateTime;
@ -26,9 +25,9 @@ pub struct V1Reader {
struct V1IndexReader { struct V1IndexReader {
name: String, name: String,
documents: File, documents: BufReader<File>,
settings: File, settings: BufReader<File>,
updates: File, updates: BufReader<File>,
current_update: Option<UpdateStatus>, current_update: Option<UpdateStatus>,
} }
@ -37,9 +36,9 @@ impl V1IndexReader {
pub fn new(name: String, path: &Path) -> Result<Self> { pub fn new(name: String, path: &Path) -> Result<Self> {
let mut ret = V1IndexReader { let mut ret = V1IndexReader {
name, name,
documents: File::open(path.join("documents.jsonl"))?, documents: BufReader::new(File::open(path.join("documents.jsonl"))?),
settings: File::open(path.join("settings.json"))?, settings: BufReader::new(File::open(path.join("settings.json"))?),
updates: File::open(path.join("updates.jsonl"))?, updates: BufReader::new(File::open(path.join("updates.jsonl"))?),
current_update: None, current_update: None,
}; };
ret.next_update(); ret.next_update();
@ -48,10 +47,7 @@ impl V1IndexReader {
} }
pub fn next_update(&mut self) -> Result<Option<UpdateStatus>> { pub fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
let mut tasks = self.updates; let current_update = if let Some(line) = self.updates.lines().next() {
let mut reader = BufReader::new(&mut tasks);
let current_update = if let Some(line) = reader.lines().next() {
Some(serde_json::from_str(&line?)?) Some(serde_json::from_str(&line?)?)
} else { } else {
None None
@ -90,10 +86,6 @@ impl V1Reader {
}) })
} }
pub fn date(&self) -> Result<Option<OffsetDateTime>> {
Ok(None)
}
fn next_update(&mut self) -> Result<Option<UpdateStatus>> { fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
if let Some((idx, _)) = self if let Some((idx, _)) = self
.indexes .indexes
@ -111,14 +103,14 @@ impl V1Reader {
} }
impl IndexReader for &V1IndexReader { impl IndexReader for &V1IndexReader {
type Document = serde_json::Value; type Document = serde_json::Map<String, serde_json::Value>;
type Settings = settings::Settings; type Settings = settings::Settings;
fn name(&self) -> &str { fn name(&self) -> &str {
todo!() todo!()
} }
fn documents(&self) -> Result<Box<dyn Iterator<Item = Self::Document>>> { fn documents(&self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>>>> {
todo!() todo!()
} }
@ -128,16 +120,16 @@ impl IndexReader for &V1IndexReader {
} }
impl DumpReader for V1Reader { impl DumpReader for V1Reader {
type Document = serde_json::Value; type Document = serde_json::Map<String, serde_json::Value>;
type Settings = settings::Settings; type Settings = settings::Settings;
type Task = update::UpdateStatus; type Task = update::UpdateStatus;
type UpdateFile = (); type UpdateFile = Infallible;
type Key = Infallible; type Key = Infallible;
fn date(&self) -> Result<Option<OffsetDateTime>> { fn date(&self) -> Option<OffsetDateTime> {
Ok(None) None
} }
fn version(&self) -> Version { fn version(&self) -> Version {
@ -149,29 +141,33 @@ impl DumpReader for V1Reader {
) -> Result< ) -> Result<
Box< Box<
dyn Iterator< dyn Iterator<
Item = Box< Item = Result<
dyn super::IndexReader<Document = Self::Document, Settings = Self::Settings>, Box<
dyn super::IndexReader<
Document = Self::Document,
Settings = Self::Settings,
>,
>,
>, >,
>, >,
>, >,
> { > {
Ok(Box::new(self.indexes.iter().map(|index| { Ok(Box::new(self.indexes.iter().map(|index| {
Box::new(index) let index = Box::new(index)
as Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>> as Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>;
Ok(index)
}))) })))
} }
fn tasks( fn tasks(&self) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>> {
&self, Box::new(std::iter::from_fn(|| {
) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>> {
Ok(Box::new(std::iter::from_fn(|| {
self.next_update() self.next_update()
.transpose() .transpose()
.map(|result| result.map(|task| (task, None))) .map(|result| result.map(|task| (task, None)))
}))) }))
} }
fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>> { fn keys(&self) -> Box<dyn Iterator<Item = Result<Self::Key>>> {
Ok(Box::new(std::iter::empty())) Box::new(std::iter::empty())
} }
} }

View File

@ -1,16 +1,170 @@
use std::{ use std::{
fs::{self}, fs::{self, File},
io::{BufRead, BufReader},
path::Path, path::Path,
}; };
use index::Unchecked;
use tempfile::TempDir;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::Result; use crate::{Error, Result, Version};
use super::{DumpReader, IndexReader};
type Metadata = crate::Metadata; type Metadata = crate::Metadata;
pub fn date(dump: &Path) -> Result<OffsetDateTime> { pub fn date(dump: &Path) -> Result<OffsetDateTime> {
let metadata = fs::read(dump.join("metadata.json"))?; let metadata = fs::read(dump.join("metadata.json"))?;
let metadata: Metadata = serde_json::from_reader(metadata)?; let metadata: Metadata = serde_json::from_reader(&*metadata)?;
Ok(metadata.dump_date) Ok(metadata.dump_date)
} }
/// Reader over an extracted dump in the V6 format.
///
/// Field order matters: Rust drops struct fields in declaration order, and
/// dropping the `TempDir` deletes the extracted dump from disk. The file
/// handles are therefore declared *before* `dump` so that they are closed
/// first; some platforms cannot remove a directory that still contains open
/// files, which would silently leak the temporary directory.
pub struct V6Reader {
    /// Content of the `metadata.json` file found at the root of the dump.
    metadata: Metadata,
    /// Buffered handle on `tasks/queue.jsonl`.
    tasks: BufReader<File>,
    /// Buffered handle on `keys.jsonl`.
    keys: BufReader<File>,
    /// Temporary directory holding the extracted dump. Must stay the last
    /// field so it is dropped after every handle opened inside of it.
    dump: TempDir,
}
/// Reader over a single index directory of a V6 dump.
struct V6IndexReader {
    /// Name of the index, as passed to `V6IndexReader::new`.
    name: String,
    /// Buffered handle on the `documents.jsonl` file of the index.
    documents: BufReader<File>,
    /// Buffered handle on the `settings.json` file of the index.
    settings: BufReader<File>,
}
impl V6IndexReader {
    /// Opens the index called `name` stored in the directory `path`.
    ///
    /// # Errors
    ///
    /// Fails if `documents.jsonl` or `settings.json` cannot be opened inside
    /// `path`. The documents file is opened first.
    pub fn new(name: String, path: &Path) -> Result<Self> {
        let documents = File::open(path.join("documents.jsonl")).map(BufReader::new)?;
        let settings = File::open(path.join("settings.json")).map(BufReader::new)?;

        Ok(Self {
            name,
            documents,
            settings,
        })
    }
}
impl V6Reader {
    /// Builds a reader on top of an already extracted dump.
    ///
    /// The metadata are read and parsed eagerly, then the tasks queue and the
    /// keys files are opened. Ownership of `dump` is taken so the temporary
    /// directory lives as long as the reader.
    ///
    /// # Errors
    ///
    /// Fails if `metadata.json` cannot be read or parsed, or if
    /// `tasks/queue.jsonl` or `keys.jsonl` cannot be opened.
    pub fn open(dump: TempDir) -> Result<Self> {
        let root = dump.path();

        let raw_metadata = fs::read(root.join("metadata.json"))?;
        let metadata = serde_json::from_reader(raw_metadata.as_slice())?;

        let tasks = BufReader::new(File::open(root.join("tasks").join("queue.jsonl"))?);
        let keys = BufReader::new(File::open(root.join("keys.jsonl"))?);

        Ok(V6Reader {
            metadata,
            tasks,
            keys,
            dump,
        })
    }
}
impl DumpReader for V6Reader {
type Document = serde_json::Map<String, serde_json::Value>;
type Settings = index::Settings<Unchecked>;
type Task = index_scheduler::TaskView;
type UpdateFile = File;
type Key = meilisearch_auth::Key;
fn version(&self) -> Version {
Version::V6
}
fn date(&self) -> Option<OffsetDateTime> {
Some(self.metadata.dump_date)
}
fn indexes(
&self,
) -> Result<
Box<
dyn Iterator<
Item = Result<
Box<
dyn super::IndexReader<
Document = Self::Document,
Settings = Self::Settings,
>,
>,
>,
>,
>,
> {
let entries = fs::read_dir(self.dump.path().join("indexes"))?;
Ok(Box::new(
entries
.map(|entry| -> Result<Option<_>> {
let entry = entry?;
if entry.file_type()?.is_dir() {
let index = Box::new(V6IndexReader::new(
entry
.file_name()
.to_str()
.ok_or(Error::BadIndexName)?
.to_string(),
&entry.path(),
)?)
as Box<
dyn IndexReader<
Document = Self::Document,
Settings = Self::Settings,
>,
>;
Ok(Some(index))
} else {
Ok(None)
}
})
.filter_map(|entry| entry.transpose()),
))
}
fn tasks(
&mut self,
) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>> + '_> {
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
let task: index_scheduler::TaskView = serde_json::from_str(&line?)?;
let update_file_path = self
.dump
.path()
.join("tasks")
.join("update_files")
.join(task.uid.to_string());
if update_file_path.exists() {
Ok((task, Some(File::open(update_file_path)?)))
} else {
Ok((task, None))
}
}))
}
fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Self::Key>> + '_> {
Box::new(
(&mut self.keys)
.lines()
.map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
)
}
}
impl IndexReader for V6IndexReader {
    type Document = serde_json::Map<String, serde_json::Value>;
    type Settings = index::Settings<Unchecked>;

    /// Name of the index, as given when the reader was created.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Iterates over the lines of the documents file, parsing each line as
    /// one JSON document.
    ///
    /// The iterator reads from the reader's own handle, so documents that
    /// were already consumed are not yielded again by a later call.
    fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>> + '_>> {
        let lines = (&mut self.documents).lines();
        let documents = lines.map(|line| -> Result<Self::Document> {
            let line = line?;
            Ok(serde_json::from_str(&line)?)
        });

        Ok(Box::new(documents))
    }

    /// Parses the settings of the index from the settings file handle.
    fn settings(&mut self) -> Result<Self::Settings> {
        let settings = serde_json::from_reader(&mut self.settings)?;
        Ok(settings)
    }
}

View File

@ -5,15 +5,13 @@ use std::{
}; };
use flate2::{write::GzEncoder, Compression}; use flate2::{write::GzEncoder, Compression};
use serde::{Deserialize, Serialize}; use serde::Serialize;
use tempfile::TempDir; use tempfile::TempDir;
use thiserror::Error;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use crate::{Metadata, Result, CURRENT_DUMP_VERSION}; use crate::{Metadata, Result, CURRENT_DUMP_VERSION};
#[must_use]
pub struct DumpWriter { pub struct DumpWriter {
dir: TempDir, dir: TempDir,
} }
@ -27,7 +25,7 @@ impl DumpWriter {
)?; )?;
let metadata = Metadata { let metadata = Metadata {
dump_version: CURRENT_DUMP_VERSION.to_string(), dump_version: CURRENT_DUMP_VERSION,
db_version: env!("CARGO_PKG_VERSION").to_string(), db_version: env!("CARGO_PKG_VERSION").to_string(),
dump_date: OffsetDateTime::now_utc(), dump_date: OffsetDateTime::now_utc(),
}; };
@ -45,17 +43,14 @@ impl DumpWriter {
IndexWriter::new(self.dir.path().join("indexes").join(index_name)) IndexWriter::new(self.dir.path().join("indexes").join(index_name))
} }
#[must_use]
pub fn create_keys(&self) -> Result<KeyWriter> { pub fn create_keys(&self) -> Result<KeyWriter> {
KeyWriter::new(self.dir.path().to_path_buf()) KeyWriter::new(self.dir.path().to_path_buf())
} }
#[must_use]
pub fn create_tasks_queue(&self) -> Result<TaskWriter> { pub fn create_tasks_queue(&self) -> Result<TaskWriter> {
TaskWriter::new(self.dir.path().join("tasks")) TaskWriter::new(self.dir.path().join("tasks"))
} }
#[must_use]
pub fn persist_to(self, mut writer: impl Write) -> Result<()> { pub fn persist_to(self, mut writer: impl Write) -> Result<()> {
let gz_encoder = GzEncoder::new(&mut writer, Compression::default()); let gz_encoder = GzEncoder::new(&mut writer, Compression::default());
let mut tar_encoder = tar::Builder::new(gz_encoder); let mut tar_encoder = tar::Builder::new(gz_encoder);
@ -68,7 +63,6 @@ impl DumpWriter {
} }
} }
#[must_use]
pub struct KeyWriter { pub struct KeyWriter {
file: File, file: File,
} }
@ -86,7 +80,6 @@ impl KeyWriter {
} }
} }
#[must_use]
pub struct TaskWriter { pub struct TaskWriter {
queue: File, queue: File,
update_files: PathBuf, update_files: PathBuf,
@ -124,7 +117,6 @@ impl TaskWriter {
} }
} }
#[must_use]
pub struct IndexWriter { pub struct IndexWriter {
documents: File, documents: File,
settings: File, settings: File,
@ -149,7 +141,6 @@ impl IndexWriter {
Ok(()) Ok(())
} }
#[must_use]
pub fn settings(mut self, settings: impl Serialize) -> Result<()> { pub fn settings(mut self, settings: impl Serialize) -> Result<()> {
self.settings.write_all(&serde_json::to_vec(&settings)?)?; self.settings.write_all(&serde_json::to_vec(&settings)?)?;
Ok(()) Ok(())