fix the import of the updates in the dump

This commit is contained in:
tamo 2021-05-10 20:20:36 +02:00
parent ef438852cd
commit d767990424
No known key found for this signature in database
GPG Key ID: 20CD8020AFA88D69
7 changed files with 116 additions and 64 deletions

View File

@ -35,15 +35,6 @@ where
Deserialize::deserialize(deserializer).map(Some) Deserialize::deserialize(deserializer).map(Some)
} }
pub fn deserialize_wildcard<'de, I, D>(deserializer: D) -> Result<Option<Option<I>>, D::Error>
where
D: Deserializer<'de>,
I: IntoIterator<Item = String> + Deserialize<'de> + Clone,
{
Ok(<Option<I> as Deserialize>::deserialize(deserializer)?
.map(|item: I| (!item.clone().into_iter().any(|s| s == "*")).then(|| item)))
}
impl Index { impl Index {
pub fn settings(&self) -> anyhow::Result<Settings<Checked>> { pub fn settings(&self) -> anyhow::Result<Settings<Checked>> {
let txn = self.read_txn()?; let txn = self.read_txn()?;

View File

@ -8,7 +8,7 @@ use log::info;
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use super::{deserialize_some, deserialize_wildcard, Index}; use super::{deserialize_some, Index};
use crate::index_controller::UpdateResult; use crate::index_controller::UpdateResult;
@ -23,14 +23,14 @@ pub struct Unchecked;
pub struct Settings<T> { pub struct Settings<T> {
#[serde( #[serde(
default, default,
deserialize_with = "deserialize_wildcard", deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none" skip_serializing_if = "Option::is_none"
)] )]
pub displayed_attributes: Option<Option<Vec<String>>>, pub displayed_attributes: Option<Option<Vec<String>>>,
#[serde( #[serde(
default, default,
deserialize_with = "deserialize_wildcard", deserialize_with = "deserialize_some",
skip_serializing_if = "Option::is_none" skip_serializing_if = "Option::is_none"
)] )]
pub searchable_attributes: Option<Option<Vec<String>>>, pub searchable_attributes: Option<Option<Vec<String>>>,

View File

@ -1,33 +1,29 @@
mod actor;
mod handle_impl;
mod message;
mod v1; mod v1;
mod v2; mod v2;
mod handle_impl;
mod actor;
mod message;
use std::{ use std::{fs::File, path::Path, sync::Arc};
fs::File,
path::Path,
sync::Arc,
};
#[cfg(test)]
use mockall::automock;
use anyhow::bail; use anyhow::bail;
use thiserror::Error;
use heed::EnvOpenOptions; use heed::EnvOpenOptions;
use log::{error, info}; use log::{error, info};
use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat};
#[cfg(test)]
use mockall::automock;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use tempfile::TempDir; use tempfile::TempDir;
use thiserror::Error;
use super::IndexMetadata; use super::IndexMetadata;
use crate::helpers::compression; use crate::helpers::compression;
use crate::index::Index; use crate::index::Index;
use crate::index_controller::uuid_resolver; use crate::index_controller::{uuid_resolver, UpdateStatus};
pub use handle_impl::*;
pub use actor::DumpActor; pub use actor::DumpActor;
pub use handle_impl::*;
pub use message::DumpMsg; pub use message::DumpMsg;
pub type DumpResult<T> = std::result::Result<T, DumpError>; pub type DumpResult<T> = std::result::Result<T, DumpError>;
@ -80,7 +76,6 @@ pub trait DumpActorHandle {
async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>; async fn dump_info(&self, uid: String) -> DumpResult<DumpInfo>;
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Metadata { pub struct Metadata {
@ -158,7 +153,6 @@ impl DumpInfo {
} }
} }
pub fn load_dump( pub fn load_dump(
db_path: impl AsRef<Path>, db_path: impl AsRef<Path>,
dump_path: impl AsRef<Path>, dump_path: impl AsRef<Path>,
@ -209,6 +203,22 @@ pub fn load_dump(
let index_path = db_path.join(&format!("indexes/index-{}", uuid)); let index_path = db_path.join(&format!("indexes/index-{}", uuid));
// let update_path = db_path.join(&format!("updates")); // let update_path = db_path.join(&format!("updates"));
info!(
"Importing dump from {} into {}...",
dump_path.display(),
index_path.display()
);
metadata
.dump_version
.import_index(
size,
&dump_path,
&index_path,
idx.meta.primary_key.as_ref().map(|s| s.as_ref()),
)
.unwrap();
info!("Dump importation from {} succeed", dump_path.display());
info!("importing the updates"); info!("importing the updates");
use crate::index_controller::update_actor::UpdateStore; use crate::index_controller::update_actor::UpdateStore;
use std::io::BufRead; use std::io::BufRead;
@ -217,29 +227,39 @@ pub fn load_dump(
let options = EnvOpenOptions::new(); let options = EnvOpenOptions::new();
// create an UpdateStore to import the updates // create an UpdateStore to import the updates
std::fs::create_dir_all(&update_path)?; std::fs::create_dir_all(&update_path)?;
let (update_store, _) = UpdateStore::create(options, update_path)?; let (update_store, _) = UpdateStore::create(options, &update_path)?;
let file = File::open(&dump_path.join("updates.jsonl"))?; let file = File::open(&dump_path.join("updates.jsonl"))?;
let reader = std::io::BufReader::new(file); let reader = std::io::BufReader::new(file);
let mut wtxn = update_store.env.write_txn()?; let mut wtxn = update_store.env.write_txn()?;
for update in reader.lines() { for update in reader.lines() {
let update = serde_json::from_str(&update?)?; let mut update: UpdateStatus = serde_json::from_str(&update?)?;
if let Some(path) = update.content_path_mut() {
*path = update_path.join("update_files").join(&path).into();
}
update_store.register_raw_updates(&mut wtxn, update, uuid)?; update_store.register_raw_updates(&mut wtxn, update, uuid)?;
} }
wtxn.commit()?; wtxn.commit()?;
info!(
"Importing dump from {} into {}...",
dump_path.display(),
index_path.display()
);
metadata
.dump_version
.import_index(size, &dump_path, &index_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref()))
.unwrap();
info!("Dump importation from {} succeed", dump_path.display());
} }
// finally we can move all the unprocessed update file into our new DB
let update_path = tmp_dir_path.join("update_files");
let files: Vec<_> = std::fs::read_dir(&db_path.join("updates"))?
.map(|file| file.unwrap().path())
.collect();
let db_update_path = db_path.join("updates/update_files");
eprintln!("path {:?} exists: {:?}", update_path, update_path.exists());
eprintln!(
"path {:?} exists: {:?}",
db_update_path,
db_update_path.exists()
);
let _ = std::fs::remove_dir_all(db_update_path);
std::fs::rename(
tmp_dir_path.join("update_files"),
db_path.join("updates/update_files"),
)
.unwrap();
info!("Dump importation from {} succeed", dump_path.display()); info!("Dump importation from {} succeed", dump_path.display());
Ok(()) Ok(())

View File

@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet};
use log::warn; use log::warn;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::index_controller; use crate::index_controller;
use crate::index::{deserialize_wildcard, deserialize_some}; use crate::index::deserialize_some;
use super::*; use super::*;
/// This is the settings used in the last version of meilisearch exporting dump in V1 /// This is the settings used in the last version of meilisearch exporting dump in V1
@ -14,9 +14,9 @@ struct Settings {
pub ranking_rules: Option<Option<Vec<String>>>, pub ranking_rules: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_some")] #[serde(default, deserialize_with = "deserialize_some")]
pub distinct_attribute: Option<Option<String>>, pub distinct_attribute: Option<Option<String>>,
#[serde(default, deserialize_with = "deserialize_wildcard")] #[serde(default, deserialize_with = "deserialize_some")]
pub searchable_attributes: Option<Option<Vec<String>>>, pub searchable_attributes: Option<Option<Vec<String>>>,
#[serde(default, deserialize_with = "deserialize_wildcard")] #[serde(default, deserialize_with = "deserialize_some")]
pub displayed_attributes: Option<Option<BTreeSet<String>>>, pub displayed_attributes: Option<Option<BTreeSet<String>>>,
#[serde(default, deserialize_with = "deserialize_some")] #[serde(default, deserialize_with = "deserialize_some")]
pub stop_words: Option<Option<BTreeSet<String>>>, pub stop_words: Option<Option<BTreeSet<String>>>,

View File

@ -1,13 +1,14 @@
use std::{borrow::Cow, path::PathBuf};
use std::collections::{BTreeMap, HashSet}; use std::collections::{BTreeMap, HashSet};
use std::convert::TryInto; use std::convert::TryInto;
use std::fs::{copy, create_dir_all, remove_file, File}; use std::fs::{copy, create_dir_all, remove_file, File};
use std::mem::size_of; use std::mem::size_of;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::{borrow::Cow, path::PathBuf};
use anyhow::Context; use anyhow::Context;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64; use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
@ -16,11 +17,10 @@ use parking_lot::{Mutex, MutexGuard};
use tokio::runtime::Handle; use tokio::runtime::Handle;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use futures::StreamExt;
use super::UpdateMeta; use super::UpdateMeta;
use crate::helpers::EnvSizer; use crate::helpers::EnvSizer;
use crate::index_controller::{IndexActorHandle, updates::*, index_actor::CONCURRENT_INDEX_MSG}; use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle};
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>; type BEU64 = U64<heed::byteorder::BE>;
@ -180,7 +180,10 @@ pub struct UpdateStore {
} }
impl UpdateStore { impl UpdateStore {
pub fn create(mut options: EnvOpenOptions, path: impl AsRef<Path>) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { pub fn create(
mut options: EnvOpenOptions,
path: impl AsRef<Path>,
) -> anyhow::Result<(Self, mpsc::Receiver<()>)> {
options.max_dbs(5); options.max_dbs(5);
let env = options.open(path)?; let env = options.open(path)?;
@ -194,7 +197,17 @@ impl UpdateStore {
// Send a first notification to trigger the process. // Send a first notification to trigger the process.
let _ = notification_sender.send(()); let _ = notification_sender.send(());
Ok((Self { env, pending_queue, next_update_id, updates, state, notification_sender }, notification_receiver)) Ok((
Self {
env,
pending_queue,
next_update_id,
updates,
state,
notification_sender,
},
notification_receiver,
))
} }
pub fn open( pub fn open(
@ -208,7 +221,8 @@ impl UpdateStore {
// Init update loop to perform any pending updates at launch. // Init update loop to perform any pending updates at launch.
// Since we just launched the update store, and we still own the receiving end of the // Since we just launched the update store, and we still own the receiving end of the
// channel, this call is guaranteed to succeed. // channel, this call is guaranteed to succeed.
update_store.notification_sender update_store
.notification_sender
.try_send(()) .try_send(())
.expect("Failed to init update store"); .expect("Failed to init update store");
@ -303,22 +317,28 @@ impl UpdateStore {
/// Push already processed update in the UpdateStore without triggering the notification /// Push already processed update in the UpdateStore without triggering the notification
/// process. This is useful for the dumps. /// process. This is useful for the dumps.
pub fn register_raw_updates ( pub fn register_raw_updates(
&self, &self,
wtxn: &mut heed::RwTxn, wtxn: &mut heed::RwTxn,
update: UpdateStatus, update: UpdateStatus,
index_uuid: Uuid, index_uuid: Uuid,
) -> heed::Result<()> { ) -> heed::Result<()> {
// TODO: TAMO: since I don't want to store anything I currently generate a new global ID
// everytime I encounter an enqueued update, can we do better?
match update { match update {
UpdateStatus::Enqueued(enqueued) => { UpdateStatus::Enqueued(enqueued) => {
let (global_id, update_id) = self.next_update_id(wtxn, index_uuid)?; let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?;
self.pending_queue.remap_key_type::<PendingKeyCodec>().put(wtxn, &(global_id, index_uuid, update_id), &enqueued)?; self.pending_queue.remap_key_type::<PendingKeyCodec>().put(
wtxn,
&(global_id, index_uuid, enqueued.id()),
&enqueued,
)?;
} }
_ => { _ => {
let update_id = self.next_update_id_raw(wtxn, index_uuid)?; let _update_id = self.next_update_id_raw(wtxn, index_uuid)?;
self.updates.remap_key_type::<UpdateKeyCodec>().put(wtxn, &(index_uuid, update_id), &update)?; self.updates.remap_key_type::<UpdateKeyCodec>().put(
wtxn,
&(index_uuid, update.id()),
&update,
)?;
} }
} }
Ok(()) Ok(())
@ -544,7 +564,7 @@ impl UpdateStore {
&self, &self,
uuids: &HashSet<(String, Uuid)>, uuids: &HashSet<(String, Uuid)>,
path: PathBuf, path: PathBuf,
handle: impl IndexActorHandle handle: impl IndexActorHandle,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
use std::io::prelude::*; use std::io::prelude::*;
let state_lock = self.state.write(); let state_lock = self.state.write();
@ -552,12 +572,31 @@ impl UpdateStore {
let txn = self.env.write_txn()?; let txn = self.env.write_txn()?;
for (uid, uuid) in uuids.iter() { for (index_uid, index_uuid) in uuids.iter() {
let file = File::create(path.join(uid).join("updates.jsonl"))?; let file = File::create(path.join(index_uid).join("updates.jsonl"))?;
let mut file = std::io::BufWriter::new(file); let mut file = std::io::BufWriter::new(file);
for update in &self.list(*uuid)? { let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data();
serde_json::to_writer(&mut file, update)?; for entry in pendings {
let ((_, uuid, _), pending) = entry?;
if &uuid == index_uuid {
let mut update: UpdateStatus = pending.decode()?.into();
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?;
}
}
let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?;
for entry in updates {
let (_, update) = entry?;
let mut update = update.clone();
if let Some(path) = update.content_path_mut() {
*path = path.file_name().expect("update path can't be empty").into();
}
serde_json::to_writer(&mut file, &update)?;
file.write_all(b"\n")?; file.write_all(b"\n")?;
} }
} }

View File

@ -188,7 +188,7 @@ impl Failed {
} }
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize, Clone)]
#[serde(tag = "status", rename_all = "camelCase")] #[serde(tag = "status", rename_all = "camelCase")]
pub enum UpdateStatus { pub enum UpdateStatus {
Processing(Processing), Processing(Processing),

View File

@ -73,7 +73,7 @@ async fn reset_all_settings() {
let server = Server::new().await; let server = Server::new().await;
let index = server.index("test"); let index = server.index("test");
index index
.update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"] })) .update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"], "attributesForFaceting": { "toto": "string" } }))
.await; .await;
index.wait_update_id(0).await; index.wait_update_id(0).await;
let (response, code) = index.settings().await; let (response, code) = index.settings().await;
@ -81,6 +81,7 @@ async fn reset_all_settings() {
assert_eq!(response["displayedAttributes"], json!(["foo"])); assert_eq!(response["displayedAttributes"], json!(["foo"]));
assert_eq!(response["searchableAttributes"], json!(["bar"])); assert_eq!(response["searchableAttributes"], json!(["bar"]));
assert_eq!(response["stopWords"], json!(["the"])); assert_eq!(response["stopWords"], json!(["the"]));
assert_eq!(response["attributesForFaceting"], json!({"toto": "string"}));
index.delete_settings().await; index.delete_settings().await;
index.wait_update_id(1).await; index.wait_update_id(1).await;
@ -90,6 +91,7 @@ async fn reset_all_settings() {
assert_eq!(response["displayedAttributes"], json!(["*"])); assert_eq!(response["displayedAttributes"], json!(["*"]));
assert_eq!(response["searchableAttributes"], json!(["*"])); assert_eq!(response["searchableAttributes"], json!(["*"]));
assert_eq!(response["stopWords"], json!([])); assert_eq!(response["stopWords"], json!([]));
assert_eq!(response["attributesForFaceting"], json!({}));
} }
#[actix_rt::test] #[actix_rt::test]