start dumping the update files to a known format

This commit is contained in:
Tamo 2022-10-10 17:58:30 +02:00 committed by Clément Renault
parent 9117fde712
commit 0284764b5e
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
50 changed files with 20209 additions and 7996 deletions

View file

@ -37,19 +37,17 @@ pub mod meta;
pub mod settings;
pub mod updates;
use crate::{IndexMetadata, Result, Version};
use crate::{Error, IndexMetadata, Result, Version};
use self::meta::{DumpMeta, IndexUuid};
use super::compat::v3_to_v4::CompatV3ToV4;
use super::{compat::v3_to_v4::CompatV3ToV4, Document};
pub type Document = serde_json::Map<String, serde_json::Value>;
pub type Settings<T> = settings::Settings<T>;
pub type Checked = settings::Checked;
pub type Unchecked = settings::Unchecked;
pub type Task = updates::UpdateEntry;
pub type UpdateFile = File;
// ===== Other types to clarify the code of the compat module
// everything related to the tasks
@ -127,7 +125,9 @@ impl V3Reader {
}))
}
pub fn tasks(&mut self) -> Box<dyn Iterator<Item = Result<(Task, Option<UpdateFile>)>> + '_> {
pub fn tasks(
&mut self,
) -> Box<dyn Iterator<Item = Result<(Task, Option<Box<super::UpdateFile>>)>> + '_> {
Box::new((&mut self.tasks).lines().map(|line| -> Result<_> {
let task: Task = serde_json::from_str(&line?)?;
if !task.is_finished() {
@ -138,7 +138,12 @@ impl V3Reader {
.join("updates")
.join("updates_files")
.join(uuid.to_string());
Ok((task, Some(File::open(update_file_path).unwrap())))
Ok((
task,
Some(
Box::new(UpdateFile::new(&update_file_path)?) as Box<super::UpdateFile>
),
))
} else {
Ok((task, None))
}
@ -193,6 +198,32 @@ impl V3IndexReader {
}
}
pub struct UpdateFile {
reader: BufReader<File>,
}
impl UpdateFile {
fn new(path: &Path) -> Result<Self> {
Ok(UpdateFile {
reader: BufReader::new(File::open(path)?),
})
}
}
impl Iterator for UpdateFile {
type Item = Result<Document>;
fn next(&mut self) -> Option<Self::Item> {
(&mut self.reader)
.lines()
.map(|line| {
line.map_err(Error::from)
.and_then(|line| serde_json::from_str(&line).map_err(Error::from))
})
.next()
}
}
#[cfg(test)]
pub(crate) mod test {
use std::{fs::File, io::BufReader};
@ -218,12 +249,19 @@ pub(crate) mod test {
// tasks
let tasks = dump.tasks().collect::<Result<Vec<_>>>().unwrap();
let (tasks, update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
let (tasks, mut update_files): (Vec<_>, Vec<_>) = tasks.into_iter().unzip();
insta::assert_json_snapshot!(tasks);
assert_eq!(update_files.len(), 10);
assert!(update_files[0].is_some()); // the enqueued document addition
assert!(update_files[1..].iter().all(|u| u.is_none())); // everything already processed
let update_file = update_files
.remove(0)
.unwrap()
.collect::<Result<Vec<_>>>()
.unwrap();
insta::assert_json_snapshot!(update_file);
// indexes
let mut indexes = dump.indexes().unwrap().collect::<Result<Vec<_>>>().unwrap();
// the index are not ordered in any way by default