update the API a little bit

This commit is contained in:
Tamo 2022-10-13 16:21:54 +02:00 committed by Clément Renault
parent 72a906ae75
commit c051166bcc
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
8 changed files with 182 additions and 203 deletions

View File

@ -12,7 +12,7 @@ mod reader;
mod writer;
pub use error::Error;
pub use reader::open;
pub use reader::DumpReader;
pub use writer::DumpWriter;
const CURRENT_DUMP_VERSION: Version = Version::V6;
@ -193,27 +193,21 @@ pub(crate) mod test {
use big_s::S;
use maplit::btreeset;
use meilisearch_types::tasks::{Kind, Status};
use meilisearch_types::keys::{Action, Key};
use meilisearch_types::milli::{self, update::Setting};
use meilisearch_types::tasks::Status;
use meilisearch_types::{index_uid::IndexUid, star_or::StarOr};
use meilisearch_types::{
keys::{Action, Key},
tasks::Task,
};
use meilisearch_types::{
milli::{self, update::Setting},
tasks::KindWithContent,
};
use meilisearch_types::{
settings::{Checked, Settings},
tasks::Details,
};
use serde_json::{json, Map, Value};
use time::{macros::datetime, Duration};
use time::macros::datetime;
use uuid::Uuid;
use crate::{
reader::{self, Document},
DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
DumpReader, DumpWriter, IndexMetadata, KindDump, TaskDump, Version,
};
pub fn create_test_instance_uid() -> Uuid {
@ -419,7 +413,7 @@ pub(crate) mod test {
#[test]
fn test_creating_and_read_dump() {
let mut file = create_test_dump();
let mut dump = reader::open(&mut file).unwrap();
let mut dump = DumpReader::open(&mut file).unwrap();
// ==== checking the top level infos
assert_eq!(dump.version(), Version::V6);

View File

@ -1,151 +1,4 @@
// pub mod v2;
// pub mod v3;
// pub mod v4;
use crate::Result;
use self::{
v4_to_v5::CompatV4ToV5,
v5_to_v6::{CompatIndexV5ToV6, CompatV5ToV6},
};
use super::{
v5::V5Reader,
v6::{self, V6IndexReader, V6Reader},
Document, UpdateFile,
};
pub mod v2_to_v3;
pub mod v3_to_v4;
pub mod v4_to_v5;
pub mod v5_to_v6;
pub enum Compat {
Current(V6Reader),
Compat(CompatV5ToV6),
}
impl Compat {
pub fn version(&self) -> crate::Version {
match self {
Compat::Current(current) => current.version(),
Compat::Compat(compat) => compat.version(),
}
}
pub fn date(&self) -> Option<time::OffsetDateTime> {
match self {
Compat::Current(current) => current.date(),
Compat::Compat(compat) => compat.date(),
}
}
pub fn instance_uid(&self) -> Result<Option<uuid::Uuid>> {
match self {
Compat::Current(current) => current.instance_uid(),
Compat::Compat(compat) => compat.instance_uid(),
}
}
pub fn indexes(&self) -> Result<Box<dyn Iterator<Item = Result<CompatIndex>> + '_>> {
match self {
Compat::Current(current) => {
let indexes = Box::new(current.indexes()?.map(|res| res.map(CompatIndex::from)))
as Box<dyn Iterator<Item = Result<CompatIndex>> + '_>;
Ok(indexes)
}
Compat::Compat(compat) => {
let indexes = Box::new(compat.indexes()?.map(|res| res.map(CompatIndex::from)))
as Box<dyn Iterator<Item = Result<CompatIndex>> + '_>;
Ok(indexes)
}
}
}
pub fn tasks(
&mut self,
) -> Box<dyn Iterator<Item = Result<(v6::Task, Option<Box<UpdateFile>>)>> + '_> {
match self {
Compat::Current(current) => current.tasks(),
Compat::Compat(compat) => compat.tasks(),
}
}
pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<v6::Key>> + '_> {
match self {
Compat::Current(current) => current.keys(),
Compat::Compat(compat) => compat.keys(),
}
}
}
impl From<V6Reader> for Compat {
fn from(value: V6Reader) -> Self {
Compat::Current(value)
}
}
impl From<CompatV5ToV6> for Compat {
fn from(value: CompatV5ToV6) -> Self {
Compat::Compat(value)
}
}
impl From<V5Reader> for Compat {
fn from(value: V5Reader) -> Self {
Compat::Compat(value.to_v6())
}
}
impl From<CompatV4ToV5> for Compat {
fn from(value: CompatV4ToV5) -> Self {
Compat::Compat(value.to_v6())
}
}
pub enum CompatIndex {
Current(v6::V6IndexReader),
Compat(CompatIndexV5ToV6),
}
impl CompatIndex {
pub fn new_v6(v6: v6::V6IndexReader) -> CompatIndex {
CompatIndex::Current(v6)
}
pub fn metadata(&self) -> &crate::IndexMetadata {
match self {
CompatIndex::Current(v6) => v6.metadata(),
CompatIndex::Compat(compat) => compat.metadata(),
}
}
pub fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Document>> + '_>> {
match self {
CompatIndex::Current(v6) => v6
.documents()
.map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Result<Document>> + '_>),
CompatIndex::Compat(compat) => compat
.documents()
.map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Result<Document>> + '_>),
}
}
pub fn settings(&mut self) -> Result<v6::Settings<v6::Checked>> {
match self {
CompatIndex::Current(v6) => v6.settings(),
CompatIndex::Compat(compat) => compat.settings(),
}
}
}
impl From<V6IndexReader> for CompatIndex {
fn from(value: V6IndexReader) -> Self {
CompatIndex::Current(value)
}
}
impl From<CompatIndexV5ToV6> for CompatIndex {
fn from(value: CompatIndexV5ToV6) -> Self {
CompatIndex::Compat(value)
}
}

View File

@ -1,15 +1,15 @@
use std::io::Read;
use std::{fs::File, io::BufReader};
use flate2::bufread::GzDecoder;
use serde::Deserialize;
use tempfile::TempDir;
use self::compat::v4_to_v5::CompatV4ToV5;
use self::compat::v5_to_v6::{CompatIndexV5ToV6, CompatV5ToV6};
use self::v5::V5Reader;
use self::v6::{V6IndexReader, V6Reader};
use crate::{Result, Version};
use self::compat::Compat;
use flate2::bufread::GzDecoder;
use serde::Deserialize;
use tempfile::TempDir;
mod compat;
@ -23,34 +23,165 @@ pub(self) mod v6;
pub type Document = serde_json::Map<String, serde_json::Value>;
pub type UpdateFile = dyn Iterator<Item = Result<Document>>;
pub fn open(dump: impl Read) -> Result<Compat> {
let path = TempDir::new()?;
let mut dump = BufReader::new(dump);
let gz = GzDecoder::new(&mut dump);
let mut archive = tar::Archive::new(gz);
archive.unpack(path.path())?;
pub enum DumpReader {
Current(V6Reader),
Compat(CompatV5ToV6),
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct MetadataVersion {
pub dump_version: Version,
impl DumpReader {
pub fn open(dump: impl Read) -> Result<DumpReader> {
let path = TempDir::new()?;
let mut dump = BufReader::new(dump);
let gz = GzDecoder::new(&mut dump);
let mut archive = tar::Archive::new(gz);
archive.unpack(path.path())?;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct MetadataVersion {
pub dump_version: Version,
}
let mut meta_file = File::open(path.path().join("metadata.json"))?;
let MetadataVersion { dump_version } = serde_json::from_reader(&mut meta_file)?;
match dump_version {
// Version::V1 => Ok(Box::new(v1::Reader::open(path)?)),
Version::V1 => todo!(),
Version::V2 => Ok(v2::V2Reader::open(path)?
.to_v3()
.to_v4()
.to_v5()
.to_v6()
.into()),
Version::V3 => Ok(v3::V3Reader::open(path)?.to_v4().to_v5().to_v6().into()),
Version::V4 => Ok(v4::V4Reader::open(path)?.to_v5().to_v6().into()),
Version::V5 => Ok(v5::V5Reader::open(path)?.to_v6().into()),
Version::V6 => Ok(v6::V6Reader::open(path)?.into()),
}
}
let mut meta_file = File::open(path.path().join("metadata.json"))?;
let MetadataVersion { dump_version } = serde_json::from_reader(&mut meta_file)?;
match dump_version {
// Version::V1 => Ok(Box::new(v1::Reader::open(path)?)),
Version::V1 => todo!(),
Version::V2 => Ok(v2::V2Reader::open(path)?
.to_v3()
.to_v4()
.to_v5()
.to_v6()
.into()),
Version::V3 => Ok(v3::V3Reader::open(path)?.to_v4().to_v5().to_v6().into()),
Version::V4 => Ok(v4::V4Reader::open(path)?.to_v5().to_v6().into()),
Version::V5 => Ok(v5::V5Reader::open(path)?.to_v6().into()),
Version::V6 => Ok(v6::V6Reader::open(path)?.into()),
pub fn version(&self) -> crate::Version {
match self {
DumpReader::Current(current) => current.version(),
DumpReader::Compat(compat) => compat.version(),
}
}
pub fn date(&self) -> Option<time::OffsetDateTime> {
match self {
DumpReader::Current(current) => current.date(),
DumpReader::Compat(compat) => compat.date(),
}
}
pub fn instance_uid(&self) -> Result<Option<uuid::Uuid>> {
match self {
DumpReader::Current(current) => current.instance_uid(),
DumpReader::Compat(compat) => compat.instance_uid(),
}
}
pub fn indexes(&self) -> Result<Box<dyn Iterator<Item = Result<DumpIndexReader>> + '_>> {
match self {
DumpReader::Current(current) => {
let indexes = Box::new(current.indexes()?.map(|res| res.map(DumpIndexReader::from)))
as Box<dyn Iterator<Item = Result<DumpIndexReader>> + '_>;
Ok(indexes)
}
DumpReader::Compat(compat) => {
let indexes = Box::new(compat.indexes()?.map(|res| res.map(DumpIndexReader::from)))
as Box<dyn Iterator<Item = Result<DumpIndexReader>> + '_>;
Ok(indexes)
}
}
}
pub fn tasks(
&mut self,
) -> Box<dyn Iterator<Item = Result<(v6::Task, Option<Box<UpdateFile>>)>> + '_> {
match self {
DumpReader::Current(current) => current.tasks(),
DumpReader::Compat(compat) => compat.tasks(),
}
}
pub fn keys(&mut self) -> Box<dyn Iterator<Item = Result<v6::Key>> + '_> {
match self {
DumpReader::Current(current) => current.keys(),
DumpReader::Compat(compat) => compat.keys(),
}
}
}
impl From<V6Reader> for DumpReader {
fn from(value: V6Reader) -> Self {
DumpReader::Current(value)
}
}
impl From<CompatV5ToV6> for DumpReader {
fn from(value: CompatV5ToV6) -> Self {
DumpReader::Compat(value)
}
}
impl From<V5Reader> for DumpReader {
fn from(value: V5Reader) -> Self {
DumpReader::Compat(value.to_v6())
}
}
impl From<CompatV4ToV5> for DumpReader {
fn from(value: CompatV4ToV5) -> Self {
DumpReader::Compat(value.to_v6())
}
}
pub enum DumpIndexReader {
Current(v6::V6IndexReader),
Compat(CompatIndexV5ToV6),
}
impl DumpIndexReader {
pub fn new_v6(v6: v6::V6IndexReader) -> DumpIndexReader {
DumpIndexReader::Current(v6)
}
pub fn metadata(&self) -> &crate::IndexMetadata {
match self {
DumpIndexReader::Current(v6) => v6.metadata(),
DumpIndexReader::Compat(compat) => compat.metadata(),
}
}
pub fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Document>> + '_>> {
match self {
DumpIndexReader::Current(v6) => v6
.documents()
.map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Result<Document>> + '_>),
DumpIndexReader::Compat(compat) => compat
.documents()
.map(|iter| Box::new(iter) as Box<dyn Iterator<Item = Result<Document>> + '_>),
}
}
pub fn settings(&mut self) -> Result<v6::Settings<v6::Checked>> {
match self {
DumpIndexReader::Current(v6) => v6.settings(),
DumpIndexReader::Compat(compat) => compat.settings(),
}
}
}
impl From<V6IndexReader> for DumpIndexReader {
fn from(value: V6IndexReader) -> Self {
DumpIndexReader::Current(value)
}
}
impl From<CompatIndexV5ToV6> for DumpIndexReader {
fn from(value: CompatIndexV5ToV6) -> Self {
DumpIndexReader::Compat(value)
}
}
@ -63,7 +194,7 @@ pub(crate) mod test {
#[test]
fn import_dump_v5() {
let dump = File::open("tests/assets/v5.dump").unwrap();
let mut dump = open(dump).unwrap();
let mut dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-04 15:55:10.344982459 +00:00:00");
@ -153,7 +284,7 @@ pub(crate) mod test {
#[test]
fn import_dump_v4() {
let dump = File::open("tests/assets/v4.dump").unwrap();
let mut dump = open(dump).unwrap();
let mut dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-06 12:53:49.131989609 +00:00:00");
@ -242,7 +373,7 @@ pub(crate) mod test {
#[test]
fn import_dump_v3() {
let dump = File::open("tests/assets/v3.dump").unwrap();
let mut dump = open(dump).unwrap();
let mut dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-07 11:39:03.709153554 +00:00:00");
@ -351,7 +482,7 @@ pub(crate) mod test {
#[test]
fn import_dump_v2() {
let dump = File::open("tests/assets/v2.dump").unwrap();
let mut dump = open(dump).unwrap();
let mut dump = DumpReader::open(dump).unwrap();
// top level infos
insta::assert_display_snapshot!(dump.date().unwrap(), @"2022-10-09 20:27:59.904096267 +00:00:00");

View File

@ -11,8 +11,6 @@ use uuid::Uuid;
use crate::{Error, IndexMetadata, Result, Version};
mod tasks;
pub use meilisearch_types::milli;
use super::Document;

View File

@ -1 +0,0 @@

View File

@ -8,7 +8,6 @@ use flate2::{write::GzEncoder, Compression};
use meilisearch_types::{
keys::Key,
settings::{Checked, Settings},
tasks::Task,
};
use serde_json::{Map, Value};
use tempfile::TempDir;

View File

@ -503,11 +503,12 @@ impl IndexScheduler {
let mut tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, task) = ret?;
let mut dump_content_file = tasks.push_task((&task).into())?;
let content_file = task.content_uuid().map(|uuid| uuid.clone());
let mut dump_content_file = tasks.push_task(&task.into())?;
// 2.1. Dump the `content_file` associated with the task if there is one.
if let Some(content_file) = task.content_uuid() {
let content_file = self.file_store.get_update(*content_file)?;
if let Some(content_file) = content_file {
let content_file = self.file_store.get_update(content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;

View File

@ -220,6 +220,10 @@ impl IndexScheduler {
Ok(this)
}
pub fn import_dump(&self, dump_path: PathBuf) -> Result<()> {
todo!()
}
/// This function will execute in a different thread and must be called only once.
fn run(&self) {
let run = Self {