fix and remove the file-store hack of /dev/null

Tamo 2024-02-22 18:42:12 +01:00
parent c2e2003a80
commit eb90f0b4fb
4 changed files with 63 additions and 71 deletions
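In short: the old dry_file() faked a write-nowhere update file by pointing a NamedTempFile at /dev/null (\Device\Null on Windows). The new File stores an Option<NamedTempFile> instead, where None means dry, and implements std::io::Write by hand so writes against a dry file are simply discarded. A minimal, self-contained sketch of that pattern, assuming MaybeWriter as a hypothetical stand-in for file_store's File:

use std::io::Write;

// Forward to the inner writer when present; silently discard otherwise.
struct MaybeWriter<W: Write> {
    inner: Option<W>,
}

impl<W: Write> Write for MaybeWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        match self.inner.as_mut() {
            Some(w) => w.write(buf),
            // Report the whole buffer as written so callers keep making progress.
            None => Ok(buf.len()),
        }
    }

    fn flush(&mut self) -> std::io::Result<()> {
        match self.inner.as_mut() {
            Some(w) => w.flush(),
            None => Ok(()),
        }
    }
}

fn main() -> std::io::Result<()> {
    // Dry mode: no /dev/null involved, the bytes just vanish.
    let mut dry: MaybeWriter<Vec<u8>> = MaybeWriter { inner: None };
    dry.write_all(b"discarded")?;

    // Real mode: bytes land in the inner writer.
    let mut real = MaybeWriter { inner: Some(Vec::new()) };
    real.write_all(b"kept")?;
    assert_eq!(real.inner.as_deref(), Some(&b"kept"[..]));
    Ok(())
}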

View File

@ -1,5 +1,5 @@
use std::fs::File as StdFile;
use std::ops::{Deref, DerefMut};
use std::io::Write;
use std::path::{Path, PathBuf};
use std::str::FromStr;
@ -22,20 +22,6 @@ pub enum Error {
pub type Result<T> = std::result::Result<T, Error>;
impl Deref for File {
type Target = NamedTempFile;
fn deref(&self) -> &Self::Target {
&self.file
}
}
impl DerefMut for File {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.file
}
}
#[derive(Clone, Debug)]
pub struct FileStore {
path: PathBuf,
@ -56,7 +42,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::new_v4();
let path = self.path.join(uuid.to_string());
let update_file = File { dry: false, file, path };
let update_file = File { file: Some(file), path };
Ok((uuid, update_file))
}
@ -67,7 +53,7 @@ impl FileStore {
let file = NamedTempFile::new_in(&self.path)?;
let uuid = Uuid::from_u128(uuid);
let path = self.path.join(uuid.to_string());
let update_file = File { dry: false, file, path };
let update_file = File { file: Some(file), path };
Ok((uuid, update_file))
}
@ -135,33 +121,41 @@ impl FileStore {
}
pub struct File {
dry: bool,
path: PathBuf,
file: NamedTempFile,
file: Option<NamedTempFile>,
}
impl File {
pub fn dry_file() -> Result<Self> {
#[cfg(target_family = "unix")]
let path = PathBuf::from_str("/dev/null").unwrap();
#[cfg(target_family = "windows")]
let path = PathBuf::from_str("\\Device\\Null").unwrap();
Ok(Self {
dry: true,
path: path.clone(),
file: tempfile::Builder::new().make(|_| std::fs::File::create(path.clone()))?,
})
Ok(Self { path: PathBuf::new(), file: None })
}
pub fn persist(self) -> Result<()> {
if !self.dry {
self.file.persist(&self.path)?;
if let Some(file) = self.file {
file.persist(&self.path)?;
}
Ok(())
}
}
impl Write for File {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
if let Some(file) = self.file.as_mut() {
file.write(buf)
} else {
Ok(buf.len())
}
}
fn flush(&mut self) -> std::io::Result<()> {
if let Some(file) = self.file.as_mut() {
file.flush()
} else {
Ok(())
}
}
}
#[cfg(test)]
mod test {
use std::io::Write;
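A consequence of File implementing Write, relied on by every call site changed below: the standard library provides impl<W: Write + ?Sized> Write for &mut W, so &mut file satisfies any impl Write parameter while the caller keeps ownership of file for the later persist() call. A tiny illustration (fill is a hypothetical stand-in for DocumentsBatchBuilder::new or read_json):

use std::io::Write;

fn fill(mut w: impl Write) -> std::io::Result<()> {
    w.write_all(b"payload")
}

fn main() -> std::io::Result<()> {
    let mut sink: Vec<u8> = Vec::new();
    fill(&mut sink)?; // lend a &mut writer instead of moving it
    assert_eq!(sink, b"payload"); // `sink` is still ours afterwards
    Ok(())
}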

View File

@ -1550,7 +1550,7 @@ impl<'a> Dump<'a> {
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.index_scheduler.create_update_file(false)?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
let mut builder = DocumentsBatchBuilder::new(&mut file);
for doc in content_file {
builder.append_json_object(&doc?)?;
}
@ -1734,7 +1734,7 @@ pub struct IndexStats {
#[cfg(test)]
mod tests {
use std::io::{BufWriter, Seek, Write};
use std::io::{BufWriter, Write};
use std::time::Instant;
use big_s::S;
@ -1882,7 +1882,7 @@ mod tests {
/// Adapting to the new json reading interface
pub fn read_json(
bytes: &[u8],
write: impl Write + Seek,
write: impl Write,
) -> std::result::Result<u64, DocumentFormatError> {
let temp_file = NamedTempFile::new().unwrap();
let mut buffer = BufWriter::new(temp_file.reopen().unwrap());
@ -1909,7 +1909,7 @@ mod tests {
);
let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
(file, documents_count)
}
@ -2321,7 +2321,7 @@ mod tests {
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2366,7 +2366,7 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2406,7 +2406,7 @@ mod tests {
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2472,7 +2472,7 @@ mod tests {
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2678,7 +2678,7 @@ mod tests {
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2852,7 +2852,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2903,7 +2903,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -2956,7 +2956,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3010,7 +3010,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3065,7 +3065,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3567,7 +3567,7 @@ mod tests {
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3609,7 +3609,7 @@ mod tests {
}"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3669,7 +3669,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3721,7 +3721,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3783,7 +3783,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3850,7 +3850,7 @@ mod tests {
);
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3922,7 +3922,7 @@ mod tests {
let allow_index_creation = i % 2 != 0;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -3979,7 +3979,7 @@ mod tests {
let allow_index_creation = i % 2 != 0;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
file.persist().unwrap();
index_scheduler
.register(
@ -4033,7 +4033,7 @@ mod tests {
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
@ -4098,7 +4098,7 @@ mod tests {
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
@ -4159,7 +4159,7 @@ mod tests {
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
@ -4244,7 +4244,7 @@ mod tests {
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
@ -4331,7 +4331,7 @@ mod tests {
);
let (uuid, mut file) =
index_scheduler.create_update_file_with_uuid(id as u128).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
let documents_count = read_json(content.as_bytes(), &mut file).unwrap();
assert_eq!(documents_count, 1);
file.persist().unwrap();
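One detail of the read_json test helper above worth spelling out: it writes through a BufWriter built over temp_file.reopen(). In the tempfile crate, reopen() returns an independent std::fs::File handle onto the same temporary file, with its own cursor, which is what lets the helper write through one handle and hand the data back through another. A small sketch of that behaviour:

use std::io::{Read, Write};
use tempfile::NamedTempFile;

fn main() -> std::io::Result<()> {
    let mut tmp = NamedTempFile::new()?;
    // Independent handle on the same file, with its own position.
    let mut reader = tmp.reopen()?;

    tmp.write_all(b"hello")?;
    tmp.flush()?;

    let mut out = String::new();
    reader.read_to_string(&mut out)?; // sees what the first handle wrote
    assert_eq!(out, "hello");
    Ok(())
}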

View File

@ -1,6 +1,6 @@
use std::fmt::{self, Debug, Display};
use std::fs::File;
use std::io::{self, Seek, Write};
use std::io::{self, BufWriter, Write};
use std::marker::PhantomData;
use memmap2::MmapOptions;
@ -104,8 +104,8 @@ impl ErrorCode for DocumentFormatError {
}
/// Reads CSV from input and writes an obkv batch to writer.
pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? };
let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref());
builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?;
@ -116,9 +116,9 @@ pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result
Ok(count as u64)
}
/// Reads JSON from temporary file and writes an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
/// Reads JSON from temporary file and writes an obkv batch to writer.
pub fn read_json(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? };
let mut deserializer = serde_json::Deserializer::from_slice(&mmap);
@ -151,8 +151,8 @@ pub fn read_json(file: &File, writer: impl Write + Seek) -> Result<u64> {
}
/// Reads NDJSON from temporary file and writes an obkv batch to writer.
pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(writer);
pub fn read_ndjson(file: &File, writer: impl Write) -> Result<u64> {
let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer));
let mmap = unsafe { MmapOptions::new().map(file)? };
for result in serde_json::Deserializer::from_slice(&mmap).into_iter() {
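Two things happen to these signatures at once: the Seek bound disappears, presumably because the reworked File only implements Write (a dry file has nothing to seek on), and the writer is wrapped in a BufWriter so that the many small writes a batch builder tends to emit are coalesced before reaching the underlying file. A self-contained sketch of what that buffering buys, with a hypothetical CountingSink used purely for measurement:

use std::io::{BufWriter, Write};

// Counts how many write calls reach the underlying writer.
struct CountingSink {
    calls: usize,
}

impl Write for CountingSink {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.calls += 1;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn main() -> std::io::Result<()> {
    let mut buffered = BufWriter::new(CountingSink { calls: 0 });
    for _ in 0..10_000 {
        buffered.write_all(b"{\"id\":1}")?; // 10,000 tiny writes
    }
    let sink = buffered.into_inner()?; // flushes, then returns the sink
    // With the default 8 KiB buffer these collapse into roughly a dozen calls.
    assert!(sink.calls < 100);
    Ok(())
}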

View File

@ -425,11 +425,9 @@ async fn document_addition(
let read_file = buffer.into_inner().into_std().await;
let documents_count = tokio::task::spawn_blocking(move || {
let documents_count = match format {
PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?,
PayloadType::Csv { delimiter } => {
read_csv(&read_file, update_file.as_file_mut(), delimiter)?
}
PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?,
PayloadType::Json => read_json(&read_file, &mut update_file)?,
PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?,
PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?,
};
// we NEED to persist the file here because we moved the `update_file` into another task.
update_file.persist()?;
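The comment above is the crux of this last hunk: update_file is moved into the spawn_blocking closure, so nothing outside the closure can persist it afterwards. A toy reduction of that ownership constraint, with a String standing in for the update file:

use tokio::task;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let update_file = String::from("pretend this is the update file");

    // `move` transfers ownership of `update_file` into the blocking task;
    // the async caller can no longer touch it, so any persist-style cleanup
    // has to run inside the closure itself.
    let documents_count = task::spawn_blocking(move || {
        let count = update_file.len() as u64; // stand-in for read_json & co.
        // persist() would have to be called here, before returning.
        count
    })
    .await?;

    println!("documents: {documents_count}");
    Ok(())
}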