dump content is now only uuid

Marin Postma 2021-05-29 00:08:17 +02:00
parent b258f4f394
commit 1cb64caae4
4 changed files with 75 additions and 127 deletions
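
In short: a pending update's `content` is no longer stored as an `Option<PathBuf>` but as an `Option<Uuid>`, and the on-disk location of the content file is derived from that UUID on demand. `UpdateStore` now remembers the root path it was opened with, and a new `update_uuid_to_file_path` helper maps a UUID to `<root>/update_files/update_<uuid>`; dumps and snapshots copy content files by UUID instead of rewriting stored paths.

A minimal, self-contained restatement of the naming scheme (the helper mirrors the one added in the store module below; the `main` driver and the example root path are illustrative only):

```rust
use std::path::{Path, PathBuf};
use uuid::Uuid;

const UPDATE_DIR: &str = "update_files";

/// Derive the content file path for an update from its UUID alone.
fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
    root.as_ref().join(UPDATE_DIR).join(format!("update_{}", uuid))
}

fn main() {
    let uuid = Uuid::new_v4();
    // e.g. /tmp/data.ms/update_files/update_67e55044-10b1-426f-9247-bb680e5fe0c8
    println!("{}", update_uuid_to_file_path("/tmp/data.ms", uuid).display());
}
```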


@@ -117,7 +117,7 @@ where
         if file_len != 0 {
             file.flush().await?;
             let file = file.into_std().await;
-            Some((file, path))
+            Some((file, update_file_id))
         } else {
             // empty update, delete the empty file.
             fs::remove_file(&path).await?;
@@ -133,7 +133,7 @@ where
         use std::io::{copy, sink, BufReader, Seek};
         // If the payload is empty, ignore the check.
-        let path = if let Some((mut file, path)) = file_path {
+        let update_uuid = if let Some((mut file, uuid)) = file_path {
             // set the file back to the beginning
             file.seek(SeekFrom::Start(0))?;
             // Check that the json payload is valid:
@@ -145,14 +145,14 @@ where
                 file.seek(SeekFrom::Start(0))?;
                 let _: serde_json::Value = serde_json::from_reader(file)?;
             }
-            Some(path)
+            Some(uuid)
         } else {
            None
         };

         // The payload is valid, we can register it to the update store.
         let status = update_store
-            .register_update(meta, path, uuid)
+            .register_update(meta, update_uuid, uuid)
             .map(UpdateStatus::Enqueued)?;
         Ok(status)
     })
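
With this change the actor threads the update file's UUID, not its path, through validation and into `register_update`. The validation step itself is unchanged: rewind the payload file, then parse it as JSON purely to check well-formedness, discarding the value. A sketch of that pattern, under a hypothetical `check_json_payload` name that is not in the codebase:

```rust
use std::fs::File;
use std::io::{Seek, SeekFrom};

/// Hypothetical helper: validate that `file` contains well-formed JSON
/// without keeping the parsed value in memory.
fn check_json_payload(mut file: File) -> anyhow::Result<()> {
    // set the file back to the beginning before reading
    file.seek(SeekFrom::Start(0))?;
    // parsing into a generic Value checks validity; the result is discarded
    let _: serde_json::Value = serde_json::from_reader(file)?;
    Ok(())
}
```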


@@ -1,12 +1,17 @@
-use std::{collections::HashSet, fs::{copy, create_dir_all, File}, io::{BufRead, BufReader, Write}, path::{Path, PathBuf}};
+use std::{
+    collections::HashSet,
+    fs::{create_dir_all, File},
+    io::{BufRead, BufReader, Write},
+    path::{Path, PathBuf},
+};

 use anyhow::Context;
 use heed::{EnvOpenOptions, RoTxn};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;

-use super::{State, codec::UpdateKeyCodec};
 use super::UpdateStore;
+use super::{codec::UpdateKeyCodec, State};
 use crate::index_controller::{index_actor::IndexActorHandle, UpdateStatus};

 #[derive(Serialize, Deserialize)]
@@ -50,10 +55,10 @@ impl UpdateStore {
         let dump_data_path = path.as_ref().join("data.jsonl");
         let mut dump_data_file = File::create(dump_data_path)?;

-        let update_files_path = path.as_ref().join("update_files");
+        let update_files_path = path.as_ref().join(super::UPDATE_DIR);
         create_dir_all(&update_files_path)?;

-        self.dump_pending(&txn, uuids, &mut dump_data_file, &update_files_path)?;
+        self.dump_pending(&txn, uuids, &mut dump_data_file, &path)?;
         self.dump_completed(&txn, uuids, &mut dump_data_file)?;

         Ok(())
@@ -64,19 +69,24 @@ impl UpdateStore {
         txn: &RoTxn,
         uuids: &HashSet<Uuid>,
         mut file: &mut File,
-        update_files_path: impl AsRef<Path>,
+        dst_update_files: impl AsRef<Path>,
     ) -> anyhow::Result<()> {
         let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();

         for pending in pendings {
             let ((_, uuid, _), data) = pending?;
             if uuids.contains(&uuid) {
-                let mut update = data.decode()?;
+                let update = data.decode()?;

-                if let Some(content) = update.content.take() {
-                    update.content = Some(dump_update_file(content, &update_files_path)?);
+                if let Some(ref update_uuid) = update.content {
+                    let src = dbg!(super::update_uuid_to_file_path(&self.path, *update_uuid));
+                    let dst = dbg!(super::update_uuid_to_file_path(&dst_update_files, *update_uuid));
+                    assert!(src.exists());
+                    dbg!(std::fs::copy(src, dst))?;
                 }
+                println!("copied files");

                 let update_json = UpdateEntry {
                     uuid,
                     update: update.into(),
@@ -117,18 +127,20 @@ impl UpdateStore {
         Ok(())
     }

-    pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>, db_size: u64) -> anyhow::Result<()> {
-        let dst_updates_path = dst.as_ref().join("updates/");
-        create_dir_all(&dst_updates_path)?;
-        let dst_update_files_path = dst_updates_path.join("update_files/");
-        create_dir_all(&dst_update_files_path)?;
+    pub fn load_dump(
+        src: impl AsRef<Path>,
+        dst: impl AsRef<Path>,
+        db_size: u64,
+    ) -> anyhow::Result<()> {
+        let dst_update_path = dst.as_ref().join("updates/");
+        create_dir_all(&dst_update_path)?;

         let mut options = EnvOpenOptions::new();
         options.map_size(db_size as usize);
-        let (store, _) = UpdateStore::new(options, &dst_updates_path)?;
+        let (store, _) = UpdateStore::new(options, &dst_update_path)?;

         let src_update_path = src.as_ref().join("updates");
-        let src_update_files_path = src_update_path.join("update_files");
         let update_data = File::open(&src_update_path.join("data.jsonl"))?;
         let mut update_data = BufReader::new(update_data);
@@ -138,15 +150,7 @@ impl UpdateStore {
             match update_data.read_line(&mut line) {
                 Ok(0) => break,
                 Ok(_) => {
-                    let UpdateEntry { uuid, mut update } = serde_json::from_str(&line)?;
-                    if let Some(path) = update.content_path_mut() {
-                        let dst_file_path = dst_update_files_path.join(&path);
-                        let src_file_path = src_update_files_path.join(&path);
-                        *path = dst_update_files_path.join(&path);
-                        std::fs::copy(src_file_path, dst_file_path)?;
-                    }
-
+                    let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
                     store.register_raw_updates(&mut wtxn, update, uuid)?;
                 }
                 _ => break,
@@ -154,30 +158,25 @@ impl UpdateStore {
             line.clear();
         }

+        let dst_update_files_path = dst_update_path.join("update_files/");
+        let src_update_files_path = src_update_path.join("update_files/");
+        std::fs::copy(src_update_files_path, dst_update_files_path)?;

         wtxn.commit()?;

         Ok(())
     }
 }

-async fn dump_indexes(uuids: &HashSet<Uuid>, handle: impl IndexActorHandle, path: impl AsRef<Path>)-> anyhow::Result<()> {
+async fn dump_indexes(
+    uuids: &HashSet<Uuid>,
+    handle: impl IndexActorHandle,
+    path: impl AsRef<Path>,
+) -> anyhow::Result<()> {
     for uuid in uuids {
         handle.dump(*uuid, path.as_ref().to_owned()).await?;
     }

     Ok(())
 }

-fn dump_update_file(
-    file_path: impl AsRef<Path>,
-    dump_path: impl AsRef<Path>,
-) -> anyhow::Result<PathBuf> {
-    let filename: PathBuf = file_path
-        .as_ref()
-        .file_name()
-        .context("invalid update file name")?
-        .into();
-    let dump_file_path = dump_path.as_ref().join(&filename);
-    copy(file_path, dump_file_path)?;
-    Ok(filename)
-}
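
The dump side of the same idea: `data.jsonl` entries now carry only the content UUID, so `dump_pending` copies each referenced file into the dump's `update_files/` directory, and `load_dump` no longer rewrites paths while replaying entries; the update files are moved over in one step after the replay loop. A sketch of that replay loop, with a hypothetical `on_entry` callback standing in for `register_raw_updates` and a simplified `UpdateEntry`:

```rust
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;

use serde::Deserialize;
use uuid::Uuid;

#[derive(Deserialize)]
struct UpdateEntry {
    uuid: Uuid,
    update: serde_json::Value, // stand-in for the real update status payload
}

/// Replay each JSON line of a dump's data.jsonl through a callback.
fn replay_dump(
    data_jsonl: &Path,
    mut on_entry: impl FnMut(UpdateEntry) -> anyhow::Result<()>,
) -> anyhow::Result<()> {
    let mut reader = BufReader::new(File::open(data_jsonl)?);
    let mut line = String::new();
    loop {
        match reader.read_line(&mut line) {
            Ok(0) => break, // EOF
            Ok(_) => on_entry(serde_json::from_str(&line)?)?,
            _ => break,
        }
        line.clear();
    }
    Ok(())
}
```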


@@ -1,12 +1,11 @@
 pub mod dump;
 mod codec;

-use std::collections::{BTreeMap, HashSet};
+use std::{collections::{BTreeMap, HashSet}, path::PathBuf};
 use std::fs::{copy, create_dir_all, remove_file, File};
 use std::path::Path;
 use std::sync::Arc;

-use anyhow::Context;
 use arc_swap::ArcSwap;
 use futures::StreamExt;
 use heed::types::{ByteSlice, OwnedType, SerdeJson};
@@ -27,6 +26,8 @@ use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, Ind
 #[allow(clippy::upper_case_acronyms)]
 type BEU64 = U64<heed::byteorder::BE>;

+const UPDATE_DIR: &'static str = "update_files";
+
 pub struct UpdateStoreInfo {
     /// Size of the update store in bytes.
     pub size: u64,
@@ -97,6 +98,7 @@ pub struct UpdateStore {
     pub state: Arc<StateLock>,
     /// Wake up the loop when a new event occurs.
     notification_sender: mpsc::Sender<()>,
+    path: PathBuf,
 }

 impl UpdateStore {
@@ -106,7 +108,7 @@ impl UpdateStore {
     ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
         options.max_dbs(5);
-        let env = options.open(path)?;
+        let env = options.open(&path)?;
         let pending_queue = env.create_database(Some("pending-queue"))?;
         let next_update_id = env.create_database(Some("next-update-id"))?;
         let updates = env.create_database(Some("updates"))?;
@@ -123,6 +125,7 @@ impl UpdateStore {
                 updates,
                 state,
                 notification_sender,
+                path: path.as_ref().to_owned(),
             },
             notification_receiver,
         ))
@@ -165,7 +168,7 @@ impl UpdateStore {
                 match res {
                     Ok(Some(_)) => (),
                     Ok(None) => break,
-                    Err(e) => error!("error while processing update: {}", e),
+                    Err(e) => panic!("error while processing update: {}", e),
                 }
             }
             // the ownership on the arc has been taken, we need to exit.
@@ -217,13 +220,13 @@
     pub fn register_update(
         &self,
         meta: UpdateMeta,
-        content: Option<impl AsRef<Path>>,
+        content: Option<Uuid>,
         index_uuid: Uuid,
     ) -> heed::Result<Enqueued> {
         let mut txn = self.env.write_txn()?;

         let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?;
-        let meta = Enqueued::new(meta, update_id, content.map(|p| p.as_ref().to_owned()));
+        let meta = Enqueued::new(meta, update_id, content);

         self.pending_queue
             .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?;
@@ -290,9 +293,9 @@
             state.swap(State::Processing(index_uuid, processing.clone()));

             let file = match content_path {
-                Some(ref path) => {
-                    let file = File::open(path)
-                        .with_context(|| format!("file at path: {:?}", &content_path))?;
+                Some(uuid) => {
+                    let path = update_uuid_to_file_path(&self.path, uuid);
+                    let file = File::open(path)?;
                     Some(file)
                 }
                 None => None,
@@ -308,7 +311,8 @@
                 self.pending_queue
                     .delete(&mut wtxn, &(global_id, index_uuid, update_id))?;

-                if let Some(path) = content_path {
+                if let Some(uuid) = content_path {
+                    let path = update_uuid_to_file_path(&self.path, uuid);
                     remove_file(&path)?;
                 }
@@ -408,7 +412,7 @@
     pub fn delete_all(&self, index_uuid: Uuid) -> anyhow::Result<()> {
         let mut txn = self.env.write_txn()?;
         // Contains all the content file paths that need to be removed if the deletion is successful.
-        let mut paths_to_remove = Vec::new();
+        let mut uuids_to_remove = Vec::new();

         let mut pendings = self.pending_queue.iter_mut(&mut txn)?.lazily_decode_data();
@@ -416,8 +420,8 @@
             if uuid == index_uuid {
                 pendings.del_current()?;
                 let mut pending = pending.decode()?;
-                if let Some(path) = pending.content.take() {
-                    paths_to_remove.push(path);
+                if let Some(update_uuid) = pending.content.take() {
+                    uuids_to_remove.push(update_uuid);
                 }
             }
         }
@@ -437,7 +441,9 @@
         txn.commit()?;

-        paths_to_remove.iter().for_each(|path| {
+        uuids_to_remove.iter()
+            .map(|uuid| update_uuid_to_file_path(&self.path, *uuid))
+            .for_each(|path| {
             let _ = remove_file(path);
         });
@@ -468,7 +474,7 @@
         // create db snapshot
         self.env.copy_to_path(&db_path, CompactionOption::Enabled)?;

-        let update_files_path = update_path.join("update_files");
+        let update_files_path = update_path.join(UPDATE_DIR);
         create_dir_all(&update_files_path)?;

         let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
@@ -476,10 +482,9 @@
         for entry in pendings {
             let ((_, uuid, _), pending) = entry?;
             if uuids.contains(&uuid) {
-                if let Some(path) = pending.decode()?.content_path() {
-                    let name = path.file_name().unwrap();
-                    let to = update_files_path.join(name);
-                    copy(path, to)?;
+                if let Enqueued { content: Some(uuid), .. } = pending.decode()? {
+                    let path = update_uuid_to_file_path(&self.path, uuid);
+                    copy(path, &update_files_path)?;
                 }
             }
         }
@@ -508,7 +513,8 @@
         let txn = self.env.read_txn()?;
         for entry in self.pending_queue.iter(&txn)? {
             let (_, pending) = entry?;
-            if let Some(path) = pending.content_path() {
+            if let Enqueued { content: Some(uuid), .. } = pending {
+                let path = update_uuid_to_file_path(&self.path, uuid);
                 size += File::open(path)?.metadata()?.len();
             }
         }
@@ -521,6 +527,10 @@
     }
 }

+fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
+    root.as_ref().join(UPDATE_DIR).join(format!("update_{}", uuid))
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
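
The store-side consequence: because `UpdateStore` now keeps the `path` it was opened with, any method can resolve a pending update's content file from the UUID alone, as the processing loop, `delete_all`, the snapshot code, and `get_info` do above. A reduced sketch of the pattern (the names `Store`, `content_file`, and `content_file_size` are illustrative, not from the codebase):

```rust
use std::fs::File;
use std::io::Result as IoResult;
use std::path::PathBuf;

use uuid::Uuid;

struct Store {
    /// Root directory the store was opened with.
    path: PathBuf,
}

impl Store {
    /// Same derivation as update_uuid_to_file_path above.
    fn content_file(&self, uuid: Uuid) -> PathBuf {
        self.path.join("update_files").join(format!("update_{}", uuid))
    }

    /// Size of one update's content file, as get_info sums them.
    fn content_file_size(&self, uuid: Uuid) -> IoResult<u64> {
        Ok(File::open(self.content_file(uuid))?.metadata()?.len())
    }
}
```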


@@ -1,8 +1,7 @@
-use std::path::{Path, PathBuf};
 use chrono::{DateTime, Utc};
 use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;

 use crate::index::{Checked, Settings};
@@ -34,11 +33,11 @@ pub struct Enqueued {
     pub update_id: u64,
     pub meta: UpdateMeta,
     pub enqueued_at: DateTime<Utc>,
-    pub content: Option<PathBuf>,
+    pub content: Option<Uuid>,
 }

 impl Enqueued {
-    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<PathBuf>) -> Self {
+    pub fn new(meta: UpdateMeta, update_id: u64, content: Option<Uuid>) -> Self {
         Self {
             enqueued_at: Utc::now(),
             meta,
@@ -68,14 +67,6 @@ impl Enqueued {
     pub fn id(&self) -> u64 {
         self.update_id
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.content.as_deref()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.content.as_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -91,14 +82,6 @@ impl Processed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -118,14 +101,6 @@ impl Processing {
         self.from.meta()
     }

-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
-
     pub fn process(self, success: UpdateResult) -> Processed {
         Processed {
             success,
@@ -155,14 +130,6 @@ impl Aborted {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -178,14 +145,6 @@ impl Failed {
     pub fn id(&self) -> u64 {
         self.from.id()
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        self.from.content_path()
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        self.from.content_path_mut()
-    }
 }

 #[derive(Debug, Serialize, Deserialize, Clone)]
@@ -215,26 +174,6 @@ impl UpdateStatus {
             _ => None,
         }
     }
-
-    pub fn content_path(&self) -> Option<&Path> {
-        match self {
-            UpdateStatus::Processing(u) => u.content_path(),
-            UpdateStatus::Processed(u) => u.content_path(),
-            UpdateStatus::Aborted(u) => u.content_path(),
-            UpdateStatus::Failed(u) => u.content_path(),
-            UpdateStatus::Enqueued(u) => u.content_path(),
-        }
-    }
-
-    pub fn content_path_mut(&mut self) -> Option<&mut PathBuf> {
-        match self {
-            UpdateStatus::Processing(u) => u.content_path_mut(),
-            UpdateStatus::Processed(u) => u.content_path_mut(),
-            UpdateStatus::Aborted(u) => u.content_path_mut(),
-            UpdateStatus::Failed(u) => u.content_path_mut(),
-            UpdateStatus::Enqueued(u) => u.content_path_mut(),
-        }
-    }
 }

 impl From<Enqueued> for UpdateStatus {
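
One consequence worth noting: `Enqueued::content` is serialized by serde, so dump entries now store a bare UUID string where they previously stored a file path; dumps written before this change would presumably no longer deserialize. A minimal sketch of the new on-disk shape, using a reduced `EnqueuedLite` stand-in (assumes the uuid crate's serde feature is enabled, as the real type requires):

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

/// Reduced stand-in for Enqueued, keeping only the fields needed to show
/// how `content` now serializes.
#[derive(Serialize, Deserialize)]
struct EnqueuedLite {
    update_id: u64,
    content: Option<Uuid>,
}

fn main() -> anyhow::Result<()> {
    let entry = EnqueuedLite {
        update_id: 7,
        content: Some(Uuid::new_v4()),
    };
    // prints e.g. {"update_id":7,"content":"67e55044-10b1-426f-9247-bb680e5fe0c8"}
    println!("{}", serde_json::to_string(&entry)?);
    Ok(())
}
```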