mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-23 05:14:27 +01:00
split the dumps between v1 and v2
This commit is contained in:
parent
e389c088eb
commit
c4d898a265
@ -55,10 +55,10 @@ impl ApiKeys {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
pub fn new(options: Opt) -> anyhow::Result<Data> {
|
pub async fn new(options: Opt) -> anyhow::Result<Data> {
|
||||||
let path = options.db_path.clone();
|
let path = options.db_path.clone();
|
||||||
|
|
||||||
let index_controller = IndexController::new(&path, &options)?;
|
let index_controller = IndexController::new(&path, &options).await?;
|
||||||
|
|
||||||
let mut api_keys = ApiKeys {
|
let mut api_keys = ApiKeys {
|
||||||
master: options.clone().master_key,
|
master: options.clone().master_key,
|
||||||
|
@ -1,14 +1,13 @@
|
|||||||
use std::{
|
mod v1;
|
||||||
fs::File,
|
mod v2;
|
||||||
path::{Path, PathBuf},
|
|
||||||
sync::Arc,
|
use std::{fs::File, path::{Path, PathBuf}, sync::Arc};
|
||||||
};
|
|
||||||
|
|
||||||
use anyhow::bail;
|
use anyhow::bail;
|
||||||
use heed::EnvOpenOptions;
|
use heed::EnvOpenOptions;
|
||||||
use log::{error, info};
|
use log::{error, info};
|
||||||
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{de::Deserializer, Deserialize, Serialize};
|
||||||
use tempfile::TempDir;
|
use tempfile::TempDir;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::task::spawn_blocking;
|
use tokio::task::spawn_blocking;
|
||||||
@ -20,13 +19,30 @@ use crate::index::Index;
|
|||||||
use crate::index_controller::uuid_resolver;
|
use crate::index_controller::uuid_resolver;
|
||||||
use crate::{helpers::compression, index::Settings};
|
use crate::{helpers::compression, index::Settings};
|
||||||
|
|
||||||
|
pub (super) fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
|
||||||
|
where
|
||||||
|
T: Deserialize<'de>,
|
||||||
|
D: Deserializer<'de>,
|
||||||
|
{
|
||||||
|
Deserialize::deserialize(deserializer).map(Some)
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||||
enum DumpVersion {
|
enum DumpVersion {
|
||||||
V1,
|
V1,
|
||||||
|
V2,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DumpVersion {
|
impl DumpVersion {
|
||||||
const CURRENT: Self = Self::V1;
|
const CURRENT: Self = Self::V2;
|
||||||
|
|
||||||
|
/// Select the good importation function from the `DumpVersion` of metadata
|
||||||
|
pub fn import_index(self, size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
||||||
|
match self {
|
||||||
|
Self::V1 => v1::import_index(size, dump_path, index_path),
|
||||||
|
Self::V2 => v2::import_index(size, dump_path, index_path),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
@ -141,16 +157,6 @@ where
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
|
||||||
fn settings_from_path(dir_path: &Path) -> anyhow::Result<Settings> {
|
|
||||||
let path = dir_path.join("settings.json");
|
|
||||||
let file = File::open(path)?;
|
|
||||||
let reader = std::io::BufReader::new(file);
|
|
||||||
let metadata = serde_json::from_reader(reader)?;
|
|
||||||
|
|
||||||
Ok(metadata)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Write Settings in `settings.json` file at provided `dir_path`
|
/// Write Settings in `settings.json` file at provided `dir_path`
|
||||||
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
|
fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()> {
|
||||||
let path = dir_path.join("settings.json");
|
let path = dir_path.join("settings.json");
|
||||||
@ -161,40 +167,7 @@ fn settings_to_path(settings: &Settings, dir_path: &Path) -> anyhow::Result<()>
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn import_index_v1(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
pub async fn load_dump(
|
||||||
std::fs::create_dir_all(&index_path)?;
|
|
||||||
let mut options = EnvOpenOptions::new();
|
|
||||||
options.map_size(size);
|
|
||||||
let index = milli::Index::new(options, index_path)?;
|
|
||||||
let index = Index(Arc::new(index));
|
|
||||||
|
|
||||||
// extract `settings.json` file and import content
|
|
||||||
let settings = settings_from_path(&dump_path)?;
|
|
||||||
let update_builder = UpdateBuilder::new(0);
|
|
||||||
index.update_settings(&settings, update_builder)?;
|
|
||||||
|
|
||||||
let update_builder = UpdateBuilder::new(1);
|
|
||||||
let file = File::open(&index_path.join("documents.jsonl"))?;
|
|
||||||
let reader = std::io::BufReader::new(file);
|
|
||||||
index.update_documents(
|
|
||||||
UpdateFormat::JsonStream,
|
|
||||||
IndexDocumentsMethod::ReplaceDocuments,
|
|
||||||
reader,
|
|
||||||
update_builder,
|
|
||||||
None,
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// the last step: we extract the milli::Index and close it
|
|
||||||
Arc::try_unwrap(index.0)
|
|
||||||
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
|
||||||
.unwrap()
|
|
||||||
.prepare_for_closing()
|
|
||||||
.wait();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load_dump(
|
|
||||||
db_path: impl AsRef<Path>,
|
db_path: impl AsRef<Path>,
|
||||||
dump_path: impl AsRef<Path>,
|
dump_path: impl AsRef<Path>,
|
||||||
size: usize,
|
size: usize,
|
||||||
@ -212,15 +185,10 @@ pub fn load_dump(
|
|||||||
// read dump metadata
|
// read dump metadata
|
||||||
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
|
let metadata = DumpMetadata::from_path(&tmp_dir_path)?;
|
||||||
|
|
||||||
// choose importation function from DumpVersion of metadata
|
|
||||||
let import_index = match metadata.dump_version {
|
|
||||||
DumpVersion::V1 => import_index_v1,
|
|
||||||
};
|
|
||||||
|
|
||||||
// remove indexes which have same `uuid` than indexes to import and create empty indexes
|
// remove indexes which have same `uuid` than indexes to import and create empty indexes
|
||||||
let existing_index_uids = futures::executor::block_on(uuid_resolver.list())?;
|
let existing_index_uids = uuid_resolver.list().await?;
|
||||||
|
|
||||||
info!("Deleting indexes provided in the dump...");
|
info!("Deleting indexes already present in the db and provided in the dump...");
|
||||||
for idx in &metadata.indexes {
|
for idx in &metadata.indexes {
|
||||||
if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
|
if let Some((_, uuid)) = existing_index_uids.iter().find(|(s, _)| s == &idx.uid) {
|
||||||
// if we find the index in the `uuid_resolver` it's supposed to exist on the file system
|
// if we find the index in the `uuid_resolver` it's supposed to exist on the file system
|
||||||
@ -237,18 +205,19 @@ pub fn load_dump(
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// if the index does not exist in the `uuid_resolver` we create it
|
// if the index does not exist in the `uuid_resolver` we create it
|
||||||
futures::executor::block_on(uuid_resolver.create(idx.uid.clone()))?;
|
uuid_resolver.create(idx.uid.clone()).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// import each indexes content
|
// import each indexes content
|
||||||
for idx in metadata.indexes {
|
for idx in metadata.indexes {
|
||||||
let dump_path = tmp_dir_path.join(&idx.uid);
|
let dump_path = tmp_dir_path.join(&idx.uid);
|
||||||
let uuid = futures::executor::block_on(uuid_resolver.get(idx.uid))?;
|
let uuid = uuid_resolver.get(idx.uid).await?;
|
||||||
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
|
let index_path = db_path.join(&format!("indexes/index-{}", uuid));
|
||||||
|
let update_path = db_path.join(&format!("updates/updates-{}", uuid)); // TODO: add the update db
|
||||||
|
|
||||||
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
|
info!("Importing dump from {} into {}...", dump_path.display(), index_path.display());
|
||||||
import_index(size, &dump_path, &index_path).unwrap();
|
metadata.dump_version.import_index(size, &dump_path, &index_path).unwrap();
|
||||||
info!("Dump importation from {} succeed", dump_path.display());
|
info!("Dump importation from {} succeed", dump_path.display());
|
||||||
}
|
}
|
||||||
|
|
119
meilisearch-http/src/index_controller/dump/v1.rs
Normal file
119
meilisearch-http/src/index_controller/dump/v1.rs
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
use std::collections::{BTreeMap, BTreeSet};
|
||||||
|
|
||||||
|
use log::warn;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use crate::index_controller;
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
/// This is the settings used in the last version of meilisearch exporting dump in V1
|
||||||
|
#[derive(Default, Clone, Serialize, Deserialize, Debug)]
|
||||||
|
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||||
|
struct Settings {
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub ranking_rules: Option<Option<Vec<String>>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub distinct_attribute: Option<Option<String>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub searchable_attributes: Option<Option<Vec<String>>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub displayed_attributes: Option<Option<BTreeSet<String>>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub stop_words: Option<Option<BTreeSet<String>>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub synonyms: Option<Option<BTreeMap<String, Vec<String>>>>,
|
||||||
|
#[serde(default, deserialize_with = "deserialize_some")]
|
||||||
|
pub attributes_for_faceting: Option<Option<Vec<String>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// we need to **always** be able to convert the old settings to the settings currently being used
|
||||||
|
impl From<Settings> for index_controller::Settings {
|
||||||
|
fn from(settings: Settings) -> Self {
|
||||||
|
if settings.distinct_attribute.flatten().is_some() {
|
||||||
|
error!("`distinct_attribute` are not yet implemented and thus will be ignored");
|
||||||
|
}
|
||||||
|
if settings.synonyms.flatten().is_some() {
|
||||||
|
error!("`synonyms` are not yet implemented and thus will be ignored");
|
||||||
|
}
|
||||||
|
Self {
|
||||||
|
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
|
||||||
|
displayed_attributes: settings.displayed_attributes.map(|o| o.map(|vec| vec.into_iter().collect())),
|
||||||
|
searchable_attributes: settings.searchable_attributes,
|
||||||
|
// we previously had a `Vec<String>` but now we have a `HashMap<String, String>`
|
||||||
|
// representing the name of the faceted field + the type of the field. Since the type
|
||||||
|
// was not known in the V1 of the dump we are just going to assume everything is a
|
||||||
|
// String
|
||||||
|
attributes_for_faceting: settings.attributes_for_faceting.map(|o| o.map(|vec| vec.into_iter().map(|key| (key, String::from("string"))).collect())),
|
||||||
|
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
|
||||||
|
ranking_rules: settings.ranking_rules.map(|o| o.map(|vec| vec.into_iter().filter_map(|criterion| {
|
||||||
|
match criterion.as_str() {
|
||||||
|
"words" | "typo" | "proximity" => Some(criterion),
|
||||||
|
s if s.starts_with("asc") || s.starts_with("desc") => Some(criterion),
|
||||||
|
"wordsPosition" => {
|
||||||
|
warn!("The criteria `words` and `wordsPosition` have been merged into a single criterion `words` so `wordsPositon` will be ignored");
|
||||||
|
Some(String::from("words"))
|
||||||
|
}
|
||||||
|
"attribute" | "exactness" => {
|
||||||
|
error!("The criterion `{}` is not implemented currently and thus will be ignored", criterion);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
s => {
|
||||||
|
error!("Unknown criterion found in the dump: `{}`, it will be ignored", s);
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).collect())),
|
||||||
|
// we need to convert the old `Vec<String>` into a `BTreeSet<String>`
|
||||||
|
stop_words: settings.stop_words.map(|o| o.map(|vec| vec.into_iter().collect())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
||||||
|
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
|
||||||
|
let path = dir_path.join("settings.json");
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let reader = std::io::BufReader::new(file);
|
||||||
|
let metadata = serde_json::from_reader(reader)?;
|
||||||
|
|
||||||
|
Ok(metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
||||||
|
info!("Importing a dump from an old version of meilisearch with dump version 1");
|
||||||
|
|
||||||
|
std::fs::create_dir_all(&index_path)?;
|
||||||
|
let mut options = EnvOpenOptions::new();
|
||||||
|
options.map_size(size);
|
||||||
|
let index = milli::Index::new(options, index_path)?;
|
||||||
|
let index = Index(Arc::new(index));
|
||||||
|
|
||||||
|
// extract `settings.json` file and import content
|
||||||
|
let settings = import_settings(&dump_path)?;
|
||||||
|
dbg!(&settings);
|
||||||
|
let settings = settings.into();
|
||||||
|
dbg!(&settings);
|
||||||
|
let update_builder = UpdateBuilder::new(0);
|
||||||
|
index.update_settings(&settings, update_builder)?;
|
||||||
|
|
||||||
|
let update_builder = UpdateBuilder::new(1);
|
||||||
|
let file = File::open(&dump_path.join("documents.jsonl"))?;
|
||||||
|
let reader = std::io::BufReader::new(file);
|
||||||
|
|
||||||
|
index.update_documents(
|
||||||
|
UpdateFormat::JsonStream,
|
||||||
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
|
reader,
|
||||||
|
update_builder,
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// the last step: we extract the original milli::Index and close it
|
||||||
|
Arc::try_unwrap(index.0)
|
||||||
|
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
||||||
|
.unwrap()
|
||||||
|
.prepare_for_closing()
|
||||||
|
.wait();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
51
meilisearch-http/src/index_controller/dump/v2.rs
Normal file
51
meilisearch-http/src/index_controller/dump/v2.rs
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
use heed::EnvOpenOptions;
|
||||||
|
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
|
||||||
|
use crate::index::Index;
|
||||||
|
use crate::index_controller::Settings;
|
||||||
|
use std::{fs::File, path::Path, sync::Arc};
|
||||||
|
|
||||||
|
/// Extract Settings from `settings.json` file present at provided `dir_path`
|
||||||
|
fn import_settings(dir_path: &Path) -> anyhow::Result<Settings> {
|
||||||
|
let path = dir_path.join("settings.json");
|
||||||
|
let file = File::open(path)?;
|
||||||
|
let reader = std::io::BufReader::new(file);
|
||||||
|
let metadata = serde_json::from_reader(reader)?;
|
||||||
|
|
||||||
|
Ok(metadata)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn import_index(size: usize, dump_path: &Path, index_path: &Path) -> anyhow::Result<()> {
|
||||||
|
std::fs::create_dir_all(&index_path)?;
|
||||||
|
let mut options = EnvOpenOptions::new();
|
||||||
|
options.map_size(size);
|
||||||
|
let index = milli::Index::new(options, index_path)?;
|
||||||
|
let index = Index(Arc::new(index));
|
||||||
|
|
||||||
|
// extract `settings.json` file and import content
|
||||||
|
let settings = import_settings(&dump_path)?;
|
||||||
|
let update_builder = UpdateBuilder::new(0);
|
||||||
|
index.update_settings(&settings, update_builder)?;
|
||||||
|
dbg!(settings);
|
||||||
|
|
||||||
|
let update_builder = UpdateBuilder::new(1);
|
||||||
|
let file = File::open(&dump_path.join("documents.jsonl"))?;
|
||||||
|
let reader = std::io::BufReader::new(file);
|
||||||
|
|
||||||
|
index.update_documents(
|
||||||
|
UpdateFormat::JsonStream,
|
||||||
|
IndexDocumentsMethod::ReplaceDocuments,
|
||||||
|
reader,
|
||||||
|
update_builder,
|
||||||
|
None,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// the last step: we extract the original milli::Index and close it
|
||||||
|
Arc::try_unwrap(index.0)
|
||||||
|
.map_err(|_e| "[dumps] At this point no one is supposed to have a reference on the index")
|
||||||
|
.unwrap()
|
||||||
|
.prepare_for_closing()
|
||||||
|
.wait();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
@ -75,7 +75,7 @@ pub struct Stats {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl IndexController {
|
impl IndexController {
|
||||||
pub fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
|
pub async fn new(path: impl AsRef<Path>, options: &Opt) -> anyhow::Result<Self> {
|
||||||
let index_size = options.max_mdb_size.get_bytes() as usize;
|
let index_size = options.max_mdb_size.get_bytes() as usize;
|
||||||
let update_store_size = options.max_udb_size.get_bytes() as usize;
|
let update_store_size = options.max_udb_size.get_bytes() as usize;
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ impl IndexController {
|
|||||||
&options.db_path,
|
&options.db_path,
|
||||||
path,
|
path,
|
||||||
index_size,
|
index_size,
|
||||||
);
|
).await?;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -54,7 +54,7 @@ async fn main() -> Result<(), MainError> {
|
|||||||
//snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
//snapshot::load_snapshot(&opt.db_path, path, opt.ignore_snapshot_if_db_exists, opt.ignore_missing_snapshot)?;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
let data = Data::new(opt.clone())?;
|
let data = Data::new(opt.clone()).await?;
|
||||||
|
|
||||||
//if !opt.no_analytics {
|
//if !opt.no_analytics {
|
||||||
//let analytics_data = data.clone();
|
//let analytics_data = data.clone();
|
||||||
|
Loading…
Reference in New Issue
Block a user