diff --git a/Cargo.lock b/Cargo.lock index f28ffa237..f2e7bb60d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1144,9 +1144,13 @@ checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" name = "dump" version = "0.29.0" dependencies = [ + "anyhow", "flate2", "index", + "index-scheduler", "insta", + "log", + "meilisearch-auth", "serde", "serde_json", "tar", diff --git a/dump/Cargo.toml b/dump/Cargo.toml index 79bf64518..0f418c55d 100644 --- a/dump/Cargo.toml +++ b/dump/Cargo.toml @@ -15,6 +15,10 @@ flate2 = "1.0.22" thiserror = "1.0.30" time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] } tar = "0.4.38" +anyhow = "1.0.65" +log = "0.4.17" +index-scheduler = { path = "../index-scheduler" } +meilisearch-auth = { path = "../meilisearch-auth" } [dev-dependencies] insta = { version = "1.19.1", features = ["json", "redactions"] } diff --git a/dump/src/error.rs b/dump/src/error.rs index d2c4cbfbb..78912e1a7 100644 --- a/dump/src/error.rs +++ b/dump/src/error.rs @@ -2,6 +2,11 @@ use thiserror::Error; #[derive(Debug, Error)] pub enum Error { + #[error("The version 1 of the dumps is not supported anymore. You can re-export your dump from a version between 0.21 and 0.24, or start fresh from a version 0.25 onwards.")] + DumpV1Unsupported, + #[error("Bad index name")] + BadIndexName, + #[error(transparent)] Io(#[from] std::io::Error), #[error(transparent)] diff --git a/dump/src/lib.rs b/dump/src/lib.rs index 0cbe26605..8b25b6443 100644 --- a/dump/src/lib.rs +++ b/dump/src/lib.rs @@ -1,23 +1,32 @@ use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -// mod dump; mod error; +mod reader; mod writer; pub use error::Error; pub use writer::DumpWriter; -const CURRENT_DUMP_VERSION: &str = "V6"; - -pub struct DumpReader; +const CURRENT_DUMP_VERSION: Version = Version::V6; type Result = std::result::Result; #[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] struct Metadata { - pub dump_version: String, + pub dump_version: Version, pub db_version: String, + #[serde(with = "time::serde::rfc3339")] pub dump_date: OffsetDateTime, } + +#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)] +pub enum Version { + V1, + V2, + V3, + V4, + V5, + V6, +} diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs index f2f5019ac..fe4096a8b 100644 --- a/dump/src/reader/mod.rs +++ b/dump/src/reader/mod.rs @@ -17,18 +17,20 @@ use crate::{Result, Version}; // pub mod error; // mod compat; // mod loaders; -mod v1; -// mod v6; +// mod v1; +mod v6; pub fn open( dump_path: &Path, ) -> Result< - impl DumpReader< - Document = serde_json::Value, - Settings = Settings, - Task = TaskView, - UpdateFile = (), - Key = Key, + Box< + dyn DumpReader< + Document = serde_json::Map, + Settings = Settings, + Task = TaskView, + UpdateFile = File, + Key = Key, + >, >, > { let path = TempDir::new()?; @@ -54,10 +56,21 @@ pub fn open( Version::V3 => todo!(), Version::V4 => todo!(), Version::V5 => todo!(), - Version::V6 => todo!(), - }; + Version::V6 => { + let dump_reader = Box::new(v6::V6Reader::open(path)?) + as Box< + dyn DumpReader< + Document = serde_json::Map, + Settings = Settings, + Task = TaskView, + UpdateFile = File, + Key = Key, + >, + >; - todo!() + Ok(dump_reader) + } + } } pub trait DumpReader { @@ -73,7 +86,7 @@ pub trait DumpReader { fn version(&self) -> Version; /// Return at which date the index was created. - fn date(&self) -> Result>; + fn date(&self) -> Option; /// Return an iterator over each indexes. fn indexes( @@ -81,18 +94,20 @@ pub trait DumpReader { ) -> Result< Box< dyn Iterator< - Item = Box>, + Item = Result< + Box>, + >, >, >, >; /// Return all the tasks in the dump with a possible update file. fn tasks( - &self, - ) -> Result)>>>>; + &mut self, + ) -> Box)>> + '_>; /// Return all the keys. - fn keys(&self) -> Result>>; + fn keys(&mut self) -> Box> + '_>; } pub trait IndexReader { @@ -100,6 +115,6 @@ pub trait IndexReader { type Settings; fn name(&self) -> &str; - fn documents(&self) -> Result>>; - fn settings(&self) -> Result; + fn documents(&mut self) -> Result> + '_>>; + fn settings(&mut self) -> Result; } diff --git a/dump/src/reader/v1/mod.rs b/dump/src/reader/v1/mod.rs index 9f4a9cdd7..f638262cc 100644 --- a/dump/src/reader/v1/mod.rs +++ b/dump/src/reader/v1/mod.rs @@ -5,7 +5,6 @@ use std::{ path::Path, }; -use serde::Deserialize; use tempfile::TempDir; use time::OffsetDateTime; @@ -26,9 +25,9 @@ pub struct V1Reader { struct V1IndexReader { name: String, - documents: File, - settings: File, - updates: File, + documents: BufReader, + settings: BufReader, + updates: BufReader, current_update: Option, } @@ -37,9 +36,9 @@ impl V1IndexReader { pub fn new(name: String, path: &Path) -> Result { let mut ret = V1IndexReader { name, - documents: File::open(path.join("documents.jsonl"))?, - settings: File::open(path.join("settings.json"))?, - updates: File::open(path.join("updates.jsonl"))?, + documents: BufReader::new(File::open(path.join("documents.jsonl"))?), + settings: BufReader::new(File::open(path.join("settings.json"))?), + updates: BufReader::new(File::open(path.join("updates.jsonl"))?), current_update: None, }; ret.next_update(); @@ -48,10 +47,7 @@ impl V1IndexReader { } pub fn next_update(&mut self) -> Result> { - let mut tasks = self.updates; - let mut reader = BufReader::new(&mut tasks); - - let current_update = if let Some(line) = reader.lines().next() { + let current_update = if let Some(line) = self.updates.lines().next() { Some(serde_json::from_str(&line?)?) } else { None @@ -90,10 +86,6 @@ impl V1Reader { }) } - pub fn date(&self) -> Result> { - Ok(None) - } - fn next_update(&mut self) -> Result> { if let Some((idx, _)) = self .indexes @@ -111,14 +103,14 @@ impl V1Reader { } impl IndexReader for &V1IndexReader { - type Document = serde_json::Value; + type Document = serde_json::Map; type Settings = settings::Settings; fn name(&self) -> &str { todo!() } - fn documents(&self) -> Result>> { + fn documents(&self) -> Result>>> { todo!() } @@ -128,16 +120,16 @@ impl IndexReader for &V1IndexReader { } impl DumpReader for V1Reader { - type Document = serde_json::Value; + type Document = serde_json::Map; type Settings = settings::Settings; type Task = update::UpdateStatus; - type UpdateFile = (); + type UpdateFile = Infallible; type Key = Infallible; - fn date(&self) -> Result> { - Ok(None) + fn date(&self) -> Option { + None } fn version(&self) -> Version { @@ -149,29 +141,33 @@ impl DumpReader for V1Reader { ) -> Result< Box< dyn Iterator< - Item = Box< - dyn super::IndexReader, + Item = Result< + Box< + dyn super::IndexReader< + Document = Self::Document, + Settings = Self::Settings, + >, + >, >, >, >, > { Ok(Box::new(self.indexes.iter().map(|index| { - Box::new(index) - as Box> + let index = Box::new(index) + as Box>; + Ok(index) }))) } - fn tasks( - &self, - ) -> Result)>>>> { - Ok(Box::new(std::iter::from_fn(|| { + fn tasks(&self) -> Box)>>> { + Box::new(std::iter::from_fn(|| { self.next_update() .transpose() .map(|result| result.map(|task| (task, None))) - }))) + })) } - fn keys(&self) -> Result>> { - Ok(Box::new(std::iter::empty())) + fn keys(&self) -> Box>> { + Box::new(std::iter::empty()) } } diff --git a/dump/src/reader/v6.rs b/dump/src/reader/v6.rs index 84cefe350..339f88b55 100644 --- a/dump/src/reader/v6.rs +++ b/dump/src/reader/v6.rs @@ -1,16 +1,170 @@ use std::{ - fs::{self}, + fs::{self, File}, + io::{BufRead, BufReader}, path::Path, }; +use index::Unchecked; +use tempfile::TempDir; use time::OffsetDateTime; -use crate::Result; +use crate::{Error, Result, Version}; + +use super::{DumpReader, IndexReader}; type Metadata = crate::Metadata; pub fn date(dump: &Path) -> Result { let metadata = fs::read(dump.join("metadata.json"))?; - let metadata: Metadata = serde_json::from_reader(metadata)?; + let metadata: Metadata = serde_json::from_reader(&*metadata)?; Ok(metadata.dump_date) } + +pub struct V6Reader { + dump: TempDir, + metadata: Metadata, + tasks: BufReader, + keys: BufReader, +} + +struct V6IndexReader { + name: String, + documents: BufReader, + settings: BufReader, +} + +impl V6IndexReader { + pub fn new(name: String, path: &Path) -> Result { + let ret = V6IndexReader { + name, + documents: BufReader::new(File::open(path.join("documents.jsonl"))?), + settings: BufReader::new(File::open(path.join("settings.json"))?), + }; + + Ok(ret) + } +} + +impl V6Reader { + pub fn open(dump: TempDir) -> Result { + let meta_file = fs::read(dump.path().join("metadata.json"))?; + let metadata = serde_json::from_reader(&*meta_file)?; + + Ok(V6Reader { + metadata, + tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?), + keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?), + dump, + }) + } +} + +impl DumpReader for V6Reader { + type Document = serde_json::Map; + type Settings = index::Settings; + + type Task = index_scheduler::TaskView; + type UpdateFile = File; + + type Key = meilisearch_auth::Key; + + fn version(&self) -> Version { + Version::V6 + } + + fn date(&self) -> Option { + Some(self.metadata.dump_date) + } + + fn indexes( + &self, + ) -> Result< + Box< + dyn Iterator< + Item = Result< + Box< + dyn super::IndexReader< + Document = Self::Document, + Settings = Self::Settings, + >, + >, + >, + >, + >, + > { + let entries = fs::read_dir(self.dump.path().join("indexes"))?; + Ok(Box::new( + entries + .map(|entry| -> Result> { + let entry = entry?; + if entry.file_type()?.is_dir() { + let index = Box::new(V6IndexReader::new( + entry + .file_name() + .to_str() + .ok_or(Error::BadIndexName)? + .to_string(), + &entry.path(), + )?) + as Box< + dyn IndexReader< + Document = Self::Document, + Settings = Self::Settings, + >, + >; + Ok(Some(index)) + } else { + Ok(None) + } + }) + .filter_map(|entry| entry.transpose()), + )) + } + + fn tasks( + &mut self, + ) -> Box)>> + '_> { + Box::new((&mut self.tasks).lines().map(|line| -> Result<_> { + let task: index_scheduler::TaskView = serde_json::from_str(&line?)?; + let update_file_path = self + .dump + .path() + .join("tasks") + .join("update_files") + .join(task.uid.to_string()); + + if update_file_path.exists() { + Ok((task, Some(File::open(update_file_path)?))) + } else { + Ok((task, None)) + } + })) + } + + fn keys(&mut self) -> Box> + '_> { + Box::new( + (&mut self.keys) + .lines() + .map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }), + ) + } +} + +impl IndexReader for V6IndexReader { + type Document = serde_json::Map; + type Settings = index::Settings; + + fn name(&self) -> &str { + &self.name + } + + fn documents(&mut self) -> Result> + '_>> { + Ok(Box::new((&mut self.documents).lines().map( + |line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }, + ))) + } + + fn settings(&mut self) -> Result { + Ok(serde_json::from_reader(&mut self.settings)?) + } +} diff --git a/dump/src/writer.rs b/dump/src/writer.rs index 76f5d95ca..0273de210 100644 --- a/dump/src/writer.rs +++ b/dump/src/writer.rs @@ -5,15 +5,13 @@ use std::{ }; use flate2::{write::GzEncoder, Compression}; -use serde::{Deserialize, Serialize}; +use serde::Serialize; use tempfile::TempDir; -use thiserror::Error; use time::OffsetDateTime; use uuid::Uuid; use crate::{Metadata, Result, CURRENT_DUMP_VERSION}; -#[must_use] pub struct DumpWriter { dir: TempDir, } @@ -27,7 +25,7 @@ impl DumpWriter { )?; let metadata = Metadata { - dump_version: CURRENT_DUMP_VERSION.to_string(), + dump_version: CURRENT_DUMP_VERSION, db_version: env!("CARGO_PKG_VERSION").to_string(), dump_date: OffsetDateTime::now_utc(), }; @@ -45,17 +43,14 @@ impl DumpWriter { IndexWriter::new(self.dir.path().join("indexes").join(index_name)) } - #[must_use] pub fn create_keys(&self) -> Result { KeyWriter::new(self.dir.path().to_path_buf()) } - #[must_use] pub fn create_tasks_queue(&self) -> Result { TaskWriter::new(self.dir.path().join("tasks")) } - #[must_use] pub fn persist_to(self, mut writer: impl Write) -> Result<()> { let gz_encoder = GzEncoder::new(&mut writer, Compression::default()); let mut tar_encoder = tar::Builder::new(gz_encoder); @@ -68,7 +63,6 @@ impl DumpWriter { } } -#[must_use] pub struct KeyWriter { file: File, } @@ -86,7 +80,6 @@ impl KeyWriter { } } -#[must_use] pub struct TaskWriter { queue: File, update_files: PathBuf, @@ -124,7 +117,6 @@ impl TaskWriter { } } -#[must_use] pub struct IndexWriter { documents: File, settings: File, @@ -149,7 +141,6 @@ impl IndexWriter { Ok(()) } - #[must_use] pub fn settings(mut self, settings: impl Serialize) -> Result<()> { self.settings.write_all(&serde_json::to_vec(&settings)?)?; Ok(())