diff --git a/meilisearch-http/src/index/mod.rs b/meilisearch-http/src/index/mod.rs index b0c145001..ceaa6103e 100644 --- a/meilisearch-http/src/index/mod.rs +++ b/meilisearch-http/src/index/mod.rs @@ -35,15 +35,6 @@ where Deserialize::deserialize(deserializer).map(Some) } -pub fn deserialize_wildcard<'de, I, D>(deserializer: D) -> Result>, D::Error> -where - D: Deserializer<'de>, - I: IntoIterator + Deserialize<'de> + Clone, -{ - Ok( as Deserialize>::deserialize(deserializer)? - .map(|item: I| (!item.clone().into_iter().any(|s| s == "*")).then(|| item))) -} - impl Index { pub fn settings(&self) -> anyhow::Result> { let txn = self.read_txn()?; diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 75d0dc3e6..67edc15d0 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -8,7 +8,7 @@ use log::info; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; use serde::{Deserialize, Serialize}; -use super::{deserialize_some, deserialize_wildcard, Index}; +use super::{deserialize_some, Index}; use crate::index_controller::UpdateResult; @@ -23,14 +23,14 @@ pub struct Unchecked; pub struct Settings { #[serde( default, - deserialize_with = "deserialize_wildcard", + deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub displayed_attributes: Option>>, #[serde( default, - deserialize_with = "deserialize_wildcard", + deserialize_with = "deserialize_some", skip_serializing_if = "Option::is_none" )] pub searchable_attributes: Option>>, diff --git a/meilisearch-http/src/index_controller/dump_actor/mod.rs b/meilisearch-http/src/index_controller/dump_actor/mod.rs index eb2bc4684..f79cd839b 100644 --- a/meilisearch-http/src/index_controller/dump_actor/mod.rs +++ b/meilisearch-http/src/index_controller/dump_actor/mod.rs @@ -1,33 +1,29 @@ +mod actor; +mod handle_impl; +mod message; mod v1; mod v2; -mod handle_impl; -mod actor; -mod message; -use std::{ - fs::File, - path::Path, - sync::Arc, -}; +use std::{fs::File, path::Path, sync::Arc}; -#[cfg(test)] -use mockall::automock; use anyhow::bail; -use thiserror::Error; use heed::EnvOpenOptions; use log::{error, info}; use milli::update::{IndexDocumentsMethod, UpdateBuilder, UpdateFormat}; +#[cfg(test)] +use mockall::automock; use serde::{Deserialize, Serialize}; use serde_json::json; use tempfile::TempDir; +use thiserror::Error; use super::IndexMetadata; use crate::helpers::compression; use crate::index::Index; -use crate::index_controller::uuid_resolver; +use crate::index_controller::{uuid_resolver, UpdateStatus}; -pub use handle_impl::*; pub use actor::DumpActor; +pub use handle_impl::*; pub use message::DumpMsg; pub type DumpResult = std::result::Result; @@ -80,7 +76,6 @@ pub trait DumpActorHandle { async fn dump_info(&self, uid: String) -> DumpResult; } - #[derive(Debug, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct Metadata { @@ -158,7 +153,6 @@ impl DumpInfo { } } - pub fn load_dump( db_path: impl AsRef, dump_path: impl AsRef, @@ -209,6 +203,22 @@ pub fn load_dump( let index_path = db_path.join(&format!("indexes/index-{}", uuid)); // let update_path = db_path.join(&format!("updates")); + info!( + "Importing dump from {} into {}...", + dump_path.display(), + index_path.display() + ); + metadata + .dump_version + .import_index( + size, + &dump_path, + &index_path, + idx.meta.primary_key.as_ref().map(|s| s.as_ref()), + ) + .unwrap(); + info!("Dump importation from {} succeed", dump_path.display()); + info!("importing the updates"); use crate::index_controller::update_actor::UpdateStore; use std::io::BufRead; @@ -217,29 +227,39 @@ pub fn load_dump( let options = EnvOpenOptions::new(); // create an UpdateStore to import the updates std::fs::create_dir_all(&update_path)?; - let (update_store, _) = UpdateStore::create(options, update_path)?; + let (update_store, _) = UpdateStore::create(options, &update_path)?; let file = File::open(&dump_path.join("updates.jsonl"))?; let reader = std::io::BufReader::new(file); let mut wtxn = update_store.env.write_txn()?; for update in reader.lines() { - let update = serde_json::from_str(&update?)?; + let mut update: UpdateStatus = serde_json::from_str(&update?)?; + if let Some(path) = update.content_path_mut() { + *path = update_path.join("update_files").join(&path).into(); + } update_store.register_raw_updates(&mut wtxn, update, uuid)?; } wtxn.commit()?; - - info!( - "Importing dump from {} into {}...", - dump_path.display(), - index_path.display() - ); - metadata - .dump_version - .import_index(size, &dump_path, &index_path, idx.meta.primary_key.as_ref().map(|s| s.as_ref())) - .unwrap(); - info!("Dump importation from {} succeed", dump_path.display()); } + // finally we can move all the unprocessed update file into our new DB + let update_path = tmp_dir_path.join("update_files"); + let files: Vec<_> = std::fs::read_dir(&db_path.join("updates"))? + .map(|file| file.unwrap().path()) + .collect(); + let db_update_path = db_path.join("updates/update_files"); + eprintln!("path {:?} exists: {:?}", update_path, update_path.exists()); + eprintln!( + "path {:?} exists: {:?}", + db_update_path, + db_update_path.exists() + ); + let _ = std::fs::remove_dir_all(db_update_path); + std::fs::rename( + tmp_dir_path.join("update_files"), + db_path.join("updates/update_files"), + ) + .unwrap(); info!("Dump importation from {} succeed", dump_path.display()); Ok(()) diff --git a/meilisearch-http/src/index_controller/dump_actor/v1.rs b/meilisearch-http/src/index_controller/dump_actor/v1.rs index 89c5b24c0..92f8bf712 100644 --- a/meilisearch-http/src/index_controller/dump_actor/v1.rs +++ b/meilisearch-http/src/index_controller/dump_actor/v1.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use log::warn; use serde::{Deserialize, Serialize}; use crate::index_controller; -use crate::index::{deserialize_wildcard, deserialize_some}; +use crate::index::deserialize_some; use super::*; /// This is the settings used in the last version of meilisearch exporting dump in V1 @@ -14,9 +14,9 @@ struct Settings { pub ranking_rules: Option>>, #[serde(default, deserialize_with = "deserialize_some")] pub distinct_attribute: Option>, - #[serde(default, deserialize_with = "deserialize_wildcard")] + #[serde(default, deserialize_with = "deserialize_some")] pub searchable_attributes: Option>>, - #[serde(default, deserialize_with = "deserialize_wildcard")] + #[serde(default, deserialize_with = "deserialize_some")] pub displayed_attributes: Option>>, #[serde(default, deserialize_with = "deserialize_some")] pub stop_words: Option>>, diff --git a/meilisearch-http/src/index_controller/update_actor/update_store.rs b/meilisearch-http/src/index_controller/update_actor/update_store.rs index 745311f05..07dfdf273 100644 --- a/meilisearch-http/src/index_controller/update_actor/update_store.rs +++ b/meilisearch-http/src/index_controller/update_actor/update_store.rs @@ -1,13 +1,14 @@ -use std::{borrow::Cow, path::PathBuf}; use std::collections::{BTreeMap, HashSet}; use std::convert::TryInto; use std::fs::{copy, create_dir_all, remove_file, File}; use std::mem::size_of; use std::path::Path; use std::sync::Arc; +use std::{borrow::Cow, path::PathBuf}; use anyhow::Context; use arc_swap::ArcSwap; +use futures::StreamExt; use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::zerocopy::U64; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; @@ -16,11 +17,10 @@ use parking_lot::{Mutex, MutexGuard}; use tokio::runtime::Handle; use tokio::sync::mpsc; use uuid::Uuid; -use futures::StreamExt; use super::UpdateMeta; use crate::helpers::EnvSizer; -use crate::index_controller::{IndexActorHandle, updates::*, index_actor::CONCURRENT_INDEX_MSG}; +use crate::index_controller::{index_actor::CONCURRENT_INDEX_MSG, updates::*, IndexActorHandle}; #[allow(clippy::upper_case_acronyms)] type BEU64 = U64; @@ -180,7 +180,10 @@ pub struct UpdateStore { } impl UpdateStore { - pub fn create(mut options: EnvOpenOptions, path: impl AsRef) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { + pub fn create( + mut options: EnvOpenOptions, + path: impl AsRef, + ) -> anyhow::Result<(Self, mpsc::Receiver<()>)> { options.max_dbs(5); let env = options.open(path)?; @@ -194,7 +197,17 @@ impl UpdateStore { // Send a first notification to trigger the process. let _ = notification_sender.send(()); - Ok((Self { env, pending_queue, next_update_id, updates, state, notification_sender }, notification_receiver)) + Ok(( + Self { + env, + pending_queue, + next_update_id, + updates, + state, + notification_sender, + }, + notification_receiver, + )) } pub fn open( @@ -208,7 +221,8 @@ impl UpdateStore { // Init update loop to perform any pending updates at launch. // Since we just launched the update store, and we still own the receiving end of the // channel, this call is guaranteed to succeed. - update_store.notification_sender + update_store + .notification_sender .try_send(()) .expect("Failed to init update store"); @@ -303,22 +317,28 @@ impl UpdateStore { /// Push already processed update in the UpdateStore without triggering the notification /// process. This is useful for the dumps. - pub fn register_raw_updates ( + pub fn register_raw_updates( &self, wtxn: &mut heed::RwTxn, update: UpdateStatus, index_uuid: Uuid, ) -> heed::Result<()> { - // TODO: TAMO: since I don't want to store anything I currently generate a new global ID - // everytime I encounter an enqueued update, can we do better? match update { UpdateStatus::Enqueued(enqueued) => { - let (global_id, update_id) = self.next_update_id(wtxn, index_uuid)?; - self.pending_queue.remap_key_type::().put(wtxn, &(global_id, index_uuid, update_id), &enqueued)?; + let (global_id, _update_id) = self.next_update_id(wtxn, index_uuid)?; + self.pending_queue.remap_key_type::().put( + wtxn, + &(global_id, index_uuid, enqueued.id()), + &enqueued, + )?; } _ => { - let update_id = self.next_update_id_raw(wtxn, index_uuid)?; - self.updates.remap_key_type::().put(wtxn, &(index_uuid, update_id), &update)?; + let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; + self.updates.remap_key_type::().put( + wtxn, + &(index_uuid, update.id()), + &update, + )?; } } Ok(()) @@ -544,20 +564,39 @@ impl UpdateStore { &self, uuids: &HashSet<(String, Uuid)>, path: PathBuf, - handle: impl IndexActorHandle - ) -> anyhow::Result<()> { + handle: impl IndexActorHandle, + ) -> anyhow::Result<()> { use std::io::prelude::*; let state_lock = self.state.write(); state_lock.swap(State::Dumping); let txn = self.env.write_txn()?; - for (uid, uuid) in uuids.iter() { - let file = File::create(path.join(uid).join("updates.jsonl"))?; + for (index_uid, index_uuid) in uuids.iter() { + let file = File::create(path.join(index_uid).join("updates.jsonl"))?; let mut file = std::io::BufWriter::new(file); - for update in &self.list(*uuid)? { - serde_json::to_writer(&mut file, update)?; + let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); + for entry in pendings { + let ((_, uuid, _), pending) = entry?; + if &uuid == index_uuid { + let mut update: UpdateStatus = pending.decode()?.into(); + if let Some(path) = update.content_path_mut() { + *path = path.file_name().expect("update path can't be empty").into(); + } + serde_json::to_writer(&mut file, &update)?; + file.write_all(b"\n")?; + } + } + + let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?; + for entry in updates { + let (_, update) = entry?; + let mut update = update.clone(); + if let Some(path) = update.content_path_mut() { + *path = path.file_name().expect("update path can't be empty").into(); + } + serde_json::to_writer(&mut file, &update)?; file.write_all(b"\n")?; } } diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 6b5ef345d..31f0005f8 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -188,7 +188,7 @@ impl Failed { } } -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] #[serde(tag = "status", rename_all = "camelCase")] pub enum UpdateStatus { Processing(Processing), diff --git a/meilisearch-http/tests/settings/get_settings.rs b/meilisearch-http/tests/settings/get_settings.rs index 4230e19f8..ab688076d 100644 --- a/meilisearch-http/tests/settings/get_settings.rs +++ b/meilisearch-http/tests/settings/get_settings.rs @@ -73,7 +73,7 @@ async fn reset_all_settings() { let server = Server::new().await; let index = server.index("test"); index - .update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"] })) + .update_settings(json!({"displayedAttributes": ["foo"], "searchableAttributes": ["bar"], "stopWords": ["the"], "attributesForFaceting": { "toto": "string" } })) .await; index.wait_update_id(0).await; let (response, code) = index.settings().await; @@ -81,6 +81,7 @@ async fn reset_all_settings() { assert_eq!(response["displayedAttributes"], json!(["foo"])); assert_eq!(response["searchableAttributes"], json!(["bar"])); assert_eq!(response["stopWords"], json!(["the"])); + assert_eq!(response["attributesForFaceting"], json!({"toto": "string"})); index.delete_settings().await; index.wait_update_id(1).await; @@ -90,6 +91,7 @@ async fn reset_all_settings() { assert_eq!(response["displayedAttributes"], json!(["*"])); assert_eq!(response["searchableAttributes"], json!(["*"])); assert_eq!(response["stopWords"], json!([])); + assert_eq!(response["attributesForFaceting"], json!({})); } #[actix_rt::test]