rewrite the update file API

This commit is contained in:
Tamo 2022-10-10 19:57:47 +02:00 committed by Clément Renault
parent 7579a363ab
commit 2ae0806773
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 40 additions and 12 deletions

View File

@ -236,12 +236,15 @@ pub(crate) mod test {
// ========== pushing the task queue // ========== pushing the task queue
let tasks = create_test_tasks(); let tasks = create_test_tasks();
/*
let mut task_queue = dump.create_tasks_queue().unwrap(); let mut task_queue = dump.create_tasks_queue().unwrap();
for (task, update_file) in &tasks { for (task, update_file) in &tasks {
task_queue.push_task(task, update_file.map(|c| c)).unwrap(); let mut update = task_queue.push_task(task).unwrap();
if let Some(update_file) = update_file {
for u in update_file {
update.push_document(u).unwrap();
}
}
} }
*/
// ========== pushing the api keys // ========== pushing the api keys
let api_keys = create_test_api_keys(); let api_keys = create_test_api_keys();

View File

@ -118,7 +118,7 @@ impl V6Reader {
.path() .path()
.join("tasks") .join("tasks")
.join("update_files") .join("update_files")
.join(task.uid.to_string()); .join(format!("{}.jsonl", task.uid.to_string()));
if update_file_path.exists() { if update_file_path.exists() {
Ok(( Ok((

View File

@ -1,6 +1,6 @@
use std::{ use std::{
fs::{self, File}, fs::{self, File},
io::{Read, Write}, io::{BufReader, BufWriter, Read, Write},
path::PathBuf, path::PathBuf,
}; };
@ -13,7 +13,7 @@ use tempfile::TempDir;
use time::OffsetDateTime; use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
use crate::{IndexMetadata, Metadata, Result, CURRENT_DUMP_VERSION}; use crate::{reader::Document, IndexMetadata, Metadata, Result, CURRENT_DUMP_VERSION};
pub struct DumpWriter { pub struct DumpWriter {
dir: TempDir, dir: TempDir,
@ -105,16 +105,41 @@ impl TaskWriter {
/// Pushes tasks in the dump. /// Pushes tasks in the dump.
/// If the tasks has an associated `update_file` it'll use the `task_id` as its name. /// If the tasks has an associated `update_file` it'll use the `task_id` as its name.
pub fn push_task(&mut self, task: &TaskView, update_file: Option<impl Read>) -> Result<()> { pub fn push_task(&mut self, task: &TaskView) -> Result<UpdateFile> {
// TODO: this could be removed the day we implements `Deserialize` on the Duration. // TODO: this could be removed the day we implements `Deserialize` on the Duration.
let mut task = task.clone(); let mut task = task.clone();
task.duration = None; task.duration = None;
self.queue.write_all(&serde_json::to_vec(&task)?)?; self.queue.write_all(&serde_json::to_vec(&task)?)?;
self.queue.write_all(b"\n")?; self.queue.write_all(b"\n")?;
if let Some(mut update_file) = update_file {
let mut file = File::create(&self.update_files.join(task.uid.to_string()))?; Ok(UpdateFile::new(
std::io::copy(&mut update_file, &mut file)?; self.update_files
.join(format!("{}.jsonl", task.uid.to_string())),
))
}
}
pub struct UpdateFile {
path: PathBuf,
writer: Option<BufWriter<File>>,
}
impl UpdateFile {
pub(crate) fn new(path: PathBuf) -> UpdateFile {
UpdateFile { path, writer: None }
}
pub fn push_document(&mut self, document: &Document) -> Result<()> {
if let Some(writer) = self.writer.as_mut() {
writer.write_all(&serde_json::to_vec(document)?)?;
writer.write_all(b"\n")?;
writer.flush()?;
} else {
dbg!(&self.path);
let file = File::create(&self.path).unwrap();
self.writer = Some(BufWriter::new(file));
self.push_document(document)?;
} }
Ok(()) Ok(())
} }
@ -253,7 +278,7 @@ pub(crate) mod test {
---- metadata.json ---- metadata.json
---- tasks/ ---- tasks/
---- update_files/ ---- update_files/
---- 1 ---- 1.jsonl
---- queue.jsonl ---- queue.jsonl
---- keys.jsonl ---- keys.jsonl
---- metadata.json ---- metadata.json
@ -310,7 +335,7 @@ pub(crate) mod test {
assert_eq!(serde_json::from_str::<TaskView>(task).unwrap(), expected.0); assert_eq!(serde_json::from_str::<TaskView>(task).unwrap(), expected.0);
if let Some(expected_update) = expected.1 { if let Some(expected_update) = expected.1 {
let path = dump_path.join(format!("tasks/update_files/{}", expected.0.uid)); let path = dump_path.join(format!("tasks/update_files/{}.jsonl", expected.0.uid));
println!("trying to open {}", path.display()); println!("trying to open {}", path.display());
let update = fs::read_to_string(path).unwrap(); let update = fs::read_to_string(path).unwrap();
let documents: Vec<Document> = update let documents: Vec<Document> = update