MeiliSearch/meilisearch-lib/src/index_controller/update_file_store.rs

use std::fs::{create_dir_all, File};
use std::io::{self, BufReader, BufWriter, Write};
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};

use milli::documents::DocumentBatchReader;
use serde_json::Map;
use tempfile::{NamedTempFile, PersistError};
use uuid::Uuid;

use crate::document_formats::read_jsonl;

const UPDATE_FILES_PATH: &str = "updates/updates_files";
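
/// An update file under construction: a named temporary file paired with the
/// final path it will be persisted to inside the update file store.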
pub struct UpdateFile {
    path: PathBuf,
    file: NamedTempFile,
}

#[derive(Debug, thiserror::Error)]
#[error("Error while persisting update to disk: {0}")]
pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);

type Result<T> = std::result::Result<T, UpdateFileStoreError>;
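
// Generates a `From` impl for each listed error type, boxing it into an
// `UpdateFileStoreError`.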
macro_rules! into_update_store_error {
    ($($other:path),*) => {
        $(
            impl From<$other> for UpdateFileStoreError {
                fn from(other: $other) -> Self {
                    Self(Box::new(other))
                }
            }
        )*
    };
}

into_update_store_error!(
    PersistError,
    io::Error,
    serde_json::Error,
    milli::documents::Error
);

impl UpdateFile {
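    /// Persists the temporary file to its final path inside the update file store.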
    pub fn persist(self) -> Result<()> {
        self.file.persist(&self.path)?;
        Ok(())
    }
}

impl Deref for UpdateFile {
    type Target = NamedTempFile;

    fn deref(&self) -> &Self::Target {
        &self.file
    }
}

impl DerefMut for UpdateFile {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.file
    }
}
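
/// Handles the storage of update files on disk, under the
/// `updates/updates_files` directory.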
#[derive(Clone, Debug)]
pub struct UpdateFileStore {
    path: PathBuf,
}

impl UpdateFileStore {
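    /// Loads the update files contained in the dump at `src` into `dst`,
    /// passing each file through `read_jsonl`.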
    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
        let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
        let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);

        create_dir_all(&dst_update_files_path)?;

        let entries = std::fs::read_dir(src_update_files_path)?;

        for entry in entries {
            let entry = entry?;
            let update_file = BufReader::new(File::open(entry.path())?);
            let file_uuid = entry.file_name();
            let file_uuid = file_uuid
                .to_str()
                .ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
            let dst_path = dst_update_files_path.join(file_uuid);
            let dst_file = BufWriter::new(File::create(dst_path)?);
            read_jsonl(update_file, dst_file)?;
        }

        Ok(())
    }
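
    /// Creates an `UpdateFileStore` rooted at `path`, creating the
    /// `updates/updates_files` directory if it does not already exist.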
    pub fn new(path: impl AsRef<Path>) -> Result<Self> {
        let path = path.as_ref().join(UPDATE_FILES_PATH);
        std::fs::create_dir_all(&path)?;
        Ok(Self { path })
    }

    /// Creates a new temporary update file.
    ///
    /// A call to `persist` is needed to persist the file in the database.
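    ///
    /// A minimal usage sketch (hypothetical caller code; `store` and `payload`
    /// are assumed to be an `UpdateFileStore` and an encoded update payload):
    ///
    /// ```ignore
    /// let (uuid, mut update_file) = store.new_update()?;
    /// update_file.write_all(&payload)?;
    /// update_file.persist()?;
    /// ```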
    pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
        let file = NamedTempFile::new()?;
        let uuid = Uuid::new_v4();
        let path = self.path.join(uuid.to_string());
        let update_file = UpdateFile { file, path };

        Ok((uuid, update_file))
    }

    /// Returns the file corresponding to the requested uuid.
    pub fn get_update(&self, uuid: Uuid) -> Result<File> {
        let path = self.path.join(uuid.to_string());
        let file = File::open(path)?;
        Ok(file)
    }

    /// Copies the content of the update file pointed to by `uuid` to the `dst` directory.
    pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
        let src = self.path.join(uuid.to_string());
        let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
        std::fs::create_dir_all(&dst)?;
        dst.push(uuid.to_string());
        std::fs::copy(src, dst)?;
        Ok(())
    }

    /// Performs a dump of the given update file uuid into the provided dump path.
    pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
        let uuid_string = uuid.to_string();
        let update_file_path = self.path.join(&uuid_string);
        let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
        std::fs::create_dir_all(&dst)?;
        dst.push(&uuid_string);

        let update_file = File::open(update_file_path)?;
        let mut dst_file = NamedTempFile::new()?;
        let mut document_reader = DocumentBatchReader::from_reader(update_file)?;

        let mut document_buffer = Map::new();
        // TODO: we need to find a way to do this more efficiently. (create a custom
        // serializer to jsonl for example...)
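        // Each document of the batch is rebuilt as a JSON map (field ids resolved to
        // field names through the batch index) and written out as one jsonl line.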
        while let Some((index, document)) = document_reader.next_document_with_index()? {
            for (field_id, content) in document.iter() {
                if let Some(field_name) = index.get_by_left(&field_id) {
                    let content = serde_json::from_slice(content)?;
                    document_buffer.insert(field_name.to_string(), content);
                }
            }

            serde_json::to_writer(&mut dst_file, &document_buffer)?;
            dst_file.write_all(b"\n")?;
            document_buffer.clear();
        }

        dst_file.persist(dst)?;

        Ok(())
    }

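    /// Returns the size in bytes of the update file pointed to by `uuid`.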
    pub fn get_size(&self, uuid: Uuid) -> Result<u64> {
        Ok(self.get_update(uuid)?.metadata()?.len())
    }
}