mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-08 20:44:30 +01:00
Merge #1742
1742: Create dumps v3 r=irevoire a=MarinPostma The introduction of the obkv document format has changed the format of the updates, by removing the need for the document format of the addition (it is not necessary since update are store in the obkv format). This has caused breakage in the dumps that this pr solves by introducing a 3rd version of the dumps. A v2 compat layer has been created that support the import of v2 dumps into meilisearch. This has permitted to move the compat code that existed elsewhere in meiliearch to be moved into the v2 module. The asc/desc patching is now only done for forward compatibility when loading a v2 dump, and the v3 write the asc/desc to the dump with the new syntax. Co-authored-by: mpostma <postma.marin@protonmail.com>
This commit is contained in:
commit
ed783b67ca
@ -312,7 +312,7 @@ mod test {
|
||||
|
||||
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
|
||||
|
||||
assert!(dbg!(csv_iter.next().unwrap()).is_err());
|
||||
assert!(csv_iter.next().unwrap().is_err());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -7,12 +7,10 @@ use heed::{EnvOpenOptions, RoTxn};
|
||||
use indexmap::IndexMap;
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
||||
use crate::document_formats::read_ndjson;
|
||||
use crate::index::update_handler::UpdateHandler;
|
||||
use crate::index::updates::apply_settings_to_builder;
|
||||
use crate::index_controller::{asc_ranking_rule, desc_ranking_rule};
|
||||
|
||||
use super::error::Result;
|
||||
use super::{Index, Settings, Unchecked};
|
||||
@ -100,23 +98,11 @@ impl Index {
|
||||
create_dir_all(&dst_dir_path)?;
|
||||
|
||||
let meta_path = src.as_ref().join(META_FILE_NAME);
|
||||
let mut meta_file = File::open(meta_path)?;
|
||||
|
||||
// We first deserialize the dump meta into a serde_json::Value and change
|
||||
// the custom ranking rules settings from the old format to the new format.
|
||||
let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
|
||||
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
|
||||
convert_custom_ranking_rules(ranking_rules);
|
||||
}
|
||||
|
||||
// Then we serialize it back into a vec to deserialize it
|
||||
// into a `DumpMeta` struct with the newly patched `rankingRules` format.
|
||||
let patched_meta = serde_json::to_vec(&meta)?;
|
||||
|
||||
let meta_file = File::open(meta_path)?;
|
||||
let DumpMeta {
|
||||
settings,
|
||||
primary_key,
|
||||
} = serde_json::from_slice(&patched_meta)?;
|
||||
} = serde_json::from_reader(meta_file)?;
|
||||
let settings = settings.check();
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
@ -164,25 +150,3 @@ impl Index {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
|
||||
///
|
||||
/// This is done for compatibility reasons, and to avoid a new dump version,
|
||||
/// since the new syntax was introduced soon after the new dump version.
|
||||
fn convert_custom_ranking_rules(ranking_rules: &mut Value) {
|
||||
*ranking_rules = match ranking_rules.take() {
|
||||
Value::Array(values) => values
|
||||
.into_iter()
|
||||
.filter_map(|value| match value {
|
||||
Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
|
||||
.map(|f| format!("{}:asc", f))
|
||||
.map(Value::String),
|
||||
Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
|
||||
.map(|f| format!("{}:desc", f))
|
||||
.map(Value::String),
|
||||
otherwise => Some(otherwise),
|
||||
})
|
||||
.collect(),
|
||||
otherwise => otherwise,
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,19 @@
|
||||
pub mod v1;
|
||||
pub mod v2;
|
||||
pub mod v3;
|
||||
|
||||
mod compat {
|
||||
/// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name.
|
||||
pub fn asc_ranking_rule(text: &str) -> Option<&str> {
|
||||
text.split_once("asc(")
|
||||
.and_then(|(_, tail)| tail.rsplit_once(")"))
|
||||
.map(|(field, _)| field)
|
||||
}
|
||||
|
||||
/// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name.
|
||||
pub fn desc_ranking_rule(text: &str) -> Option<&str> {
|
||||
text.split_once("desc(")
|
||||
.and_then(|(_, tail)| tail.rsplit_once(")"))
|
||||
.map(|(field, _)| field)
|
||||
}
|
||||
}
|
||||
|
@ -5,7 +5,7 @@ use std::marker::PhantomData;
|
||||
use std::path::Path;
|
||||
|
||||
use heed::EnvOpenOptions;
|
||||
use log::{error, info, warn};
|
||||
use log::{error, warn};
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use milli::update::Setting;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
@ -14,14 +14,15 @@ use uuid::Uuid;
|
||||
use crate::document_formats::read_ndjson;
|
||||
use crate::index::apply_settings_to_builder;
|
||||
use crate::index::update_handler::UpdateHandler;
|
||||
use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule};
|
||||
use crate::index_controller::index_resolver::uuid_store::HeedUuidStore;
|
||||
use crate::index_controller::{self, asc_ranking_rule, desc_ranking_rule, IndexMetadata};
|
||||
use crate::index_controller::{self, IndexMetadata};
|
||||
use crate::{index::Unchecked, options::IndexerOpts};
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MetadataV1 {
|
||||
db_version: String,
|
||||
pub db_version: String,
|
||||
indexes: Vec<IndexMetadata>,
|
||||
}
|
||||
|
||||
@ -33,11 +34,6 @@ impl MetadataV1 {
|
||||
size: usize,
|
||||
indexer_options: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"Loading dump, dump database version: {}, dump version: V1",
|
||||
self.db_version
|
||||
);
|
||||
|
||||
let uuid_store = HeedUuidStore::new(&dst)?;
|
||||
for index in self.indexes {
|
||||
let uuid = Uuid::new_v4();
|
||||
@ -93,7 +89,7 @@ fn load_index(
|
||||
size: usize,
|
||||
indexer_options: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
let index_path = dst.as_ref().join(&format!("indexes/index-{}", uuid));
|
||||
let index_path = dst.as_ref().join(&format!("indexes/{}", uuid));
|
||||
|
||||
create_dir_all(&index_path)?;
|
||||
let mut options = EnvOpenOptions::new();
|
||||
|
@ -1,52 +1,393 @@
|
||||
use std::path::Path;
|
||||
use std::fs::{File, OpenOptions};
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::{Deserializer, Value};
|
||||
use tempfile::NamedTempFile;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index_controller::index_resolver::IndexResolver;
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use crate::index_controller::updates::store::UpdateStore;
|
||||
use crate::index_controller::dump_actor::loaders::compat::{asc_ranking_rule, desc_ranking_rule};
|
||||
use crate::index_controller::dump_actor::Metadata;
|
||||
use crate::index_controller::updates::status::{
|
||||
Aborted, Enqueued, Failed, Processed, Processing, UpdateResult, UpdateStatus,
|
||||
};
|
||||
use crate::index_controller::updates::store::dump::UpdateEntry;
|
||||
use crate::index_controller::updates::store::Update;
|
||||
use crate::options::IndexerOpts;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MetadataV2 {
|
||||
db_version: String,
|
||||
use super::v3;
|
||||
|
||||
/// The dump v2 reads the dump folder and patches all the needed file to make it compatible with a
|
||||
/// dump v3, then calls the dump v3 to actually handle the dump.
|
||||
pub fn load_dump(
|
||||
meta: Metadata,
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
update_db_size: usize,
|
||||
dump_date: DateTime<Utc>,
|
||||
indexing_options: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
let indexes_path = src.as_ref().join("indexes");
|
||||
|
||||
let dir_entries = std::fs::read_dir(indexes_path)?;
|
||||
for entry in dir_entries {
|
||||
let entry = entry?;
|
||||
|
||||
// rename the index folder
|
||||
let path = entry.path();
|
||||
let new_path = patch_index_uuid_path(&path).expect("invalid index folder.");
|
||||
|
||||
std::fs::rename(path, &new_path)?;
|
||||
|
||||
let settings_path = new_path.join("meta.json");
|
||||
|
||||
patch_settings(settings_path)?;
|
||||
}
|
||||
|
||||
let update_path = src.as_ref().join("updates/data.jsonl");
|
||||
patch_updates(update_path)?;
|
||||
|
||||
v3::load_dump(
|
||||
meta,
|
||||
src,
|
||||
dst,
|
||||
index_db_size,
|
||||
update_db_size,
|
||||
indexing_options,
|
||||
)
|
||||
}
|
||||
|
||||
impl MetadataV2 {
|
||||
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
|
||||
fn patch_index_uuid_path(path: &Path) -> Option<PathBuf> {
|
||||
let uuid = path.file_name()?.to_str()?.trim_start_matches("index-");
|
||||
let new_path = path.parent()?.join(uuid);
|
||||
Some(new_path)
|
||||
}
|
||||
|
||||
fn patch_settings(path: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let mut meta_file = File::open(&path)?;
|
||||
let mut meta: Value = serde_json::from_reader(&mut meta_file)?;
|
||||
|
||||
// We first deserialize the dump meta into a serde_json::Value and change
|
||||
// the custom ranking rules settings from the old format to the new format.
|
||||
if let Some(ranking_rules) = meta.pointer_mut("/settings/rankingRules") {
|
||||
patch_custon_ranking_rules(ranking_rules);
|
||||
}
|
||||
|
||||
let mut meta_file = OpenOptions::new().truncate(true).write(true).open(path)?;
|
||||
|
||||
serde_json::to_writer(&mut meta_file, &meta)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn patch_updates(path: impl AsRef<Path>) -> anyhow::Result<()> {
|
||||
let mut output_update_file = NamedTempFile::new()?;
|
||||
let update_file = File::open(&path)?;
|
||||
|
||||
let stream = Deserializer::from_reader(update_file).into_iter::<compat::UpdateEntry>();
|
||||
|
||||
for update in stream {
|
||||
let update_entry = update?;
|
||||
|
||||
let update_entry = UpdateEntry::from(update_entry);
|
||||
|
||||
serde_json::to_writer(&mut output_update_file, &update_entry)?;
|
||||
output_update_file.write_all(b"\n")?;
|
||||
}
|
||||
|
||||
output_update_file.flush()?;
|
||||
output_update_file.persist(path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Converts the ranking rules from the format `asc(_)`, `desc(_)` to the format `_:asc`, `_:desc`.
|
||||
///
|
||||
/// This is done for compatibility reasons, and to avoid a new dump version,
|
||||
/// since the new syntax was introduced soon after the new dump version.
|
||||
fn patch_custon_ranking_rules(ranking_rules: &mut Value) {
|
||||
*ranking_rules = match ranking_rules.take() {
|
||||
Value::Array(values) => values
|
||||
.into_iter()
|
||||
.filter_map(|value| match value {
|
||||
Value::String(s) if s.starts_with("asc") => asc_ranking_rule(&s)
|
||||
.map(|f| format!("{}:asc", f))
|
||||
.map(Value::String),
|
||||
Value::String(s) if s.starts_with("desc") => desc_ranking_rule(&s)
|
||||
.map(|f| format!("{}:desc", f))
|
||||
.map(Value::String),
|
||||
otherwise => Some(otherwise),
|
||||
})
|
||||
.collect(),
|
||||
otherwise => otherwise,
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::UpdateEntry> for UpdateEntry {
|
||||
fn from(compat::UpdateEntry { uuid, update }: compat::UpdateEntry) -> Self {
|
||||
let update = match update {
|
||||
compat::UpdateStatus::Processing(meta) => UpdateStatus::Processing(meta.into()),
|
||||
compat::UpdateStatus::Enqueued(meta) => UpdateStatus::Enqueued(meta.into()),
|
||||
compat::UpdateStatus::Processed(meta) => UpdateStatus::Processed(meta.into()),
|
||||
compat::UpdateStatus::Aborted(meta) => UpdateStatus::Aborted(meta.into()),
|
||||
compat::UpdateStatus::Failed(meta) => UpdateStatus::Failed(meta.into()),
|
||||
};
|
||||
|
||||
Self { uuid, update }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::Failed> for Failed {
|
||||
fn from(other: compat::Failed) -> Self {
|
||||
let compat::Failed {
|
||||
from,
|
||||
error,
|
||||
failed_at,
|
||||
} = other;
|
||||
|
||||
Self {
|
||||
db_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
index_db_size,
|
||||
update_db_size,
|
||||
dump_date: Utc::now(),
|
||||
from: from.into(),
|
||||
msg: error.message,
|
||||
code: compat::error_code_from_str(&error.error_code)
|
||||
.expect("Invalid update: Invalid error code"),
|
||||
failed_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn load_dump(
|
||||
self,
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
update_db_size: usize,
|
||||
indexing_options: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"Loading dump from {}, dump database version: {}, dump version: V2",
|
||||
self.dump_date, self.db_version
|
||||
);
|
||||
impl From<compat::Aborted> for Aborted {
|
||||
fn from(other: compat::Aborted) -> Self {
|
||||
let compat::Aborted { from, aborted_at } = other;
|
||||
|
||||
IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
|
||||
UpdateFileStore::load_dump(src.as_ref(), &dst)?;
|
||||
UpdateStore::load_dump(&src, &dst, update_db_size)?;
|
||||
|
||||
info!("Loading indexes.");
|
||||
|
||||
Ok(())
|
||||
Self {
|
||||
from: from.into(),
|
||||
aborted_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::Processing> for Processing {
|
||||
fn from(other: compat::Processing) -> Self {
|
||||
let compat::Processing {
|
||||
from,
|
||||
started_processing_at,
|
||||
} = other;
|
||||
|
||||
Self {
|
||||
from: from.into(),
|
||||
started_processing_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::Enqueued> for Enqueued {
|
||||
fn from(other: compat::Enqueued) -> Self {
|
||||
let compat::Enqueued {
|
||||
update_id,
|
||||
meta,
|
||||
enqueued_at,
|
||||
content,
|
||||
} = other;
|
||||
|
||||
let meta = match meta {
|
||||
compat::UpdateMeta::DocumentsAddition {
|
||||
method,
|
||||
primary_key,
|
||||
..
|
||||
} => {
|
||||
Update::DocumentAddition {
|
||||
primary_key,
|
||||
method,
|
||||
// Just ignore if the uuid is no present. If it is needed later, an error will
|
||||
// be thrown.
|
||||
content_uuid: content.unwrap_or_else(Uuid::default),
|
||||
}
|
||||
}
|
||||
compat::UpdateMeta::ClearDocuments => Update::ClearDocuments,
|
||||
compat::UpdateMeta::DeleteDocuments { ids } => Update::DeleteDocuments(ids),
|
||||
compat::UpdateMeta::Settings(settings) => Update::Settings(settings),
|
||||
};
|
||||
|
||||
Self {
|
||||
update_id,
|
||||
meta,
|
||||
enqueued_at,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::Processed> for Processed {
|
||||
fn from(other: compat::Processed) -> Self {
|
||||
let compat::Processed {
|
||||
from,
|
||||
success,
|
||||
processed_at,
|
||||
} = other;
|
||||
|
||||
Self {
|
||||
success: success.into(),
|
||||
processed_at,
|
||||
from: from.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<compat::UpdateResult> for UpdateResult {
|
||||
fn from(other: compat::UpdateResult) -> Self {
|
||||
match other {
|
||||
compat::UpdateResult::DocumentsAddition(r) => Self::DocumentsAddition(r),
|
||||
compat::UpdateResult::DocumentDeletion { deleted } => {
|
||||
Self::DocumentDeletion { deleted }
|
||||
}
|
||||
compat::UpdateResult::Other => Self::Other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// compat structure from pre-dumpv3 meilisearch
|
||||
mod compat {
|
||||
use anyhow::bail;
|
||||
use chrono::{DateTime, Utc};
|
||||
use meilisearch_error::Code;
|
||||
use milli::update::{DocumentAdditionResult, IndexDocumentsMethod};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::index::{Settings, Unchecked};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub struct UpdateEntry {
|
||||
pub uuid: Uuid,
|
||||
pub update: UpdateStatus,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateFormat {
|
||||
Json,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum UpdateResult {
|
||||
DocumentsAddition(DocumentAdditionResult),
|
||||
DocumentDeletion { deleted: u64 },
|
||||
Other,
|
||||
}
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(tag = "type")]
|
||||
pub enum UpdateMeta {
|
||||
DocumentsAddition {
|
||||
method: IndexDocumentsMethod,
|
||||
format: UpdateFormat,
|
||||
primary_key: Option<String>,
|
||||
},
|
||||
ClearDocuments,
|
||||
DeleteDocuments {
|
||||
ids: Vec<String>,
|
||||
},
|
||||
Settings(Settings<Unchecked>),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Enqueued {
|
||||
pub update_id: u64,
|
||||
pub meta: UpdateMeta,
|
||||
pub enqueued_at: DateTime<Utc>,
|
||||
pub content: Option<Uuid>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Processed {
|
||||
pub success: UpdateResult,
|
||||
pub processed_at: DateTime<Utc>,
|
||||
#[serde(flatten)]
|
||||
pub from: Processing,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Processing {
|
||||
#[serde(flatten)]
|
||||
pub from: Enqueued,
|
||||
pub started_processing_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Aborted {
|
||||
#[serde(flatten)]
|
||||
pub from: Enqueued,
|
||||
pub aborted_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Failed {
|
||||
#[serde(flatten)]
|
||||
pub from: Processing,
|
||||
pub error: ResponseError,
|
||||
pub failed_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "status", rename_all = "camelCase")]
|
||||
pub enum UpdateStatus {
|
||||
Processing(Processing),
|
||||
Enqueued(Enqueued),
|
||||
Processed(Processed),
|
||||
Aborted(Aborted),
|
||||
Failed(Failed),
|
||||
}
|
||||
|
||||
type StatusCode = ();
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ResponseError {
|
||||
#[serde(skip)]
|
||||
pub code: StatusCode,
|
||||
pub message: String,
|
||||
pub error_code: String,
|
||||
pub error_type: String,
|
||||
pub error_link: String,
|
||||
}
|
||||
|
||||
pub fn error_code_from_str(s: &str) -> anyhow::Result<Code> {
|
||||
let code = match s {
|
||||
"index_creation_failed" => Code::CreateIndex,
|
||||
"index_already_exists" => Code::IndexAlreadyExists,
|
||||
"index_not_found" => Code::IndexNotFound,
|
||||
"invalid_index_uid" => Code::InvalidIndexUid,
|
||||
"index_not_accessible" => Code::OpenIndex,
|
||||
"invalid_state" => Code::InvalidState,
|
||||
"missing_primary_key" => Code::MissingPrimaryKey,
|
||||
"primary_key_already_present" => Code::PrimaryKeyAlreadyPresent,
|
||||
"invalid_request" => Code::InvalidRankingRule,
|
||||
"max_fields_limit_exceeded" => Code::MaxFieldsLimitExceeded,
|
||||
"missing_document_id" => Code::MissingDocumentId,
|
||||
"invalid_facet" => Code::Facet,
|
||||
"invalid_filter" => Code::Filter,
|
||||
"invalid_sort" => Code::Sort,
|
||||
"bad_parameter" => Code::BadParameter,
|
||||
"bad_request" => Code::BadRequest,
|
||||
"document_not_found" => Code::DocumentNotFound,
|
||||
"internal" => Code::Internal,
|
||||
"invalid_geo_field" => Code::InvalidGeoField,
|
||||
"invalid_token" => Code::InvalidToken,
|
||||
"missing_authorization_header" => Code::MissingAuthorizationHeader,
|
||||
"not_found" => Code::NotFound,
|
||||
"payload_too_large" => Code::PayloadTooLarge,
|
||||
"unretrievable_document" => Code::RetrieveDocument,
|
||||
"search_error" => Code::SearchDocuments,
|
||||
"unsupported_media_type" => Code::UnsupportedMediaType,
|
||||
"dump_already_in_progress" => Code::DumpAlreadyInProgress,
|
||||
"dump_process_failed" => Code::DumpProcessFailed,
|
||||
_ => bail!("unknow error code."),
|
||||
};
|
||||
|
||||
Ok(code)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,31 @@
|
||||
use std::path::Path;
|
||||
|
||||
use log::info;
|
||||
|
||||
use crate::index_controller::dump_actor::Metadata;
|
||||
use crate::index_controller::index_resolver::IndexResolver;
|
||||
use crate::index_controller::update_file_store::UpdateFileStore;
|
||||
use crate::index_controller::updates::store::UpdateStore;
|
||||
use crate::options::IndexerOpts;
|
||||
|
||||
pub fn load_dump(
|
||||
meta: Metadata,
|
||||
src: impl AsRef<Path>,
|
||||
dst: impl AsRef<Path>,
|
||||
index_db_size: usize,
|
||||
update_db_size: usize,
|
||||
indexing_options: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
info!(
|
||||
"Loading dump from {}, dump database version: {}, dump version: V3",
|
||||
meta.dump_date, meta.db_version
|
||||
);
|
||||
|
||||
IndexResolver::load_dump(src.as_ref(), &dst, index_db_size, indexing_options)?;
|
||||
UpdateFileStore::load_dump(src.as_ref(), &dst)?;
|
||||
UpdateStore::load_dump(&src, &dst, update_db_size)?;
|
||||
|
||||
info!("Loading indexes.");
|
||||
|
||||
Ok(())
|
||||
}
|
@ -8,7 +8,6 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::fs::create_dir_all;
|
||||
|
||||
use loaders::v1::MetadataV1;
|
||||
use loaders::v2::MetadataV2;
|
||||
|
||||
pub use actor::DumpActor;
|
||||
pub use handle_impl::*;
|
||||
@ -18,6 +17,7 @@ use super::index_resolver::HardStateIndexResolver;
|
||||
use super::updates::UpdateSender;
|
||||
use crate::compression::{from_tar_gz, to_tar_gz};
|
||||
use crate::index_controller::dump_actor::error::DumpActorError;
|
||||
use crate::index_controller::dump_actor::loaders::{v2, v3};
|
||||
use crate::index_controller::updates::UpdateMsg;
|
||||
use crate::options::IndexerOpts;
|
||||
use error::Result;
|
||||
@ -30,6 +30,26 @@ mod message;
|
||||
|
||||
const META_FILE_NAME: &str = "metadata.json";
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Metadata {
|
||||
db_version: String,
|
||||
index_db_size: usize,
|
||||
update_db_size: usize,
|
||||
dump_date: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
pub fn new(index_db_size: usize, update_db_size: usize) -> Self {
|
||||
Self {
|
||||
db_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
index_db_size,
|
||||
update_db_size,
|
||||
dump_date: Utc::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait DumpActorHandle {
|
||||
/// Start the creation of a dump
|
||||
@ -43,15 +63,38 @@ pub trait DumpActorHandle {
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
#[serde(tag = "dumpVersion")]
|
||||
pub enum Metadata {
|
||||
pub enum MetadataVersion {
|
||||
V1(MetadataV1),
|
||||
V2(MetadataV2),
|
||||
V2(Metadata),
|
||||
V3(Metadata),
|
||||
}
|
||||
|
||||
impl Metadata {
|
||||
pub fn new_v2(index_db_size: usize, update_db_size: usize) -> Self {
|
||||
let meta = MetadataV2::new(index_db_size, update_db_size);
|
||||
Self::V2(meta)
|
||||
impl MetadataVersion {
|
||||
pub fn new_v3(index_db_size: usize, update_db_size: usize) -> Self {
|
||||
let meta = Metadata::new(index_db_size, update_db_size);
|
||||
Self::V3(meta)
|
||||
}
|
||||
|
||||
pub fn db_version(&self) -> &str {
|
||||
match self {
|
||||
Self::V1(meta) => &meta.db_version,
|
||||
Self::V2(meta) | Self::V3(meta) => &meta.db_version,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn version(&self) -> &str {
|
||||
match self {
|
||||
MetadataVersion::V1(_) => "V1",
|
||||
MetadataVersion::V2(_) => "V2",
|
||||
MetadataVersion::V3(_) => "V3",
|
||||
}
|
||||
}
|
||||
|
||||
pub fn dump_date(&self) -> Option<&DateTime<Utc>> {
|
||||
match self {
|
||||
MetadataVersion::V1(_) => None,
|
||||
MetadataVersion::V2(meta) | MetadataVersion::V3(meta) => Some(&meta.dump_date),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,6 +152,19 @@ pub fn load_dump(
|
||||
update_db_size: usize,
|
||||
indexer_opts: &IndexerOpts,
|
||||
) -> anyhow::Result<()> {
|
||||
// Setup a temp directory path in the same path as the database, to prevent cross devices
|
||||
// references.
|
||||
let temp_path = dst_path
|
||||
.as_ref()
|
||||
.parent()
|
||||
.map(ToOwned::to_owned)
|
||||
.unwrap_or_else(|| ".".into());
|
||||
if cfg!(windows) {
|
||||
std::env::set_var("TMP", temp_path);
|
||||
} else {
|
||||
std::env::set_var("TMPDIR", temp_path);
|
||||
}
|
||||
|
||||
let tmp_src = tempfile::tempdir()?;
|
||||
let tmp_src_path = tmp_src.path();
|
||||
|
||||
@ -116,15 +172,33 @@ pub fn load_dump(
|
||||
|
||||
let meta_path = tmp_src_path.join(META_FILE_NAME);
|
||||
let mut meta_file = File::open(&meta_path)?;
|
||||
let meta: Metadata = serde_json::from_reader(&mut meta_file)?;
|
||||
let meta: MetadataVersion = serde_json::from_reader(&mut meta_file)?;
|
||||
|
||||
let tmp_dst = tempfile::tempdir()?;
|
||||
|
||||
info!(
|
||||
"Loading dump {}, dump database version: {}, dump version: {}",
|
||||
meta.dump_date()
|
||||
.map(|t| format!("from {}", t))
|
||||
.unwrap_or_else(String::new),
|
||||
meta.db_version(),
|
||||
meta.version()
|
||||
);
|
||||
|
||||
match meta {
|
||||
Metadata::V1(meta) => {
|
||||
MetadataVersion::V1(meta) => {
|
||||
meta.load_dump(&tmp_src_path, tmp_dst.path(), index_db_size, indexer_opts)?
|
||||
}
|
||||
Metadata::V2(meta) => meta.load_dump(
|
||||
MetadataVersion::V2(meta) => v2::load_dump(
|
||||
meta,
|
||||
&tmp_src_path,
|
||||
tmp_dst.path(),
|
||||
index_db_size,
|
||||
update_db_size,
|
||||
indexer_opts,
|
||||
)?,
|
||||
MetadataVersion::V3(meta) => v3::load_dump(
|
||||
meta,
|
||||
&tmp_src_path,
|
||||
tmp_dst.path(),
|
||||
index_db_size,
|
||||
@ -162,7 +236,7 @@ impl DumpTask {
|
||||
let temp_dump_dir = tokio::task::spawn_blocking(tempfile::TempDir::new).await??;
|
||||
let temp_dump_path = temp_dump_dir.path().to_owned();
|
||||
|
||||
let meta = Metadata::new_v2(self.index_db_size, self.update_db_size);
|
||||
let meta = MetadataVersion::new_v3(self.index_db_size, self.update_db_size);
|
||||
let meta_path = temp_dump_path.join(META_FILE_NAME);
|
||||
let mut meta_file = File::create(&meta_path)?;
|
||||
serde_json::to_writer(&mut meta_file, &meta)?;
|
||||
|
@ -173,7 +173,6 @@ impl HeedUuidStore {
|
||||
Ok(0) => break,
|
||||
Ok(_) => {
|
||||
let DumpEntry { uuid, uid } = serde_json::from_str(&line)?;
|
||||
println!("importing {} {}", uid, uuid);
|
||||
db.db.put(&mut txn, &uid, uuid.as_bytes())?;
|
||||
}
|
||||
Err(e) => return Err(e.into()),
|
||||
|
@ -488,17 +488,3 @@ pub async fn get_arc_ownership_blocking<T>(mut item: Arc<T>) -> T {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parses the v1 version of the Asc ranking rules `asc(price)`and returns the field name.
|
||||
pub fn asc_ranking_rule(text: &str) -> Option<&str> {
|
||||
text.split_once("asc(")
|
||||
.and_then(|(_, tail)| tail.rsplit_once(")"))
|
||||
.map(|(field, _)| field)
|
||||
}
|
||||
|
||||
/// Parses the v1 version of the Desc ranking rules `desc(price)`and returns the field name.
|
||||
pub fn desc_ranking_rule(text: &str) -> Option<&str> {
|
||||
text.split_once("desc(")
|
||||
.and_then(|(_, tail)| tail.rsplit_once(")"))
|
||||
.map(|(field, _)| field)
|
||||
}
|
||||
|
@ -73,6 +73,11 @@ impl UpdateFileStore {
|
||||
let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
|
||||
let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
|
||||
|
||||
// No update files to load
|
||||
if !src_update_files_path.exists() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
create_dir_all(&dst_update_files_path)?;
|
||||
|
||||
let entries = std::fs::read_dir(src_update_files_path)?;
|
||||
|
@ -133,8 +133,8 @@ impl Processing {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Aborted {
|
||||
#[serde(flatten)]
|
||||
from: Enqueued,
|
||||
aborted_at: DateTime<Utc>,
|
||||
pub from: Enqueued,
|
||||
pub aborted_at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Aborted {
|
||||
|
@ -21,9 +21,9 @@ use crate::{
|
||||
};
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct UpdateEntry {
|
||||
uuid: Uuid,
|
||||
update: UpdateStatus,
|
||||
pub struct UpdateEntry {
|
||||
pub uuid: Uuid,
|
||||
pub update: UpdateStatus,
|
||||
}
|
||||
|
||||
impl UpdateStore {
|
||||
@ -130,8 +130,6 @@ impl UpdateStore {
|
||||
dst: impl AsRef<Path>,
|
||||
db_size: usize,
|
||||
) -> anyhow::Result<()> {
|
||||
println!("target path: {}", dst.as_ref().display());
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
options.map_size(db_size as usize);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user