From 76597fc3823005dec1abc60e286a9b18fa42ea1d Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 13 Sep 2022 19:28:39 +0200 Subject: [PATCH] import the update_file_store in the index-scheduler --- Cargo.lock | 1 + index-scheduler/Cargo.toml | 1 + index-scheduler/src/index/error.rs | 17 +- index-scheduler/src/index/updates.rs | 2 +- index-scheduler/src/lib.rs | 3 +- index-scheduler/src/update_file_store.rs | 258 ----------------------- 6 files changed, 5 insertions(+), 277 deletions(-) delete mode 100644 index-scheduler/src/update_file_store.rs diff --git a/Cargo.lock b/Cargo.lock index 1444d2a5b..9923865d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1738,6 +1738,7 @@ dependencies = [ "csv", "derivative", "either", + "file-store", "fst", "indexmap", "lazy_static", diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml index d5ce1b19a..6a512a164 100644 --- a/index-scheduler/Cargo.toml +++ b/index-scheduler/Cargo.toml @@ -11,6 +11,7 @@ bincode = "1.3.3" csv = "1.1.6" derivative = "2.2.0" either = { version = "1.6.1", features = ["serde"] } +file-store = { path = "../file-store" } fst = "0.4.7" indexmap = { version = "1.8.0", features = ["serde-1"] } lazy_static = "1.4.0" diff --git a/index-scheduler/src/index/error.rs b/index-scheduler/src/index/error.rs index ee0ff8abf..667dfcde3 100644 --- a/index-scheduler/src/index/error.rs +++ b/index-scheduler/src/index/error.rs @@ -4,8 +4,6 @@ use meilisearch_types::error::{Code, ErrorCode}; use meilisearch_types::internal_error; use serde_json::Value; -use crate::update_file_store; - pub type Result = std::result::Result; #[derive(Debug, thiserror::Error)] @@ -25,23 +23,10 @@ internal_error!( milli::heed::Error, fst::Error, serde_json::Error, - update_file_store::UpdateFileStoreError, + file_store::Error, milli::documents::Error ); -/* -impl ErrorCode for IndexError { - fn error_code(&self) -> Code { - match self { - IndexError::Internal(_) => Code::Internal, - IndexError::DocumentNotFound(_) => Code::DocumentNotFound, - IndexError::Facet(e) => e.error_code(), - IndexError::Milli(e) => MilliError(e).error_code(), - } - } -} -*/ - impl From for IndexError { fn from(error: milli::UserError) -> IndexError { IndexError::Milli(error.into()) diff --git a/index-scheduler/src/index/updates.rs b/index-scheduler/src/index/updates.rs index 1361cb919..4683b570a 100644 --- a/index-scheduler/src/index/updates.rs +++ b/index-scheduler/src/index/updates.rs @@ -13,7 +13,7 @@ use uuid::Uuid; use super::error::{IndexError, Result}; use super::index::{Index, IndexMeta}; -use crate::update_file_store::UpdateFileStore; +use file_store::UpdateFileStore; fn serialize_with_wildcard( field: &Setting>, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 02bf085dd..f76655b0f 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -4,16 +4,15 @@ mod document_formats; pub mod error; pub mod index; pub mod task; -mod update_file_store; mod utils; use batch::Batch; pub use error::Error; +use file_store::UpdateFileStore; use index::Index; pub use task::Task; use task::{Kind, KindWithContent, Status}; use time::OffsetDateTime; -use update_file_store::UpdateFileStore; use std::collections::hash_map::Entry; use std::sync::atomic::{AtomicBool, Ordering}; diff --git a/index-scheduler/src/update_file_store.rs b/index-scheduler/src/update_file_store.rs deleted file mode 100644 index 143279c37..000000000 --- a/index-scheduler/src/update_file_store.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::fs::{create_dir_all, File}; -use std::io::{self, BufReader, BufWriter, Write}; -use std::ops::{Deref, DerefMut}; -use std::path::{Path, PathBuf}; - -use milli::documents::DocumentsBatchReader; -use serde_json::Map; -use tempfile::{NamedTempFile, PersistError}; -use uuid::Uuid; - -#[cfg(not(test))] -pub use store::UpdateFileStore; -#[cfg(test)] -pub use test::MockUpdateFileStore as UpdateFileStore; - -const UPDATE_FILES_PATH: &str = "updates/updates_files"; - -use crate::document_formats::read_ndjson; - -pub struct UpdateFile { - path: PathBuf, - file: NamedTempFile, -} - -#[derive(Debug, thiserror::Error)] -#[error("Error while persisting update to disk: {0}")] -pub struct UpdateFileStoreError(Box); - -pub type Result = std::result::Result; - -macro_rules! into_update_store_error { - ($($other:path),*) => { - $( - impl From<$other> for UpdateFileStoreError { - fn from(other: $other) -> Self { - Self(Box::new(other)) - } - } - )* - }; -} - -into_update_store_error!( - PersistError, - io::Error, - serde_json::Error, - milli::documents::Error, - milli::documents::DocumentsBatchCursorError -); - -impl UpdateFile { - pub fn persist(self) -> Result<()> { - self.file.persist(&self.path)?; - Ok(()) - } -} - -impl Deref for UpdateFile { - type Target = NamedTempFile; - - fn deref(&self) -> &Self::Target { - &self.file - } -} - -impl DerefMut for UpdateFile { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.file - } -} - -mod store { - use super::*; - - #[derive(Clone, Debug)] - pub struct UpdateFileStore { - path: PathBuf, - } - - impl UpdateFileStore { - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH); - let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH); - - // No update files to load - if !src_update_files_path.exists() { - return Ok(()); - } - - create_dir_all(&dst_update_files_path)?; - - let entries = std::fs::read_dir(src_update_files_path)?; - - for entry in entries { - let entry = entry?; - let update_file = BufReader::new(File::open(entry.path())?); - let file_uuid = entry.file_name(); - let file_uuid = file_uuid - .to_str() - .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?; - let dst_path = dst_update_files_path.join(file_uuid); - let dst_file = BufWriter::new(File::create(dst_path)?); - read_ndjson(update_file, dst_file)?; - } - - Ok(()) - } - - pub fn new(path: impl AsRef) -> Result { - let path = path.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&path)?; - Ok(Self { path }) - } - - /// Creates a new temporary update file. - /// A call to `persist` is needed to persist the file in the database. - pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { - let file = NamedTempFile::new_in(&self.path)?; - let uuid = Uuid::new_v4(); - let path = self.path.join(uuid.to_string()); - let update_file = UpdateFile { file, path }; - - Ok((uuid, update_file)) - } - - /// Returns the file corresponding to the requested uuid. - pub fn get_update(&self, uuid: Uuid) -> Result { - let path = self.path.join(uuid.to_string()); - let file = File::open(path)?; - Ok(file) - } - - /// Copies the content of the update file pointed to by `uuid` to the `dst` directory. - pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { - let src = self.path.join(uuid.to_string()); - let mut dst = dst.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&dst)?; - dst.push(uuid.to_string()); - std::fs::copy(src, dst)?; - Ok(()) - } - - /// Peforms a dump of the given update file uuid into the provided dump path. - pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { - let uuid_string = uuid.to_string(); - let update_file_path = self.path.join(&uuid_string); - let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH); - std::fs::create_dir_all(&dst)?; - dst.push(&uuid_string); - - let update_file = File::open(update_file_path)?; - let mut dst_file = NamedTempFile::new_in(&dump_path)?; - let (mut document_cursor, index) = - DocumentsBatchReader::from_reader(update_file)?.into_cursor_and_fields_index(); - - let mut document_buffer = Map::new(); - // TODO: we need to find a way to do this more efficiently. (create a custom serializer - // for jsonl for example...) - while let Some(document) = document_cursor.next_document()? { - for (field_id, content) in document.iter() { - if let Some(field_name) = index.name(field_id) { - let content = serde_json::from_slice(content)?; - document_buffer.insert(field_name.to_string(), content); - } - } - - serde_json::to_writer(&mut dst_file, &document_buffer)?; - dst_file.write_all(b"\n")?; - document_buffer.clear(); - } - - dst_file.persist(dst)?; - - Ok(()) - } - - pub fn get_size(&self, uuid: Uuid) -> Result { - Ok(self.get_update(uuid)?.metadata()?.len()) - } - - pub fn delete(&self, uuid: Uuid) -> Result<()> { - let path = self.path.join(uuid.to_string()); - std::fs::remove_file(path)?; - Ok(()) - } - } -} - -#[cfg(test)] -mod test { - use std::sync::Arc; - - use nelson::Mocker; - - use super::*; - - #[derive(Clone)] - pub enum MockUpdateFileStore { - Real(store::UpdateFileStore), - Mock(Arc), - } - - impl MockUpdateFileStore { - pub fn mock(mocker: Mocker) -> Self { - Self::Mock(Arc::new(mocker)) - } - - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> anyhow::Result<()> { - store::UpdateFileStore::load_dump(src, dst) - } - - pub fn new(path: impl AsRef) -> Result { - store::UpdateFileStore::new(path).map(Self::Real) - } - - pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> { - match self { - MockUpdateFileStore::Real(s) => s.new_update(), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn get_update(&self, uuid: Uuid) -> Result { - match self { - MockUpdateFileStore::Real(s) => s.get_update(uuid), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { - match self { - MockUpdateFileStore::Real(s) => s.snapshot(uuid, dst), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef) -> Result<()> { - match self { - MockUpdateFileStore::Real(s) => s.dump(uuid, dump_path), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn get_size(&self, uuid: Uuid) -> Result { - match self { - MockUpdateFileStore::Real(s) => s.get_size(uuid), - MockUpdateFileStore::Mock(_) => todo!(), - } - } - - pub fn delete(&self, uuid: Uuid) -> Result<()> { - match self { - MockUpdateFileStore::Real(s) => s.delete(uuid), - MockUpdateFileStore::Mock(mocker) => unsafe { mocker.get("delete").call(uuid) }, - } - } - } -}