mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 12:27:13 +02:00
implement the dump reader v6
This commit is contained in:
parent
b89ac5d7ac
commit
e3bc87bf22
8 changed files with 247 additions and 69 deletions
|
@ -17,18 +17,20 @@ use crate::{Result, Version};
|
|||
// pub mod error;
|
||||
// mod compat;
|
||||
// mod loaders;
|
||||
mod v1;
|
||||
// mod v6;
|
||||
// mod v1;
|
||||
mod v6;
|
||||
|
||||
pub fn open(
|
||||
dump_path: &Path,
|
||||
) -> Result<
|
||||
impl DumpReader<
|
||||
Document = serde_json::Value,
|
||||
Settings = Settings<Unchecked>,
|
||||
Task = TaskView,
|
||||
UpdateFile = (),
|
||||
Key = Key,
|
||||
Box<
|
||||
dyn DumpReader<
|
||||
Document = serde_json::Map<String, serde_json::Value>,
|
||||
Settings = Settings<Unchecked>,
|
||||
Task = TaskView,
|
||||
UpdateFile = File,
|
||||
Key = Key,
|
||||
>,
|
||||
>,
|
||||
> {
|
||||
let path = TempDir::new()?;
|
||||
|
@ -54,10 +56,21 @@ pub fn open(
|
|||
Version::V3 => todo!(),
|
||||
Version::V4 => todo!(),
|
||||
Version::V5 => todo!(),
|
||||
Version::V6 => todo!(),
|
||||
};
|
||||
Version::V6 => {
|
||||
let dump_reader = Box::new(v6::V6Reader::open(path)?)
|
||||
as Box<
|
||||
dyn DumpReader<
|
||||
Document = serde_json::Map<String, serde_json::Value>,
|
||||
Settings = Settings<Unchecked>,
|
||||
Task = TaskView,
|
||||
UpdateFile = File,
|
||||
Key = Key,
|
||||
>,
|
||||
>;
|
||||
|
||||
todo!()
|
||||
Ok(dump_reader)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait DumpReader {
|
||||
|
@ -73,7 +86,7 @@ pub trait DumpReader {
|
|||
fn version(&self) -> Version;
|
||||
|
||||
/// Return at which date the index was created.
|
||||
fn date(&self) -> Result<Option<OffsetDateTime>>;
|
||||
fn date(&self) -> Option<OffsetDateTime>;
|
||||
|
||||
/// Return an iterator over each indexes.
|
||||
fn indexes(
|
||||
|
@ -81,18 +94,20 @@ pub trait DumpReader {
|
|||
) -> Result<
|
||||
Box<
|
||||
dyn Iterator<
|
||||
Item = Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>,
|
||||
Item = Result<
|
||||
Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>;
|
||||
|
||||
/// Return all the tasks in the dump with a possible update file.
|
||||
fn tasks(
|
||||
&self,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>>;
|
||||
&mut self,
|
||||
) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>> + '_>;
|
||||
|
||||
/// Return all the keys.
|
||||
fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>>;
|
||||
fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Self::Key>> + '_>;
|
||||
}
|
||||
|
||||
pub trait IndexReader {
|
||||
|
@ -100,6 +115,6 @@ pub trait IndexReader {
|
|||
type Settings;
|
||||
|
||||
fn name(&self) -> &str;
|
||||
fn documents(&self) -> Result<Box<dyn Iterator<Item = Self::Document>>>;
|
||||
fn settings(&self) -> Result<Self::Settings>;
|
||||
fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>> + '_>>;
|
||||
fn settings(&mut self) -> Result<Self::Settings>;
|
||||
}
|
||||
|
|
|
@ -5,7 +5,6 @@ use std::{
|
|||
path::Path,
|
||||
};
|
||||
|
||||
use serde::Deserialize;
|
||||
use tempfile::TempDir;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
|
@ -26,9 +25,9 @@ pub struct V1Reader {
|
|||
|
||||
struct V1IndexReader {
|
||||
name: String,
|
||||
documents: File,
|
||||
settings: File,
|
||||
updates: File,
|
||||
documents: BufReader<File>,
|
||||
settings: BufReader<File>,
|
||||
updates: BufReader<File>,
|
||||
|
||||
current_update: Option<UpdateStatus>,
|
||||
}
|
||||
|
@ -37,9 +36,9 @@ impl V1IndexReader {
|
|||
pub fn new(name: String, path: &Path) -> Result<Self> {
|
||||
let mut ret = V1IndexReader {
|
||||
name,
|
||||
documents: File::open(path.join("documents.jsonl"))?,
|
||||
settings: File::open(path.join("settings.json"))?,
|
||||
updates: File::open(path.join("updates.jsonl"))?,
|
||||
documents: BufReader::new(File::open(path.join("documents.jsonl"))?),
|
||||
settings: BufReader::new(File::open(path.join("settings.json"))?),
|
||||
updates: BufReader::new(File::open(path.join("updates.jsonl"))?),
|
||||
current_update: None,
|
||||
};
|
||||
ret.next_update();
|
||||
|
@ -48,10 +47,7 @@ impl V1IndexReader {
|
|||
}
|
||||
|
||||
pub fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
|
||||
let mut tasks = self.updates;
|
||||
let mut reader = BufReader::new(&mut tasks);
|
||||
|
||||
let current_update = if let Some(line) = reader.lines().next() {
|
||||
let current_update = if let Some(line) = self.updates.lines().next() {
|
||||
Some(serde_json::from_str(&line?)?)
|
||||
} else {
|
||||
None
|
||||
|
@ -90,10 +86,6 @@ impl V1Reader {
|
|||
})
|
||||
}
|
||||
|
||||
pub fn date(&self) -> Result<Option<OffsetDateTime>> {
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
fn next_update(&mut self) -> Result<Option<UpdateStatus>> {
|
||||
if let Some((idx, _)) = self
|
||||
.indexes
|
||||
|
@ -111,14 +103,14 @@ impl V1Reader {
|
|||
}
|
||||
|
||||
impl IndexReader for &V1IndexReader {
|
||||
type Document = serde_json::Value;
|
||||
type Document = serde_json::Map<String, serde_json::Value>;
|
||||
type Settings = settings::Settings;
|
||||
|
||||
fn name(&self) -> &str {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn documents(&self) -> Result<Box<dyn Iterator<Item = Self::Document>>> {
|
||||
fn documents(&self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>>>> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
@ -128,16 +120,16 @@ impl IndexReader for &V1IndexReader {
|
|||
}
|
||||
|
||||
impl DumpReader for V1Reader {
|
||||
type Document = serde_json::Value;
|
||||
type Document = serde_json::Map<String, serde_json::Value>;
|
||||
type Settings = settings::Settings;
|
||||
|
||||
type Task = update::UpdateStatus;
|
||||
type UpdateFile = ();
|
||||
type UpdateFile = Infallible;
|
||||
|
||||
type Key = Infallible;
|
||||
|
||||
fn date(&self) -> Result<Option<OffsetDateTime>> {
|
||||
Ok(None)
|
||||
fn date(&self) -> Option<OffsetDateTime> {
|
||||
None
|
||||
}
|
||||
|
||||
fn version(&self) -> Version {
|
||||
|
@ -149,29 +141,33 @@ impl DumpReader for V1Reader {
|
|||
) -> Result<
|
||||
Box<
|
||||
dyn Iterator<
|
||||
Item = Box<
|
||||
dyn super::IndexReader<Document = Self::Document, Settings = Self::Settings>,
|
||||
Item = Result<
|
||||
Box<
|
||||
dyn super::IndexReader<
|
||||
Document = Self::Document,
|
||||
Settings = Self::Settings,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
> {
|
||||
Ok(Box::new(self.indexes.iter().map(|index| {
|
||||
Box::new(index)
|
||||
as Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>
|
||||
let index = Box::new(index)
|
||||
as Box<dyn IndexReader<Document = Self::Document, Settings = Self::Settings>>;
|
||||
Ok(index)
|
||||
})))
|
||||
}
|
||||
|
||||
fn tasks(
|
||||
&self,
|
||||
) -> Result<Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>>> {
|
||||
Ok(Box::new(std::iter::from_fn(|| {
|
||||
fn tasks(&self) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>>> {
|
||||
Box::new(std::iter::from_fn(|| {
|
||||
self.next_update()
|
||||
.transpose()
|
||||
.map(|result| result.map(|task| (task, None)))
|
||||
})))
|
||||
}))
|
||||
}
|
||||
|
||||
fn keys(&self) -> Result<Box<dyn Iterator<Item = Self::Key>>> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
fn keys(&self) -> Box<dyn Iterator<Item = Result<Self::Key>>> {
|
||||
Box::new(std::iter::empty())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,170 @@
|
|||
use std::{
|
||||
fs::{self},
|
||||
fs::{self, File},
|
||||
io::{BufRead, BufReader},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
use index::Unchecked;
|
||||
use tempfile::TempDir;
|
||||
use time::OffsetDateTime;
|
||||
|
||||
use crate::Result;
|
||||
use crate::{Error, Result, Version};
|
||||
|
||||
use super::{DumpReader, IndexReader};
|
||||
|
||||
type Metadata = crate::Metadata;
|
||||
|
||||
pub fn date(dump: &Path) -> Result<OffsetDateTime> {
|
||||
let metadata = fs::read(dump.join("metadata.json"))?;
|
||||
let metadata: Metadata = serde_json::from_reader(metadata)?;
|
||||
let metadata: Metadata = serde_json::from_reader(&*metadata)?;
|
||||
Ok(metadata.dump_date)
|
||||
}
|
||||
|
||||
pub struct V6Reader {
|
||||
dump: TempDir,
|
||||
metadata: Metadata,
|
||||
tasks: BufReader<File>,
|
||||
keys: BufReader<File>,
|
||||
}
|
||||
|
||||
struct V6IndexReader {
|
||||
name: String,
|
||||
documents: BufReader<File>,
|
||||
settings: BufReader<File>,
|
||||
}
|
||||
|
||||
impl V6IndexReader {
|
||||
pub fn new(name: String, path: &Path) -> Result<Self> {
|
||||
let ret = V6IndexReader {
|
||||
name,
|
||||
documents: BufReader::new(File::open(path.join("documents.jsonl"))?),
|
||||
settings: BufReader::new(File::open(path.join("settings.json"))?),
|
||||
};
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
}
|
||||
|
||||
impl V6Reader {
|
||||
pub fn open(dump: TempDir) -> Result<Self> {
|
||||
let meta_file = fs::read(dump.path().join("metadata.json"))?;
|
||||
let metadata = serde_json::from_reader(&*meta_file)?;
|
||||
|
||||
Ok(V6Reader {
|
||||
metadata,
|
||||
tasks: BufReader::new(File::open(dump.path().join("tasks").join("queue.jsonl"))?),
|
||||
keys: BufReader::new(File::open(dump.path().join("keys.jsonl"))?),
|
||||
dump,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl DumpReader for V6Reader {
|
||||
type Document = serde_json::Map<String, serde_json::Value>;
|
||||
type Settings = index::Settings<Unchecked>;
|
||||
|
||||
type Task = index_scheduler::TaskView;
|
||||
type UpdateFile = File;
|
||||
|
||||
type Key = meilisearch_auth::Key;
|
||||
|
||||
fn version(&self) -> Version {
|
||||
Version::V6
|
||||
}
|
||||
|
||||
fn date(&self) -> Option<OffsetDateTime> {
|
||||
Some(self.metadata.dump_date)
|
||||
}
|
||||
|
||||
fn indexes(
|
||||
&self,
|
||||
) -> Result<
|
||||
Box<
|
||||
dyn Iterator<
|
||||
Item = Result<
|
||||
Box<
|
||||
dyn super::IndexReader<
|
||||
Document = Self::Document,
|
||||
Settings = Self::Settings,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
>,
|
||||
> {
|
||||
let entries = fs::read_dir(self.dump.path().join("indexes"))?;
|
||||
Ok(Box::new(
|
||||
entries
|
||||
.map(|entry| -> Result<Option<_>> {
|
||||
let entry = entry?;
|
||||
if entry.file_type()?.is_dir() {
|
||||
let index = Box::new(V6IndexReader::new(
|
||||
entry
|
||||
.file_name()
|
||||
.to_str()
|
||||
.ok_or(Error::BadIndexName)?
|
||||
.to_string(),
|
||||
&entry.path(),
|
||||
)?)
|
||||
as Box<
|
||||
dyn IndexReader<
|
||||
Document = Self::Document,
|
||||
Settings = Self::Settings,
|
||||
>,
|
||||
>;
|
||||
Ok(Some(index))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
})
|
||||
.filter_map(|entry| entry.transpose()),
|
||||
))
|
||||
}
|
||||
|
||||
fn tasks(
|
||||
&mut self,
|
||||
) -> Box<dyn Iterator<Item = Result<(Self::Task, Option<Self::UpdateFile>)>> + '_> {
|
||||
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
|
||||
let task: index_scheduler::TaskView = serde_json::from_str(&line?)?;
|
||||
let update_file_path = self
|
||||
.dump
|
||||
.path()
|
||||
.join("tasks")
|
||||
.join("update_files")
|
||||
.join(task.uid.to_string());
|
||||
|
||||
if update_file_path.exists() {
|
||||
Ok((task, Some(File::open(update_file_path)?)))
|
||||
} else {
|
||||
Ok((task, None))
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
fn keys(&mut self) -> Box<dyn Iterator<Item = Result<Self::Key>> + '_> {
|
||||
Box::new(
|
||||
(&mut self.keys)
|
||||
.lines()
|
||||
.map(|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) }),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexReader for V6IndexReader {
|
||||
type Document = serde_json::Map<String, serde_json::Value>;
|
||||
type Settings = index::Settings<Unchecked>;
|
||||
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn documents(&mut self) -> Result<Box<dyn Iterator<Item = Result<Self::Document>> + '_>> {
|
||||
Ok(Box::new((&mut self.documents).lines().map(
|
||||
|line| -> Result<_> { Ok(serde_json::from_str(&line?)?) },
|
||||
)))
|
||||
}
|
||||
|
||||
fn settings(&mut self) -> Result<Self::Settings> {
|
||||
Ok(serde_json::from_reader(&mut self.settings)?)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue