restore dumps

mpostma 2021-09-28 11:59:55 +02:00
parent 90018755c5
commit 6a1964f146
13 changed files with 395 additions and 301 deletions

View File

@ -1,9 +1,9 @@
use std::fs::{create_dir_all, File};
use std::fs::File;
use std::io::Write;
use std::path::Path;
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use tar::{Archive, Builder};
use flate2::{write::GzEncoder, Compression};
use tar::Builder;
pub fn to_tar_gz(src: impl AsRef<Path>, dest: impl AsRef<Path>) -> anyhow::Result<()> {
let mut f = File::create(dest)?;

View File

@ -0,0 +1,52 @@
use std::{fmt, io::{Read, Seek, Write}};
use milli::documents::DocumentBatchBuilder;
use serde_json::{Deserializer, Map, Value};
type Result<T> = std::result::Result<T, DocumentFormatError>;
#[derive(Debug)]
pub enum PayloadType {
Jsonl,
}
impl fmt::Display for PayloadType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PayloadType::Jsonl => write!(f, "ndjson"),
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum DocumentFormatError {
#[error("Internal error: {0}")]
Internal(Box<dyn std::error::Error + Send + Sync + 'static>),
#[error("{0}. The {1} payload provided is malformed.")]
MalformedPayload(Box<dyn std::error::Error + Send + Sync + 'static>, PayloadType),
}
internal_error!(
DocumentFormatError: milli::documents::Error
);
macro_rules! malformed {
($type:path, $e:expr) => {
$e.map_err(|e| DocumentFormatError::MalformedPayload(Box::new(e), $type))
};
}
/// Reads JSONL from the input and writes an obkv batch to the writer.
pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer)?;
let stream = Deserializer::from_reader(input).into_iter::<Map<String, Value>>();
for value in stream {
let value = malformed!(PayloadType::Jsonl, value)?;
builder.add_documents(&value)?;
}
builder.finish()?;
Ok(())
}
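
For reference, a minimal sketch of how read_jsonl can be used, mirroring the load path later in this commit: the NDJSON input is written into a temporary file as an obkv batch, then read back with milli's DocumentBatchReader. The function name, file path, and loop body are illustrative assumptions.

use std::fs::File;
use std::io::{Seek, SeekFrom};
use milli::documents::DocumentBatchReader;

// Hypothetical helper: converts an NDJSON file into an obkv batch and iterates over it.
fn convert_ndjson_file(path: &str) -> anyhow::Result<()> {
    let input = File::open(path)?;
    let mut batch_file = tempfile::tempfile()?;
    // Build the obkv batch from the NDJSON stream.
    read_jsonl(input, &mut batch_file)?;
    // Rewind and iterate over the batch that was just written.
    batch_file.seek(SeekFrom::Start(0))?;
    let mut reader = DocumentBatchReader::from_reader(batch_file)?;
    while let Some((_index, _document)) = reader.next_document_with_index()? {
        // Each document is an obkv map keyed by field id.
    }
    Ok(())
}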

View File

@ -1,12 +1,18 @@
use std::fs::File;
use std::io::Write;
use std::fs::{create_dir_all, File};
use std::io::{BufReader, Seek, SeekFrom, Write};
use std::path::Path;
use heed::RoTxn;
use anyhow::Context;
use heed::{EnvOpenOptions, RoTxn};
use indexmap::IndexMap;
use milli::documents::DocumentBatchReader;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use crate::options::IndexerOpts;
use crate::document_formats::read_jsonl;
use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder;
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
use super::error::Result;
use super::{Index, Settings, Unchecked};
@ -24,6 +30,11 @@ impl Index {
pub fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
// acquire a write txn to make sure any ongoing write is finished before we start.
let txn = self.env.write_txn()?;
let path = path
.as_ref()
.join(format!("indexes/{}", self.uuid.to_string()));
create_dir_all(&path)?;
self.dump_documents(&txn, &path)?;
self.dump_meta(&txn, &path)?;
@ -75,92 +86,101 @@ impl Index {
}
pub fn load_dump(
_src: impl AsRef<Path>,
_dst: impl AsRef<Path>,
_size: usize,
_indexing_options: &IndexerOpts,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
update_handler: &UpdateHandler,
) -> anyhow::Result<()> {
//let dir_name = src
//.as_ref()
//.file_name()
//.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
let dir_name = src
.as_ref()
.file_name()
.with_context(|| format!("invalid dump index: {}", src.as_ref().display()))?;
//let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
//create_dir_all(&dst_dir_path)?;
let dst_dir_path = dst.as_ref().join("indexes").join(dir_name);
create_dir_all(&dst_dir_path)?;
//let meta_path = src.as_ref().join(META_FILE_NAME);
//let mut meta_file = File::open(meta_path)?;
let meta_path = src.as_ref().join(META_FILE_NAME);
let mut meta_file = File::open(meta_path)?;
//// We first deserialize the dump meta into a serde_json::Value and change
//// the custom ranking rules settings from the old format to the new format.
//let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
//if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
//convert_custom_ranking_rules(ranking_rules);
//}
// We first deserialize the dump meta into a serde_json::Value and change
// the custom ranking rules settings from the old format to the new format.
let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
convert_custom_ranking_rules(ranking_rules);
}
//// Then we serialize it back into a vec to deserialize it
//// into a `DumpMeta` struct with the newly patched `rankingRules` format.
//let patched_meta = serde_json::to_vec(&meta)?;
// Then we serialize it back into a vec to deserialize it
// into a `DumpMeta` struct with the newly patched `rankingRules` format.
let patched_meta = serde_json::to_vec(&meta)?;
//let DumpMeta {
//settings,
//primary_key,
//} = serde_json::from_slice(&patched_meta)?;
//let settings = settings.check();
//let index = Self::open(&dst_dir_path, size)?;
//let mut txn = index.write_txn()?;
let DumpMeta {
settings,
primary_key,
} = serde_json::from_slice(&patched_meta)?;
let settings = settings.check();
//let handler = UpdateHandler::new(indexing_options)?;
let mut options = EnvOpenOptions::new();
options.map_size(size);
let index = milli::Index::new(options, &dst_dir_path)?;
//index.update_settings_txn(&mut txn, &settings, handler.update_builder(0))?;
let mut txn = index.write_txn()?;
//let document_file_path = src.as_ref().join(DATA_FILE_NAME);
//let reader = File::open(&document_file_path)?;
//let mut reader = BufReader::new(reader);
//reader.fill_buf()?;
// If the document file is empty, we don't perform the document addition, to prevent
// a primary key error from being thrown.
// Apply settings first
let builder = update_handler.update_builder(0);
let mut builder = builder.settings(&mut txn, &index);
todo!("fix obk document dumps")
//if !reader.buffer().is_empty() {
//index.update_documents_txn(
//&mut txn,
//IndexDocumentsMethod::UpdateDocuments,
//Some(reader),
//handler.update_builder(0),
//primary_key.as_deref(),
//)?;
//}
if let Some(primary_key) = primary_key {
builder.set_primary_key(primary_key);
}
//txn.commit()?;
apply_settings_to_builder(&settings, &mut builder);
//match Arc::try_unwrap(index.0) {
//Ok(inner) => inner.prepare_for_closing().wait(),
//Err(_) => bail!("Could not close index properly."),
//}
builder.execute(|_, _| ())?;
//Ok(())
let document_file_path = src.as_ref().join(DATA_FILE_NAME);
let reader = BufReader::new(File::open(&document_file_path)?);
let mut tmp_doc_file = tempfile::tempfile()?;
read_jsonl(reader, &mut tmp_doc_file)?;
tmp_doc_file.seek(SeekFrom::Start(0))?;
let documents_reader = DocumentBatchReader::from_reader(tmp_doc_file)?;
// If the document file is empty, we don't perform the document addition, to prevent
// a primary key error from being thrown.
if !documents_reader.is_empty() {
let builder = update_handler.update_builder(0).index_documents(&mut txn, &index);
builder.execute(documents_reader, |_, _| ())?;
}
txn.commit()?;
index.prepare_for_closing().wait();
Ok(())
}
}
// /// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
// ///
// /// This is done for compatibility reasons, and to avoid a new dump version,
// /// since the new syntax was introduced soon after the new dump version.
//fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
//*ranking_rules = match ranking_rules.take() {
//Value::Array(values) => values
//.into_iter()
//.filter_map(|value| match value {
//Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
//.map(|f| format!("{}:asc", f))
//.map(Value::String),
//Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
//.map(|f| format!("{}:desc", f))
//.map(Value::String),
//otherwise => Some(otherwise),
//})
//.collect(),
//otherwise => otherwise,
//}
//}
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
///
/// This is done for compatibility reasons, and to avoid a new dump version,
/// since the new syntax was introduced soon after the new dump version.
fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
*ranking_rules = match ranking_rules.take() {
Value::Array(values) => values
.into_iter()
.filter_map(|value| match value {
Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
.map(|f| format!("{}:asc", f))
.map(Value::String),
Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
.map(|f| format!("{}:desc", f))
.map(Value::String),
otherwise => Some(otherwise),
})
.collect(),
otherwise => otherwise,
}
}
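
A quick illustration of the conversion (rule and field names here are made up; non-custom rules pass through unchanged via the otherwise arm):

// Hypothetical example of old-format rules being rewritten in place.
let mut rules = serde_json::json!(["words", "typo", "asc(price)", "desc(rank)"]);
convert_custom_ranking_rules(&mut rules);
assert_eq!(rules, serde_json::json!(["words", "typo", "price:asc", "rank:desc"]));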

View File

@ -266,59 +266,7 @@ impl Index {
// We must use the write transaction of the update here.
let mut builder = update_builder.settings(txn, self);
match settings.searchable_attributes {
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
match settings.displayed_attributes {
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
match settings.filterable_attributes {
Setting::Set(ref facets) => {
builder.set_filterable_fields(facets.clone().into_iter().collect())
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
match settings.sortable_attributes {
Setting::Set(ref fields) => {
builder.set_sortable_fields(fields.iter().cloned().collect())
}
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
match settings.ranking_rules {
Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
match settings.stop_words {
Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
match settings.synonyms {
Setting::Set(ref synonyms) => {
builder.set_synonyms(synonyms.clone().into_iter().collect())
}
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
match settings.distinct_attribute {
Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
Setting::Reset => builder.reset_distinct_field(),
Setting::NotSet => (),
}
apply_settings_to_builder(settings, &mut builder);
builder.execute(|indexing_step, update_id| {
debug!("update {}: {:?}", update_id, indexing_step)
@ -328,6 +276,62 @@ impl Index {
}
}
pub fn apply_settings_to_builder(settings: &Settings<Checked>, builder: &mut milli::update::Settings) {
match settings.searchable_attributes {
Setting::Set(ref names) => builder.set_searchable_fields(names.clone()),
Setting::Reset => builder.reset_searchable_fields(),
Setting::NotSet => (),
}
match settings.displayed_attributes {
Setting::Set(ref names) => builder.set_displayed_fields(names.clone()),
Setting::Reset => builder.reset_displayed_fields(),
Setting::NotSet => (),
}
match settings.filterable_attributes {
Setting::Set(ref facets) => {
builder.set_filterable_fields(facets.clone().into_iter().collect())
}
Setting::Reset => builder.reset_filterable_fields(),
Setting::NotSet => (),
}
match settings.sortable_attributes {
Setting::Set(ref fields) => {
builder.set_sortable_fields(fields.iter().cloned().collect())
}
Setting::Reset => builder.reset_sortable_fields(),
Setting::NotSet => (),
}
match settings.ranking_rules {
Setting::Set(ref criteria) => builder.set_criteria(criteria.clone()),
Setting::Reset => builder.reset_criteria(),
Setting::NotSet => (),
}
match settings.stop_words {
Setting::Set(ref stop_words) => builder.set_stop_words(stop_words.clone()),
Setting::Reset => builder.reset_stop_words(),
Setting::NotSet => (),
}
match settings.synonyms {
Setting::Set(ref synonyms) => {
builder.set_synonyms(synonyms.clone().into_iter().collect())
}
Setting::Reset => builder.reset_synonyms(),
Setting::NotSet => (),
}
match settings.distinct_attribute {
Setting::Set(ref attr) => builder.set_distinct_field(attr.clone()),
Setting::Reset => builder.reset_distinct_field(),
Setting::NotSet => (),
}
}
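
A hedged sketch of how this shared helper is meant to be used, combining the same calls that appear elsewhere in this diff (the function name is hypothetical; index, update_handler, and settings follow the types used above):

use log::debug;

fn apply_dump_settings(
    index: &milli::Index,
    update_handler: &UpdateHandler,
    settings: &Settings<Checked>,
) -> anyhow::Result<()> {
    let mut txn = index.write_txn()?;
    let mut builder = update_handler.update_builder(0).settings(&mut txn, index);
    apply_settings_to_builder(settings, &mut builder);
    builder.execute(|indexing_step, update_id| {
        debug!("update {}: {:?}", update_id, indexing_step)
    })?;
    txn.commit()?;
    Ok(())
}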
#[cfg(test)]
mod test {
use super::*;

View File

@ -4,8 +4,8 @@ use chrono::{DateTime, Utc};
use log::info;
use serde::{Deserialize, Serialize};
use crate::index::Index;
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
use crate::index_controller::index_resolver::IndexResolver;
use crate::index_controller::update_file_store::UpdateFileStore;
use crate::index_controller::updates::store::UpdateStore;
use crate::options::IndexerOpts;
@ -41,19 +41,11 @@ impl MetadataV2 {
self.dump_date, self.db_version
);
info!("Loading index database.");
HeedUuidStore::load_dump(src.as_ref(), &dst)?;
info!("Loading updates.");
IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
UpdateFileStore::load_dump(src.as_ref(), &dst)?;
UpdateStore::load_dump(&src, &dst, update_db_size)?;
info!("Loading indexes.");
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &dst, index_db_size, indexing_options)?;
}
Ok(())
}

View File

@ -115,6 +115,7 @@ pub fn load_dump(
let tmp_src = tempfile::tempdir_in(".")?;
let tmp_src_path = tmp_src.path();
println!("importing to {}", dst_path.as_ref().display());
crate::from_tar_gz(&src_path, tmp_src_path)?;
let meta_path = tmp_src_path.join(META_FILE_NAME);
@ -179,7 +180,7 @@ impl DumpTask {
let uuids = self.index_resolver.dump(temp_dump_path.clone()).await?;
UpdateMsg::dump(&self.update_handle, uuids.into_iter().collect(), temp_dump_path.clone()).await?;
UpdateMsg::dump(&self.update_handle, uuids, temp_dump_path.clone()).await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let temp_dump_file = tempfile::NamedTempFile::new_in(&self.path)?;

View File

@ -1,6 +1,5 @@
pub mod uuid_store;
mod index_store;
//mod message;
pub mod error;
use std::path::Path;
@ -10,7 +9,7 @@ use uuid_store::{UuidStore, HeedUuidStore};
use index_store::{IndexStore, MapIndexStore};
use error::{Result, IndexResolverError};
use crate::{index::Index, options::IndexerOpts};
use crate::{index::{Index, update_handler::UpdateHandler}, options::IndexerOpts};
pub type HardStateIndexResolver = IndexResolver<HeedUuidStore, MapIndexStore>;
@ -25,6 +24,28 @@ pub struct IndexResolver<U, I> {
index_store: I,
}
impl IndexResolver<HeedUuidStore, MapIndexStore> {
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
HeedUuidStore::load_dump(&src, &dst)?;
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
let update_handler = UpdateHandler::new(indexer_opts).unwrap();
for index in indexes {
let index = index?;
Index::load_dump(&index.path(), &dst, index_db_size, &update_handler)?;
}
Ok(())
}
}
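
For orientation, the dump layout this loader walks looks roughly as follows (directory names come from the paths used in this commit; the per-index file names stand for the META_FILE_NAME and DATA_FILE_NAME constants and are otherwise assumptions):

<dump>/
    indexes/
        <index dir>/              one entry per index, written by Index::dump
            <META_FILE_NAME>      DumpMeta: settings + primary key
            <DATA_FILE_NAME>      documents, replayed through read_jsonl
    updates/
        data.jsonl                one UpdateEntry per line
        updates_files/
            <update uuid>         NDJSON payloads of enqueued updates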
impl<U, I> IndexResolver<U, I>
where U: UuidStore,
I: IndexStore,
@ -39,8 +60,14 @@ where U: UuidStore,
}
}
pub async fn dump(&self, _path: impl AsRef<Path>) -> Result<Vec<Uuid>> {
todo!()
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<Vec<Index>> {
let uuids = self.index_uuid_store.dump(path.as_ref().to_owned()).await?;
let mut indexes = Vec::new();
for uuid in uuids {
indexes.push(self.get_index_by_uuid(uuid).await?);
}
Ok(indexes)
}
pub async fn get_size(&self) -> Result<u64> {
@ -51,7 +78,6 @@ where U: UuidStore,
pub async fn snapshot(&self, path: impl AsRef<Path>) -> Result<Vec<Index>> {
let uuids = self.index_uuid_store.snapshot(path.as_ref().to_owned()).await?;
let mut indexes = Vec::new();
for uuid in uuids {
indexes.push(self.get_index_by_uuid(uuid).await?);
}

View File

@ -1,14 +1,17 @@
use std::fs::File;
use std::fs::{File, create_dir_all};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::ops::{Deref, DerefMut};
//use milli::documents::DocumentBatchReader;
//use serde_json::Map;
use milli::documents::DocumentBatchReader;
use serde_json::Map;
use tempfile::NamedTempFile;
use uuid::Uuid;
const UPDATE_FILES_PATH: &str = "updates/updates_files";
use crate::document_formats::read_jsonl;
use super::error::Result;
pub struct UpdateFile {
@ -42,6 +45,27 @@ pub struct UpdateFileStore {
}
impl UpdateFileStore {
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> anyhow::Result<()> {
let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
create_dir_all(&dst_update_files_path).unwrap();
let entries = std::fs::read_dir(src_update_files_path).unwrap();
for entry in entries {
let entry = entry.unwrap();
let update_file = BufReader::new(File::open(entry.path()).unwrap());
let file_uuid = entry.file_name();
let file_uuid = file_uuid.to_str().ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
let dst_path = dst_update_files_path.join(file_uuid);
let dst_file = BufWriter::new(File::create(dst_path)?);
read_jsonl(update_file, dst_file)?;
}
Ok(())
}
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
let path = path.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&path).unwrap();
@ -78,27 +102,34 @@ impl UpdateFileStore {
}
/// Perform a dump of the given update file uuid into the provided snapshot path.
pub fn dump(&self, _uuid: Uuid, _snapshot_path: impl AsRef<Path>) -> Result<()> {
todo!()
//let update_file_path = self.path.join(uuid.to_string());
//let snapshot_file_path: snapshot_path.as_ref().join(format!("update_files/uuid", uuid));
pub fn dump(&self, uuid: Uuid, dump_path: impl AsRef<Path>) -> Result<()> {
let uuid_string = uuid.to_string();
let update_file_path = self.path.join(&uuid_string);
let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
std::fs::create_dir_all(&dst).unwrap();
dst.push(&uuid_string);
//let update_file = File::open(update_file_path).unwrap();
let update_file = File::open(update_file_path).unwrap();
let mut dst_file = NamedTempFile::new().unwrap();
let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
let mut document_buffer = Map::new();
// TODO: we need to find a way to do this more efficiently. (create a custom serializer to
// jsonl for example...)
while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
for (field_id, content) in document.iter() {
let field_name = index.get_by_left(&field_id).unwrap();
let content = serde_json::from_slice(content).unwrap();
document_buffer.insert(field_name.to_string(), content);
}
//let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
serde_json::to_writer(&mut dst_file, &document_buffer).unwrap();
dst_file.write(b"\n").unwrap();
document_buffer.clear();
}
//let mut document_buffer = Map::new();
//// TODO: we need to find a way to do this more efficiently. (create a custom serializer to
//// jsonl for example...)
//while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
//for (field_id, content) in document.iter() {
//let field_name = index.get_by_left(&field_id).unwrap();
//let content = serde_json::from_slice(content).unwrap();
//document_buffer.insert(field_name.to_string(), content);
//}
dst_file.persist(dst).unwrap();
//}
//Ok(())
Ok(())
}
}
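
The file written by dump above is plain NDJSON, one JSON object per document per line; field names in this sample are purely illustrative:

{"id": 1, "title": "Carol", "genres": ["Romance", "Drama"]}
{"id": 2, "title": "Wonder Woman", "genres": ["Action", "Adventure"]}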

View File

@ -1,4 +1,3 @@
use std::collections::HashSet;
use std::path::PathBuf;
use tokio::sync::{mpsc, oneshot};
@ -35,7 +34,7 @@ pub enum UpdateMsg {
ret: oneshot::Sender<Result<()>>,
},
Dump {
uuids: HashSet<Uuid>,
indexes: Vec<Index>,
path: PathBuf,
ret: oneshot::Sender<Result<()>>,
},
@ -54,11 +53,11 @@ impl UpdateMsg {
pub async fn dump(
sender: &mpsc::Sender<Self>,
uuids: HashSet<Uuid>,
indexes: Vec<Index>,
path: PathBuf,
) -> Result<()> {
let (ret, rcv) = oneshot::channel();
let msg = Self::Dump { path, uuids, ret };
let msg = Self::Dump { path, indexes, ret };
sender.send(msg).await?;
rcv.await?
}

View File

@ -3,7 +3,6 @@ mod message;
pub mod status;
pub mod store;
use std::collections::HashSet;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -104,7 +103,6 @@ pub struct UpdateLoop {
store: Arc<UpdateStore>,
inbox: Option<mpsc::Receiver<UpdateMsg>>,
update_file_store: UpdateFileStore,
index_resolver: Arc<HardStateIndexResolver>,
must_exit: Arc<AtomicBool>,
}
@ -133,7 +131,6 @@ impl UpdateLoop {
inbox,
must_exit,
update_file_store,
index_resolver,
})
}
@ -184,8 +181,8 @@ impl UpdateLoop {
GetInfo { ret } => {
let _ = ret.send(self.handle_get_info().await);
}
Dump { uuids, path, ret } => {
let _ = ret.send(self.handle_dump(uuids, path).await);
Dump { indexes, path, ret } => {
let _ = ret.send(self.handle_dump(indexes, path).await);
}
}
})
@ -278,12 +275,11 @@ impl UpdateLoop {
Ok(())
}
async fn handle_dump(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_resolver.clone();
async fn handle_dump(&self, indexes: Vec<Index>, path: PathBuf) -> Result<()> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> Result<()> {
update_store.dump(&uuids, path.to_path_buf(), index_handle)?;
update_store.dump(&indexes, path.to_path_buf())?;
Ok(())
})
.await??;

View File

@ -1,11 +1,17 @@
use std::{collections::HashSet, fs::{create_dir_all, File}, io::Write, path::{Path, PathBuf}, sync::Arc};
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::io::{BufReader, Write};
use std::fs::{File, create_dir_all};
use heed::RoTxn;
use heed::{EnvOpenOptions, RoTxn};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
use tempfile::{NamedTempFile, TempDir};
use uuid::Uuid;
use super::{Result, State, UpdateStore};
use crate::index_controller::{index_resolver::HardStateIndexResolver, updates::status::UpdateStatus};
use crate::{RegisterUpdate, index::Index, index_controller::{update_file_store::UpdateFileStore, updates::status::{Enqueued, UpdateStatus}}};
#[derive(Serialize, Deserialize)]
struct UpdateEntry {
@ -16,9 +22,8 @@ struct UpdateEntry {
impl UpdateStore {
pub fn dump(
&self,
uuids: &HashSet<Uuid>,
indexes: &[Index],
path: PathBuf,
handle: Arc<HardStateIndexResolver>,
) -> Result<()> {
let state_lock = self.state.write();
state_lock.swap(State::Dumping);
@ -26,15 +31,11 @@ impl UpdateStore {
// txn must *always* be acquired after state lock, or it will deadlock.
let txn = self.env.write_txn()?;
let dump_path = path.join("updates");
create_dir_all(&dump_path)?;
let uuids = indexes.iter().map(|i| i.uuid).collect();
self.dump_updates(&txn, uuids, &dump_path)?;
self.dump_updates(&txn, &uuids, &path)?;
let fut = dump_indexes(uuids, handle, &path);
tokio::runtime::Handle::current().block_on(fut)?;
state_lock.swap(State::Idle);
indexes.par_iter().try_for_each(|index| index.dump(&path)).unwrap();
Ok(())
}
@ -45,58 +46,59 @@ impl UpdateStore {
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
) -> Result<()> {
//let dump_data_path = path.as_ref().join("data.jsonl");
//let mut dump_data_file = File::create(dump_data_path)?;
let mut dump_data_file = NamedTempFile::new()?;
//let update_files_path = path.as_ref().join(super::UPDATE_DIR);
//create_dir_all(&update_files_path)?;
self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
self.dump_completed(txn, uuids, &mut dump_data_file)?;
//self.dump_pending(txn, uuids, &mut dump_data_file, &path)?;
//self.dump_completed(txn, uuids, &mut dump_data_file)?;
let mut dst_path = path.as_ref().join("updates");
create_dir_all(&dst_path)?;
dst_path.push("data.jsonl");
dump_data_file.persist(dst_path).unwrap();
//Ok(())
todo!()
Ok(())
}
fn dump_pending(
&self,
_txn: &RoTxn,
_uuids: &HashSet<Uuid>,
_file: &mut File,
_dst_path: impl AsRef<Path>,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: impl Write,
dst_path: impl AsRef<Path>,
) -> Result<()> {
todo!()
//let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
let pendings = self.pending_queue.iter(txn)?.lazily_decode_data();
//for pending in pendings {
//let ((_, uuid, _), data) = pending?;
//if uuids.contains(&uuid) {
//let update = data.decode()?;
for pending in pendings {
let ((_, uuid, _), data) = pending?;
if uuids.contains(&uuid) {
let update = data.decode()?;
//if let Some(ref update_uuid) = update.content {
//let src = super::update_uuid_to_file_path(&self.path, *update_uuid);
//let dst = super::update_uuid_to_file_path(&dst_path, *update_uuid);
//std::fs::copy(src, dst)?;
//}
if let Enqueued {
meta: RegisterUpdate::DocumentAddition {
content_uuid, ..
}, ..
} = update {
self.update_file_store.dump(content_uuid, &dst_path).unwrap();
}
//let update_json = UpdateEntry {
//uuid,
//update: update.into(),
//};
let update_json = UpdateEntry {
uuid,
update: update.into(),
};
//serde_json::to_writer(&mut file, &update_json)?;
//file.write_all(b"\n")?;
//}
//}
serde_json::to_writer(&mut file, &update_json)?;
file.write_all(b"\n")?;
}
}
//Ok(())
Ok(())
}
fn dump_completed(
&self,
txn: &RoTxn,
uuids: &HashSet<Uuid>,
mut file: &mut File,
mut file: impl Write,
) -> Result<()> {
let updates = self.updates.iter(txn)?.lazily_decode_data();
@ -116,65 +118,35 @@ impl UpdateStore {
}
pub fn load_dump(
_src: impl AsRef<Path>,
_dst: impl AsRef<Path>,
_db_size: usize,
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
db_size: usize,
) -> anyhow::Result<()> {
todo!()
//let dst_update_path = dst.as_ref().join("updates/");
//create_dir_all(&dst_update_path)?;
//let mut options = EnvOpenOptions::new();
//options.map_size(db_size as usize);
//let (store, _) = UpdateStore::new(options, &dst_update_path)?;
println!("target path: {}", dst.as_ref().display());
//let src_update_path = src.as_ref().join("updates");
//let update_data = File::open(&src_update_path.join("data.jsonl"))?;
//let mut update_data = BufReader::new(update_data);
let mut options = EnvOpenOptions::new();
options.map_size(db_size as usize);
//std::fs::create_dir_all(dst_update_path.join("update_files/"))?;
// create a dummy update file store, since it is not needed right now.
let tmp = TempDir::new().unwrap();
let update_file_store = UpdateFileStore::new(tmp.path()).unwrap();
let (store, _) = UpdateStore::new(options, &dst, update_file_store)?;
//let mut wtxn = store.env.write_txn()?;
//let mut line = String::new();
//loop {
//match update_data.read_line(&mut line) {
//Ok(0) => break,
//Ok(_) => {
//let UpdateEntry { uuid, update } = serde_json::from_str(&line)?;
//store.register_raw_updates(&mut wtxn, &update, uuid)?;
let src_update_path = src.as_ref().join("updates");
let update_data = File::open(&src_update_path.join("data.jsonl"))?;
let update_data = BufReader::new(update_data);
//// Copy ascociated update path if it exists
//if let UpdateStatus::Enqueued(Enqueued {
//content: Some(uuid),
//..
//}) = update
//{
//let src = update_uuid_to_file_path(&src_update_path, uuid);
//let dst = update_uuid_to_file_path(&dst_update_path, uuid);
//std::fs::copy(src, dst)?;
//}
//}
//_ => break,
//}
let stream = Deserializer::from_reader(update_data).into_iter::<UpdateEntry>();
let mut wtxn = store.env.write_txn()?;
//line.clear();
//}
for entry in stream {
let UpdateEntry { uuid, update } = entry?;
store.register_raw_updates(&mut wtxn, &update, uuid)?;
}
//wtxn.commit()?;
wtxn.commit()?;
//Ok(())
Ok(())
}
}
async fn dump_indexes(
_uuids: &HashSet<Uuid>,
_handle: Arc<HardStateIndexResolver>,
_path: impl AsRef<Path>,
) -> Result<()> {
todo!()
//for uuid in uuids {
//IndexMsg::dump(&handle, *uuid, path.as_ref().to_owned()).await?;
//}
//Ok(())
}
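
Each line of updates/data.jsonl produced by dump_pending/dump_completed serializes one UpdateEntry; a hedged sketch of a single entry, with the uuid and the serialized update status left as placeholders:

{"uuid": "<index uuid>", "update": <serialized UpdateStatus>}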

View File

@ -262,28 +262,28 @@ impl UpdateStore {
// /// Push already processed update in the UpdateStore without triggering the notification
// /// process. This is useful for the dumps.
//pub fn register_raw_updates(
//&self,
//wtxn: &mut heed::RwTxn,
//update: &UpdateStatus,
//index_uuid: Uuid,
//) -> heed::Result<()> {
//match update {
//UpdateStatus::Enqueued(enqueued) => {
//let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
//self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
//wtxn,
//&(global_id, index_uuid, enqueued.id()),
//enqueued,
//)?;
//}
//_ => {
//let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
//self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
//}
//}
//Ok(())
//}
pub fn register_raw_updates(
&self,
wtxn: &mut heed::RwTxn,
update: &UpdateStatus,
index_uuid: Uuid,
) -> heed::Result<()> {
match update {
UpdateStatus::Enqueued(enqueued) => {
let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
wtxn,
&(global_id, index_uuid, enqueued.id()),
enqueued,
)?;
}
_ => {
let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
self.updates.put(wtxn, &(index_uuid, update.id()), update)?;
}
}
Ok(())
}
/// Executes the user provided function on the next pending update (the one with the lowest id).
/// This is asynchronous as it let the user process the update with a read-only txn and

View File

@ -8,6 +8,7 @@ pub mod index_controller;
pub use index_controller::{IndexController as MeiliSearch, updates::RegisterUpdate};
mod compression;
mod document_formats;
use walkdir::WalkDir;