diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs index e3851a2df..15c4168bc 100644 --- a/file-store/src/lib.rs +++ b/file-store/src/lib.rs @@ -1,5 +1,5 @@ use std::fs::File as StdFile; -use std::ops::{Deref, DerefMut}; +use std::io::Write; use std::path::{Path, PathBuf}; use std::str::FromStr; @@ -22,20 +22,6 @@ pub enum Error { pub type Result = std::result::Result; -impl Deref for File { - type Target = NamedTempFile; - - fn deref(&self) -> &Self::Target { - &self.file - } -} - -impl DerefMut for File { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.file - } -} - #[derive(Clone, Debug)] pub struct FileStore { path: PathBuf, @@ -56,7 +42,7 @@ impl FileStore { let file = NamedTempFile::new_in(&self.path)?; let uuid = Uuid::new_v4(); let path = self.path.join(uuid.to_string()); - let update_file = File { dry: false, file, path }; + let update_file = File { file: Some(file), path }; Ok((uuid, update_file)) } @@ -67,7 +53,7 @@ impl FileStore { let file = NamedTempFile::new_in(&self.path)?; let uuid = Uuid::from_u128(uuid); let path = self.path.join(uuid.to_string()); - let update_file = File { dry: false, file, path }; + let update_file = File { file: Some(file), path }; Ok((uuid, update_file)) } @@ -135,33 +121,41 @@ impl FileStore { } pub struct File { - dry: bool, path: PathBuf, - file: NamedTempFile, + file: Option, } impl File { pub fn dry_file() -> Result { - #[cfg(target_family = "unix")] - let path = PathBuf::from_str("/dev/null").unwrap(); - #[cfg(target_family = "windows")] - let path = PathBuf::from_str("\\Device\\Null").unwrap(); - - Ok(Self { - dry: true, - path: path.clone(), - file: tempfile::Builder::new().make(|_| std::fs::File::create(path.clone()))?, - }) + Ok(Self { path: PathBuf::new(), file: None }) } pub fn persist(self) -> Result<()> { - if !self.dry { - self.file.persist(&self.path)?; + if let Some(file) = self.file { + file.persist(&self.path)?; } Ok(()) } } +impl Write for File { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + if let Some(file) = self.file.as_mut() { + file.write(buf) + } else { + Ok(buf.len()) + } + } + + fn flush(&mut self) -> std::io::Result<()> { + if let Some(file) = self.file.as_mut() { + file.flush() + } else { + Ok(()) + } + } +} + #[cfg(test)] mod test { use std::io::Write; diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 5d0ce9eb9..1c3b93bce 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -1550,7 +1550,7 @@ impl<'a> Dump<'a> { let content_uuid = match content_file { Some(content_file) if task.status == Status::Enqueued => { let (uuid, mut file) = self.index_scheduler.create_update_file(false)?; - let mut builder = DocumentsBatchBuilder::new(file.as_file_mut()); + let mut builder = DocumentsBatchBuilder::new(&mut file); for doc in content_file { builder.append_json_object(&doc?)?; } @@ -1734,7 +1734,7 @@ pub struct IndexStats { #[cfg(test)] mod tests { - use std::io::{BufWriter, Seek, Write}; + use std::io::{BufWriter, Write}; use std::time::Instant; use big_s::S; @@ -1882,7 +1882,7 @@ mod tests { /// Adapting to the new json reading interface pub fn read_json( bytes: &[u8], - write: impl Write + Seek, + write: impl Write, ) -> std::result::Result { let temp_file = NamedTempFile::new().unwrap(); let mut buffer = BufWriter::new(temp_file.reopen().unwrap()); @@ -1909,7 +1909,7 @@ mod tests { ); let (_uuid, mut file) = index_scheduler.create_update_file_with_uuid(file_uuid).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); (file, documents_count) } @@ -2321,7 +2321,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2366,7 +2366,7 @@ mod tests { snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task"); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2406,7 +2406,7 @@ mod tests { ]"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2472,7 +2472,7 @@ mod tests { ]"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2678,7 +2678,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2852,7 +2852,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2903,7 +2903,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -2956,7 +2956,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3010,7 +3010,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3065,7 +3065,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3567,7 +3567,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3609,7 +3609,7 @@ mod tests { }"#; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3669,7 +3669,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3721,7 +3721,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3783,7 +3783,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3850,7 +3850,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3922,7 +3922,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -3979,7 +3979,7 @@ mod tests { let allow_index_creation = i % 2 != 0; let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(i).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); file.persist().unwrap(); index_scheduler .register( @@ -4033,7 +4033,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(id as u128).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); assert_eq!(documents_count, 1); file.persist().unwrap(); @@ -4098,7 +4098,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(id as u128).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); assert_eq!(documents_count, 1); file.persist().unwrap(); @@ -4159,7 +4159,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(id as u128).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); assert_eq!(documents_count, 1); file.persist().unwrap(); @@ -4244,7 +4244,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(id as u128).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); assert_eq!(documents_count, 1); file.persist().unwrap(); @@ -4331,7 +4331,7 @@ mod tests { ); let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(id as u128).unwrap(); - let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap(); + let documents_count = read_json(content.as_bytes(), &mut file).unwrap(); assert_eq!(documents_count, 1); file.persist().unwrap(); diff --git a/meilisearch-types/src/document_formats.rs b/meilisearch-types/src/document_formats.rs index 0f1d995f9..50dc5bad4 100644 --- a/meilisearch-types/src/document_formats.rs +++ b/meilisearch-types/src/document_formats.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Debug, Display}; use std::fs::File; -use std::io::{self, Seek, Write}; +use std::io::{self, BufWriter, Write}; use std::marker::PhantomData; use memmap2::MmapOptions; @@ -104,8 +104,8 @@ impl ErrorCode for DocumentFormatError { } /// Reads CSV from input and write an obkv batch to writer. -pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); +pub fn read_csv(file: &File, writer: impl Write, delimiter: u8) -> Result { + let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); let mmap = unsafe { MmapOptions::new().map(file)? }; let csv = csv::ReaderBuilder::new().delimiter(delimiter).from_reader(mmap.as_ref()); builder.append_csv(csv).map_err(|e| (PayloadType::Csv { delimiter }, e))?; @@ -116,9 +116,9 @@ pub fn read_csv(file: &File, writer: impl Write + Seek, delimiter: u8) -> Result Ok(count as u64) } -/// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_json(file: &File, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); +/// Reads JSON from temporary file and write an obkv batch to writer. +pub fn read_json(file: &File, writer: impl Write) -> Result { + let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); let mmap = unsafe { MmapOptions::new().map(file)? }; let mut deserializer = serde_json::Deserializer::from_slice(&mmap); @@ -151,8 +151,8 @@ pub fn read_json(file: &File, writer: impl Write + Seek) -> Result { } /// Reads JSON from temporary file and write an obkv batch to writer. -pub fn read_ndjson(file: &File, writer: impl Write + Seek) -> Result { - let mut builder = DocumentsBatchBuilder::new(writer); +pub fn read_ndjson(file: &File, writer: impl Write) -> Result { + let mut builder = DocumentsBatchBuilder::new(BufWriter::new(writer)); let mmap = unsafe { MmapOptions::new().map(file)? }; for result in serde_json::Deserializer::from_slice(&mmap).into_iter() { diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs index a74bbff49..43fab1dae 100644 --- a/meilisearch/src/routes/indexes/documents.rs +++ b/meilisearch/src/routes/indexes/documents.rs @@ -425,11 +425,9 @@ async fn document_addition( let read_file = buffer.into_inner().into_std().await; let documents_count = tokio::task::spawn_blocking(move || { let documents_count = match format { - PayloadType::Json => read_json(&read_file, update_file.as_file_mut())?, - PayloadType::Csv { delimiter } => { - read_csv(&read_file, update_file.as_file_mut(), delimiter)? - } - PayloadType::Ndjson => read_ndjson(&read_file, update_file.as_file_mut())?, + PayloadType::Json => read_json(&read_file, &mut update_file)?, + PayloadType::Csv { delimiter } => read_csv(&read_file, &mut update_file, delimiter)?, + PayloadType::Ndjson => read_ndjson(&read_file, &mut update_file)?, }; // we NEED to persist the file here because we moved the `udpate_file` in another task. update_file.persist()?;