From 1a65eed724fcef7fd5e87cacfd4578400f7bbcad Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 9 Jun 2021 11:52:36 +0200 Subject: [PATCH 1/8] fix index creation bug --- .../src/index_controller/index_actor/store.rs | 7 +++- meilisearch-http/src/index_controller/mod.rs | 3 +- .../update_actor/store/mod.rs | 4 +-- .../index_controller/uuid_resolver/actor.rs | 10 ------ .../uuid_resolver/handle_impl.rs | 9 ----- .../index_controller/uuid_resolver/message.rs | 4 --- .../src/index_controller/uuid_resolver/mod.rs | 1 - .../index_controller/uuid_resolver/store.rs | 34 ++++--------------- 8 files changed, 16 insertions(+), 56 deletions(-) diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 11791be48..8f892587d 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -40,6 +40,11 @@ impl MapIndexStore { #[async_trait::async_trait] impl IndexStore for MapIndexStore { async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult { + let mut lock = self.index_store.write().await; + + if let Some(index) = lock.get(&uuid) { + return Ok(index.clone()) + } let path = self.path.join(format!("index-{}", uuid)); if path.exists() { return Err(IndexError::IndexAlreadyExists); @@ -57,7 +62,7 @@ impl IndexStore for MapIndexStore { }) .await??; - self.index_store.write().await.insert(uuid, index.clone()); + lock.insert(uuid, index.clone()); Ok(index) } diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index f562d2185..4d5a52666 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -249,8 +249,9 @@ impl IndexController { ) -> anyhow::Result { let IndexSettings { uid, primary_key } = index_settings; let uid = uid.ok_or_else(|| anyhow::anyhow!("Can't create an index without a uid."))?; - let uuid = self.uuid_resolver.create(uid.clone()).await?; + let uuid = Uuid::new_v4(); let meta = self.index_handle.create_index(uuid, primary_key).await?; + self.uuid_resolver.insert(uid.clone(), uuid).await?; let meta = IndexMetadata { uuid, name: uid.clone(), diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 39de02ef1..331e7b2bb 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -229,7 +229,7 @@ impl UpdateStore { let mut txn = self.env.write_txn()?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; - let meta = Enqueued::new(meta, update_id, content); + let meta = dbg!(Enqueued::new(meta, update_id, content)); self.pending_queue .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; @@ -280,7 +280,7 @@ impl UpdateStore { ) -> anyhow::Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; - let first_meta = self.pending_queue.first(&rtxn)?; + let first_meta = dbg!(self.pending_queue.first(&rtxn)?); drop(rtxn); // If there is a pending update we process and only keep diff --git a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs index 0211cef25..74158ce04 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/actor.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/actor.rs @@ -23,9 +23,6 @@ impl UuidResolverActor { loop { match self.inbox.recv().await { - Some(Create { uid: name, ret }) => { - let _ = ret.send(self.handle_create(name).await); - } Some(Get { uid: name, ret }) => { let _ = ret.send(self.handle_get(name).await); } @@ -55,13 +52,6 @@ impl UuidResolverActor { warn!("exiting uuid resolver loop"); } - async fn handle_create(&self, uid: String) -> Result { - if !is_index_uid_valid(&uid) { - return Err(UuidResolverError::BadlyFormatted(uid)); - } - self.store.create_uuid(uid, true).await - } - async fn handle_get(&self, uid: String) -> Result { self.store .get_uuid(uid.clone()) diff --git a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs index 981beb0f6..af710dd87 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/handle_impl.rs @@ -32,15 +32,6 @@ impl UuidResolverHandle for UuidResolverHandleImpl { .expect("Uuid resolver actor has been killed")?) } - async fn create(&self, name: String) -> anyhow::Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Create { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - async fn delete(&self, name: String) -> anyhow::Result { let (ret, receiver) = oneshot::channel(); let msg = UuidResolveMsg::Delete { uid: name, ret }; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/message.rs b/meilisearch-http/src/index_controller/uuid_resolver/message.rs index 2092c67fd..46d9b585f 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/message.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/message.rs @@ -11,10 +11,6 @@ pub enum UuidResolveMsg { uid: String, ret: oneshot::Sender>, }, - Create { - uid: String, - ret: oneshot::Sender>, - }, Delete { uid: String, ret: oneshot::Sender>, diff --git a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs index 5bddadf02..3c3b5fd06 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/mod.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/mod.rs @@ -28,7 +28,6 @@ pub type Result = std::result::Result; pub trait UuidResolverHandle { async fn get(&self, name: String) -> Result; async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()>; - async fn create(&self, name: String) -> anyhow::Result; async fn delete(&self, name: String) -> anyhow::Result; async fn list(&self) -> anyhow::Result>; async fn snapshot(&self, path: PathBuf) -> Result>; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 1d6ada269..5f7c23f97 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -8,7 +8,7 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{Result, UuidResolverError, UUID_STORE_SIZE}; +use super::{Result, UUID_STORE_SIZE, UuidResolverError}; use crate::helpers::EnvSizer; #[derive(Serialize, Deserialize)] @@ -23,7 +23,6 @@ const UUIDS_DB_PATH: &str = "index_uuids"; pub trait UuidStore: Sized { // Create a new entry for `name`. Return an error if `err` and the entry already exists, return // the uuid otherwise. - async fn create_uuid(&self, uid: String, err: bool) -> Result; async fn get_uuid(&self, uid: String) -> Result>; async fn delete(&self, uid: String) -> Result>; async fn list(&self) -> Result>; @@ -50,27 +49,6 @@ impl HeedUuidStore { Ok(Self { env, db }) } - pub fn create_uuid(&self, name: String, err: bool) -> Result { - let env = self.env.clone(); - let db = self.db; - let mut txn = env.write_txn()?; - match db.get(&txn, &name)? { - Some(uuid) => { - if err { - Err(UuidResolverError::NameAlreadyExist) - } else { - let uuid = Uuid::from_slice(uuid)?; - Ok(uuid) - } - } - None => { - let uuid = Uuid::new_v4(); - db.put(&mut txn, &name, uuid.as_bytes())?; - txn.commit()?; - Ok(uuid) - } - } - } pub fn get_uuid(&self, name: String) -> Result> { let env = self.env.clone(); let db = self.db; @@ -116,6 +94,11 @@ impl HeedUuidStore { let env = self.env.clone(); let db = self.db; let mut txn = env.write_txn()?; + + if db.get(&txn, &name)?.is_some() { + return Err(UuidResolverError::NameAlreadyExist) + } + db.put(&mut txn, &name, uuid.as_bytes())?; txn.commit()?; Ok(()) @@ -205,11 +188,6 @@ impl HeedUuidStore { #[async_trait::async_trait] impl UuidStore for HeedUuidStore { - async fn create_uuid(&self, name: String, err: bool) -> Result { - let this = self.clone(); - tokio::task::spawn_blocking(move || this.create_uuid(name, err)).await? - } - async fn get_uuid(&self, name: String) -> Result> { let this = self.clone(); tokio::task::spawn_blocking(move || this.get_uuid(name)).await? From 2716c1aebb1cf9961c5da181b10b384baf3254ea Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 9 Jun 2021 16:19:45 +0200 Subject: [PATCH 2/8] fix update store lock --- .../src/index_controller/index_actor/store.rs | 2 +- .../index_controller/update_actor/actor.rs | 17 ++- .../update_actor/handle_impl.rs | 66 +++++++-- .../src/index_controller/update_actor/mod.rs | 4 + .../update_actor/store/mod.rs | 138 ++++++++++++------ .../src/index_controller/updates.rs | 2 +- .../index_controller/uuid_resolver/store.rs | 4 +- 7 files changed, 163 insertions(+), 70 deletions(-) diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 8f892587d..9b6b057c3 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -43,7 +43,7 @@ impl IndexStore for MapIndexStore { let mut lock = self.index_store.write().await; if let Some(index) = lock.get(&uuid) { - return Ok(index.clone()) + return Ok(index.clone()); } let path = self.path.join(format!("index-{}", uuid)); if path.exists() { diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 7779f2556..76cba7e07 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -1,6 +1,7 @@ use std::collections::HashSet; use std::io::SeekFrom; use std::path::{Path, PathBuf}; +use std::sync::atomic::AtomicBool; use std::sync::Arc; use log::info; @@ -19,6 +20,7 @@ pub struct UpdateActor { store: Arc, inbox: mpsc::Receiver>, index_handle: I, + must_exit: Arc, } impl UpdateActor @@ -39,14 +41,17 @@ where let mut options = heed::EnvOpenOptions::new(); options.map_size(update_db_size); - let store = UpdateStore::open(options, &path, index_handle.clone())?; + let must_exit = Arc::new(AtomicBool::new(false)); + + let store = UpdateStore::open(options, &path, index_handle.clone(), must_exit.clone())?; std::fs::create_dir_all(path.join("update_files"))?; - assert!(path.exists()); + Ok(Self { path, store, inbox, index_handle, + must_exit, }) } @@ -56,7 +61,13 @@ where info!("Started update actor."); loop { - match self.inbox.recv().await { + let msg = self.inbox.recv().await; + + if self.must_exit.load(std::sync::atomic::Ordering::Relaxed) { + break; + } + + match msg { Some(Update { uuid, meta, diff --git a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs index cc5ba9757..7844bf855 100644 --- a/meilisearch-http/src/index_controller/update_actor/handle_impl.rs +++ b/meilisearch-http/src/index_controller/update_actor/handle_impl.rs @@ -7,7 +7,8 @@ use uuid::Uuid; use crate::index_controller::{IndexActorHandle, UpdateStatus}; use super::{ - PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateMeta, UpdateMsg, UpdateStoreInfo, + PayloadData, Result, UpdateActor, UpdateActorHandle, UpdateError, UpdateMeta, UpdateMsg, + UpdateStoreInfo, }; #[derive(Clone)] @@ -47,42 +48,72 @@ where async fn get_all_updates_status(&self, uuid: Uuid) -> Result> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::ListUpdates { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn update_status(&self, uuid: Uuid, id: u64) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetUpdate { uuid, id, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn delete(&self, uuid: Uuid) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Delete { uuid, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn snapshot(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Snapshot { uuids, path, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn dump(&self, uuids: HashSet, path: PathBuf) -> Result<()> { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::Dump { uuids, path, ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn get_info(&self) -> Result { let (ret, receiver) = oneshot::channel(); let msg = UpdateMsg::GetInfo { ret }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } async fn update( @@ -98,7 +129,12 @@ where meta, ret, }; - let _ = self.sender.send(msg).await; - receiver.await.expect("update actor killed.") + self.sender + .send(msg) + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)?; + receiver + .await + .map_err(|_| UpdateError::FatalUpdateStoreError)? } } diff --git a/meilisearch-http/src/index_controller/update_actor/mod.rs b/meilisearch-http/src/index_controller/update_actor/mod.rs index ba89eebe3..b854cca70 100644 --- a/meilisearch-http/src/index_controller/update_actor/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/mod.rs @@ -30,6 +30,10 @@ pub enum UpdateError { UnexistingUpdate(u64), #[error("Internal error processing update: {0}")] Internal(String), + #[error( + "Update store was shut down due to a fatal error, please check your logs for more info." + )] + FatalUpdateStoreError, } macro_rules! internal_error { diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 331e7b2bb..7ab854be7 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -3,6 +3,7 @@ pub mod dump; use std::fs::{copy, create_dir_all, remove_file, File}; use std::path::Path; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::{ collections::{BTreeMap, HashSet}, @@ -98,7 +99,7 @@ pub struct UpdateStore { /// | 16-bytes | 8-bytes | updates: Database>, /// Indicates the current state of the update store, - pub state: Arc, + state: Arc, /// Wake up the loop when a new event occurs. notification_sender: mpsc::Sender<()>, path: PathBuf, @@ -138,6 +139,7 @@ impl UpdateStore { options: EnvOpenOptions, path: impl AsRef, index_handle: impl IndexActorHandle + Clone + Sync + Send + 'static, + must_exit: Arc, ) -> anyhow::Result> { let (update_store, mut notification_receiver) = Self::new(options, path)?; let update_store = Arc::new(update_store); @@ -171,7 +173,11 @@ impl UpdateStore { match res { Ok(Some(_)) => (), Ok(None) => break, - Err(e) => error!("error while processing update: {}", e), + Err(e) => { + error!("Fatal error while processing update that requires the update store to shutdown: {}", e); + must_exit.store(true, Ordering::SeqCst); + break 'outer; + } } } // the ownership on the arc has been taken, we need to exit. @@ -181,6 +187,8 @@ impl UpdateStore { } }); + error!("Update store loop exited."); + Ok(update_store) } @@ -286,63 +294,79 @@ impl UpdateStore { // If there is a pending update we process and only keep // a reader while processing it, not a writer. match first_meta { - Some(((global_id, index_uuid, update_id), mut pending)) => { - let content_path = pending.content.take(); + Some(((global_id, index_uuid, _), mut pending)) => { + let content = pending.content.take(); let processing = pending.processing(); - // Acquire the state lock and set the current state to processing. // txn must *always* be acquired after state lock, or it will dead lock. let state = self.state.write(); state.swap(State::Processing(index_uuid, processing.clone())); - let file = match content_path { - Some(uuid) => { - let path = update_uuid_to_file_path(&self.path, uuid); - let file = File::open(path)?; - Some(file) - } - None => None, - }; + let result = + self.perform_update(content, processing, index_handle, index_uuid, global_id); - // Process the pending update using the provided user function. - let handle = Handle::current(); - let result = match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { - Ok(result) => result, - Err(e) => Err(processing.fail(e.to_string())), - }; - - // Once the pending update have been successfully processed - // we must remove the content from the pending and processing stores and - // write the *new* meta to the processed-meta store and commit. - let mut wtxn = self.env.write_txn()?; - self.pending_queue - .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; - - if let Some(uuid) = content_path { - let path = update_uuid_to_file_path(&self.path, uuid); - remove_file(&path)?; - } - - let result = match result { - Ok(res) => res.into(), - Err(res) => res.into(), - }; - - self.updates.remap_key_type::().put( - &mut wtxn, - &(index_uuid, update_id), - &result, - )?; - - wtxn.commit()?; state.swap(State::Idle); - Ok(Some(())) + result } None => Ok(None), } } + fn perform_update( + &self, + content: Option, + processing: Processing, + index_handle: impl IndexActorHandle, + index_uuid: Uuid, + global_id: u64, + ) -> anyhow::Result> { + let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); + let update_id = processing.id(); + + let file = match content_path { + Some(ref path) => { + let file = File::open(path)?; + Some(file) + } + None => None, + }; + + // Process the pending update using the provided user function. + let handle = Handle::current(); + let result = + match handle.block_on(index_handle.update(index_uuid, processing.clone(), file)) { + Ok(result) => result, + Err(e) => Err(processing.fail(e.to_string())), + }; + + // Once the pending update have been successfully processed + // we must remove the content from the pending and processing stores and + // write the *new* meta to the processed-meta store and commit. + let mut wtxn = self.env.write_txn()?; + self.pending_queue + .delete(&mut wtxn, &(global_id, index_uuid, update_id))?; + + let result = match result { + Ok(res) => res.into(), + Err(res) => res.into(), + }; + + self.updates.remap_key_type::().put( + &mut wtxn, + &(index_uuid, update_id), + &result, + )?; + + wtxn.commit()?; + + if let Some(ref path) = content_path { + remove_file(&path)?; + } + + Ok(Some(())) + } + /// List the updates for `index_uuid`. pub fn list(&self, index_uuid: Uuid) -> anyhow::Result> { let mut update_list = BTreeMap::::new(); @@ -561,7 +585,13 @@ mod test { let mut options = EnvOpenOptions::new(); let handle = Arc::new(MockIndexActorHandle::new()); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + let update_store = UpdateStore::open( + options, + dir.path(), + handle, + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); let index1_uuid = Uuid::new_v4(); let index2_uuid = Uuid::new_v4(); @@ -588,7 +618,13 @@ mod test { let mut options = EnvOpenOptions::new(); let handle = Arc::new(MockIndexActorHandle::new()); options.map_size(4096 * 100); - let update_store = UpdateStore::open(options, dir.path(), handle).unwrap(); + let update_store = UpdateStore::open( + options, + dir.path(), + handle, + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); let meta = UpdateMeta::ClearDocuments; let uuid = Uuid::new_v4(); let store_clone = update_store.clone(); @@ -626,7 +662,13 @@ mod test { let mut options = EnvOpenOptions::new(); options.map_size(4096 * 100); - let store = UpdateStore::open(options, dir.path(), handle.clone()).unwrap(); + let store = UpdateStore::open( + options, + dir.path(), + handle.clone(), + Arc::new(AtomicBool::new(false)), + ) + .unwrap(); // wait a bit for the event loop exit. tokio::time::sleep(std::time::Duration::from_millis(50)).await; diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 303289df3..0dc1c2534 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -3,7 +3,7 @@ use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::index::{Unchecked, Settings}; +use crate::index::{Settings, Unchecked}; pub type UpdateError = String; diff --git a/meilisearch-http/src/index_controller/uuid_resolver/store.rs b/meilisearch-http/src/index_controller/uuid_resolver/store.rs index 5f7c23f97..bab223bb3 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver/store.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver/store.rs @@ -8,7 +8,7 @@ use heed::{CompactionOption, Database, Env, EnvOpenOptions}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::{Result, UUID_STORE_SIZE, UuidResolverError}; +use super::{Result, UuidResolverError, UUID_STORE_SIZE}; use crate::helpers::EnvSizer; #[derive(Serialize, Deserialize)] @@ -96,7 +96,7 @@ impl HeedUuidStore { let mut txn = env.write_txn()?; if db.get(&txn, &name)?.is_some() { - return Err(UuidResolverError::NameAlreadyExist) + return Err(UuidResolverError::NameAlreadyExist); } db.put(&mut txn, &name, uuid.as_bytes())?; From 99551fc21b72b1475cf3f732d5096bc6879e9d3a Mon Sep 17 00:00:00 2001 From: Irevoire Date: Wed, 9 Jun 2021 17:10:10 +0200 Subject: [PATCH 3/8] fix encoding bug --- meilisearch-http/src/index/update_handler.rs | 2 +- meilisearch-http/src/index/updates.rs | 8 ++--- meilisearch-http/src/index_controller/mod.rs | 13 ++----- .../index_controller/update_actor/actor.rs | 7 ++-- .../update_actor/store/codec.rs | 4 +-- .../update_actor/store/dump.rs | 1 - .../update_actor/store/mod.rs | 34 +++++++++---------- .../src/index_controller/updates.rs | 4 ++- .../tests/documents/delete_documents.rs | 2 +- 9 files changed, 34 insertions(+), 41 deletions(-) diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index 8a127168e..13e516d3c 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -81,7 +81,7 @@ impl UpdateHandler { primary_key.as_deref(), ), ClearDocuments => index.clear_documents(update_builder), - DeleteDocuments => index.delete_documents(content, update_builder), + DeleteDocuments { documents } => index.delete_documents(documents.to_vec(), update_builder), Settings(settings) => index.update_settings(&settings.clone().check(), update_builder), }; diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index b4869fa42..750e1f275 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -298,18 +298,14 @@ impl Index { pub fn delete_documents( &self, - document_ids: Option, + document_ids: Vec, update_builder: UpdateBuilder, ) -> anyhow::Result { - let ids = match document_ids { - Some(reader) => serde_json::from_reader(reader)?, - None => Vec::::new(), - }; let mut txn = self.write_txn()?; let mut builder = update_builder.delete_documents(&mut txn, self)?; // We ignore unexisting document ids - ids.iter().for_each(|id| { + document_ids.iter().for_each(|id| { builder.delete_external_id(id); }); diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 4d5a52666..3c46e48f6 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -200,18 +200,11 @@ impl IndexController { pub async fn delete_documents( &self, uid: String, - document_ids: Vec, + documents: Vec, ) -> anyhow::Result { let uuid = self.uuid_resolver.get(uid).await?; - let meta = UpdateMeta::DeleteDocuments; - let (sender, receiver) = mpsc::channel(10); - - tokio::task::spawn(async move { - let json = serde_json::to_vec(&document_ids).unwrap(); - let bytes = Bytes::from(json); - let _ = sender.send(Ok(bytes)).await; - }); - + let meta = UpdateMeta::DeleteDocuments { documents }; + let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; Ok(status) } diff --git a/meilisearch-http/src/index_controller/update_actor/actor.rs b/meilisearch-http/src/index_controller/update_actor/actor.rs index 76cba7e07..c74cf11f5 100644 --- a/meilisearch-http/src/index_controller/update_actor/actor.rs +++ b/meilisearch-http/src/index_controller/update_actor/actor.rs @@ -106,7 +106,7 @@ where mut payload: mpsc::Receiver>, ) -> Result { let file_path = match meta { - UpdateMeta::DocumentsAddition { .. } | UpdateMeta::DeleteDocuments => { + UpdateMeta::DocumentsAddition { .. } => { let update_file_id = uuid::Uuid::new_v4(); let path = self .path @@ -181,10 +181,13 @@ where async fn handle_get_update(&self, uuid: Uuid, id: u64) -> Result { let store = self.store.clone(); + tokio::task::spawn_blocking(move || { let result = store .meta(uuid, id)? .ok_or(UpdateError::UnexistingUpdate(id))?; - Ok(result) + Ok(result) + }) + .await? } async fn handle_delete(&self, uuid: Uuid) -> Result<()> { diff --git a/meilisearch-http/src/index_controller/update_actor/store/codec.rs b/meilisearch-http/src/index_controller/update_actor/store/codec.rs index e07b52eec..2c7068f88 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/codec.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/codec.rs @@ -75,10 +75,10 @@ impl<'a> BytesDecode<'a> for UpdateKeyCodec { type DItem = (Uuid, u64); fn bytes_decode(bytes: &'a [u8]) -> Option { - let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; + let uuid_bytes = dbg!(bytes.get(0..size_of::())?.try_into().ok())?; let uuid = Uuid::from_bytes(uuid_bytes); - let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; + let update_id_bytes = dbg!(bytes.get(size_of::()..)?.try_into().ok())?; let update_id = u64::from_be_bytes(update_id_bytes); Some((uuid, update_id)) diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs index 8f947e459..4be5e27a7 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -108,7 +108,6 @@ impl UpdateStore { let updates = self .updates .iter(txn)? - .remap_key_type::() .lazily_decode_data(); for update in updates { diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 7ab854be7..75b1c5b15 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -97,7 +97,7 @@ pub struct UpdateStore { /// The keys are built as follow: /// | Uuid | id | /// | 16-bytes | 8-bytes | - updates: Database>, + updates: Database>, /// Indicates the current state of the update store, state: Arc, /// Wake up the loop when a new event occurs. @@ -244,6 +244,8 @@ impl UpdateStore { txn.commit()?; + dbg!("here"); + self.notification_sender .blocking_send(()) .expect("Update store loop exited."); @@ -269,7 +271,7 @@ impl UpdateStore { } _ => { let _update_id = self.next_update_id_raw(wtxn, index_uuid)?; - self.updates.remap_key_type::().put( + self.updates.put( wtxn, &(index_uuid, update.id()), &update, @@ -324,6 +326,8 @@ impl UpdateStore { let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); let update_id = processing.id(); + dbg!(&processing); + let file = match content_path { Some(ref path) => { let file = File::open(path)?; @@ -352,7 +356,7 @@ impl UpdateStore { Err(res) => res.into(), }; - self.updates.remap_key_type::().put( + self.updates.put( &mut wtxn, &(index_uuid, update_id), &result, @@ -381,7 +385,11 @@ impl UpdateStore { } } - let updates = self.updates.prefix_iter(&txn, index_uuid.as_bytes())?; + let updates = self + .updates + .remap_key_type::() + .prefix_iter(&txn, index_uuid.as_bytes())?; + for entry in updates { let (_, update) = entry?; update_list.insert(update.id(), update); @@ -412,26 +420,19 @@ impl UpdateStore { let txn = self.env.read_txn()?; // Else, check if it is in the updates database: - let update = self - .updates - .remap_key_type::() - .get(&txn, &(index_uuid, update_id))?; + let update = dbg!(self.updates.get(&txn, &(index_uuid, update_id)))?; if let Some(update) = update { return Ok(Some(update)); } // If nothing was found yet, we resolve to iterate over the pending queue. - let pendings = self - .pending_queue - .remap_key_type::() - .iter(&txn)? - .lazily_decode_data(); + let pendings = self.pending_queue.iter(&txn)?.lazily_decode_data(); for entry in pendings { - let ((uuid, id), pending) = entry?; + let ((_, uuid, id), pending) = entry?; if uuid == index_uuid && id == update_id { - return Ok(Some(pending.decode()?.into())); + return Ok(Some(dbg!(pending.decode())?.into())); } } @@ -461,6 +462,7 @@ impl UpdateStore { let mut updates = self .updates + .remap_key_type::() .prefix_iter_mut(&mut txn, index_uuid.as_bytes())? .lazily_decode_data(); @@ -707,7 +709,6 @@ mod test { assert!(store.pending_queue.first(&txn).unwrap().is_none()); let update = store .updates - .remap_key_type::() .get(&txn, &(uuid, 0)) .unwrap() .unwrap(); @@ -715,7 +716,6 @@ mod test { assert!(matches!(update, UpdateStatus::Processed(_))); let update = store .updates - .remap_key_type::() .get(&txn, &(uuid, 1)) .unwrap() .unwrap(); diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 0dc1c2534..86b33e3f2 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -23,7 +23,9 @@ pub enum UpdateMeta { primary_key: Option, }, ClearDocuments, - DeleteDocuments, + DeleteDocuments { + documents: Vec + }, Settings(Settings), } diff --git a/meilisearch-http/tests/documents/delete_documents.rs b/meilisearch-http/tests/documents/delete_documents.rs index b69b4c11f..d9b97d68d 100644 --- a/meilisearch-http/tests/documents/delete_documents.rs +++ b/meilisearch-http/tests/documents/delete_documents.rs @@ -114,7 +114,7 @@ async fn delete_no_document_batch() { index.add_documents(json!([{ "id": 1, "content": "foobar" }, { "id": 0, "content": "foobar" }, { "id": 3, "content": "foobar" }]), Some("id")).await; index.wait_update_id(0).await; let (_response, code) = index.delete_batch(vec![]).await; - assert_eq!(code, 202); + assert_eq!(code, 202, "{}", _response); let _update = index.wait_update_id(1).await; let (response, code) = index From 2d19b78dd8941631b853f3e0ffd7a1646d77a8ae Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 10 Jun 2021 12:03:16 +0200 Subject: [PATCH 4/8] fix stats test --- meilisearch-http/tests/index/stats.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/meilisearch-http/tests/index/stats.rs b/meilisearch-http/tests/index/stats.rs index e1d8bd211..d32c06d2b 100644 --- a/meilisearch-http/tests/index/stats.rs +++ b/meilisearch-http/tests/index/stats.rs @@ -35,11 +35,6 @@ async fn stats() { assert_eq!(code, 202); assert_eq!(response["updateId"], 0); - let (response, code) = index.stats().await; - - assert_eq!(code, 200); - assert_eq!(response["isIndexing"], true); - index.wait_update_id(0).await; let (response, code) = index.stats().await; From 20e1caef470316f5ab2f493aa027150f869f4b75 Mon Sep 17 00:00:00 2001 From: Irevoire Date: Thu, 10 Jun 2021 14:53:34 +0200 Subject: [PATCH 5/8] makes clippy happy --- .../src/index_controller/update_actor/store/dump.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/meilisearch-http/src/index_controller/update_actor/store/dump.rs b/meilisearch-http/src/index_controller/update_actor/store/dump.rs index 4be5e27a7..e7f36a2a1 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/dump.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/dump.rs @@ -9,8 +9,7 @@ use heed::{EnvOpenOptions, RoTxn}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use super::UpdateStore; -use super::{codec::UpdateKeyCodec, State}; +use super::{State, UpdateStore}; use crate::index_controller::{ index_actor::IndexActorHandle, update_actor::store::update_uuid_to_file_path, Enqueued, UpdateStatus, @@ -105,10 +104,7 @@ impl UpdateStore { uuids: &HashSet, mut file: &mut File, ) -> anyhow::Result<()> { - let updates = self - .updates - .iter(txn)? - .lazily_decode_data(); + let updates = self.updates.iter(txn)?.lazily_decode_data(); for update in updates { let ((uuid, _), data) = update?; From 592fcbc71fcb03533f40e98738411f79d9c36b18 Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 10 Jun 2021 12:03:16 +0200 Subject: [PATCH 6/8] fix stats test --- meilisearch-http/tests/stats/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/meilisearch-http/tests/stats/mod.rs b/meilisearch-http/tests/stats/mod.rs index ef90dcf7f..f931d5066 100644 --- a/meilisearch-http/tests/stats/mod.rs +++ b/meilisearch-http/tests/stats/mod.rs @@ -53,13 +53,12 @@ async fn stats() { ]); let (response, code) = index.add_documents(documents, None).await; - assert_eq!(code, 202); + assert_eq!(code, 202, "{}", response); assert_eq!(response["updateId"], 0); let (response, code) = server.stats().await; - assert_eq!(code, 200); - assert_eq!(response["indexes"]["test"]["isIndexing"], true); + assert_eq!(code, 200, "{}", response); index.wait_update_id(0).await; From eb7616ca0fd4e013462976b9dc07e15d5cf5963f Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 10 Jun 2021 15:32:45 +0200 Subject: [PATCH 7/8] remove dbg --- .../src/index_controller/update_actor/store/codec.rs | 4 ++-- .../src/index_controller/update_actor/store/mod.rs | 12 ++++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/meilisearch-http/src/index_controller/update_actor/store/codec.rs b/meilisearch-http/src/index_controller/update_actor/store/codec.rs index 2c7068f88..e07b52eec 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/codec.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/codec.rs @@ -75,10 +75,10 @@ impl<'a> BytesDecode<'a> for UpdateKeyCodec { type DItem = (Uuid, u64); fn bytes_decode(bytes: &'a [u8]) -> Option { - let uuid_bytes = dbg!(bytes.get(0..size_of::())?.try_into().ok())?; + let uuid_bytes = bytes.get(0..size_of::())?.try_into().ok()?; let uuid = Uuid::from_bytes(uuid_bytes); - let update_id_bytes = dbg!(bytes.get(size_of::()..)?.try_into().ok())?; + let update_id_bytes = bytes.get(size_of::()..)?.try_into().ok()?; let update_id = u64::from_be_bytes(update_id_bytes); Some((uuid, update_id)) diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 75b1c5b15..fc5c44568 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -237,15 +237,13 @@ impl UpdateStore { let mut txn = self.env.write_txn()?; let (global_id, update_id) = self.next_update_id(&mut txn, index_uuid)?; - let meta = dbg!(Enqueued::new(meta, update_id, content)); + let meta = Enqueued::new(meta, update_id, content); self.pending_queue .put(&mut txn, &(global_id, index_uuid, update_id), &meta)?; txn.commit()?; - dbg!("here"); - self.notification_sender .blocking_send(()) .expect("Update store loop exited."); @@ -290,7 +288,7 @@ impl UpdateStore { ) -> anyhow::Result> { // Create a read transaction to be able to retrieve the pending update in order. let rtxn = self.env.read_txn()?; - let first_meta = dbg!(self.pending_queue.first(&rtxn)?); + let first_meta = self.pending_queue.first(&rtxn)?; drop(rtxn); // If there is a pending update we process and only keep @@ -326,8 +324,6 @@ impl UpdateStore { let content_path = content.map(|uuid| update_uuid_to_file_path(&self.path, uuid)); let update_id = processing.id(); - dbg!(&processing); - let file = match content_path { Some(ref path) => { let file = File::open(path)?; @@ -420,7 +416,7 @@ impl UpdateStore { let txn = self.env.read_txn()?; // Else, check if it is in the updates database: - let update = dbg!(self.updates.get(&txn, &(index_uuid, update_id)))?; + let update = self.updates.get(&txn, &(index_uuid, update_id))?; if let Some(update) = update { return Ok(Some(update)); @@ -432,7 +428,7 @@ impl UpdateStore { for entry in pendings { let ((_, uuid, id), pending) = entry?; if uuid == index_uuid && id == update_id { - return Ok(Some(dbg!(pending.decode())?.into())); + return Ok(Some(pending.decode()?.into())); } } From 3ef0830c5dcde12066c9a489b9c441e6717c541f Mon Sep 17 00:00:00 2001 From: mpostma Date: Thu, 10 Jun 2021 15:55:44 +0200 Subject: [PATCH 8/8] review changes --- meilisearch-http/src/index/update_handler.rs | 2 +- meilisearch-http/src/index/updates.rs | 2 +- meilisearch-http/src/index_controller/index_actor/store.rs | 2 ++ meilisearch-http/src/index_controller/mod.rs | 2 +- .../src/index_controller/update_actor/store/mod.rs | 6 +++--- meilisearch-http/src/index_controller/updates.rs | 2 +- 6 files changed, 9 insertions(+), 7 deletions(-) diff --git a/meilisearch-http/src/index/update_handler.rs b/meilisearch-http/src/index/update_handler.rs index 13e516d3c..63a074abb 100644 --- a/meilisearch-http/src/index/update_handler.rs +++ b/meilisearch-http/src/index/update_handler.rs @@ -81,7 +81,7 @@ impl UpdateHandler { primary_key.as_deref(), ), ClearDocuments => index.clear_documents(update_builder), - DeleteDocuments { documents } => index.delete_documents(documents.to_vec(), update_builder), + DeleteDocuments { ids } => index.delete_documents(ids, update_builder), Settings(settings) => index.update_settings(&settings.clone().check(), update_builder), }; diff --git a/meilisearch-http/src/index/updates.rs b/meilisearch-http/src/index/updates.rs index 750e1f275..9ed4fe49e 100644 --- a/meilisearch-http/src/index/updates.rs +++ b/meilisearch-http/src/index/updates.rs @@ -298,7 +298,7 @@ impl Index { pub fn delete_documents( &self, - document_ids: Vec, + document_ids: &[String], update_builder: UpdateBuilder, ) -> anyhow::Result { let mut txn = self.write_txn()?; diff --git a/meilisearch-http/src/index_controller/index_actor/store.rs b/meilisearch-http/src/index_controller/index_actor/store.rs index 9b6b057c3..1646821d8 100644 --- a/meilisearch-http/src/index_controller/index_actor/store.rs +++ b/meilisearch-http/src/index_controller/index_actor/store.rs @@ -40,6 +40,8 @@ impl MapIndexStore { #[async_trait::async_trait] impl IndexStore for MapIndexStore { async fn create(&self, uuid: Uuid, primary_key: Option) -> IndexResult { + // We need to keep the lock until we are sure the db file has been opened correclty, to + // ensure that another db is not created at the same time. let mut lock = self.index_store.write().await; if let Some(index) = lock.get(&uuid) { diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 3c46e48f6..0c801558b 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -203,7 +203,7 @@ impl IndexController { documents: Vec, ) -> anyhow::Result { let uuid = self.uuid_resolver.get(uid).await?; - let meta = UpdateMeta::DeleteDocuments { documents }; + let meta = UpdateMeta::DeleteDocuments { ids: documents }; let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; Ok(status) diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index fc5c44568..e7b719fc9 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -174,7 +174,7 @@ impl UpdateStore { Ok(Some(_)) => (), Ok(None) => break, Err(e) => { - error!("Fatal error while processing update that requires the update store to shutdown: {}", e); + error!("Fatal error while processing an update that requires the update store to shutdown: {}", e); must_exit.store(true, Ordering::SeqCst); break 'outer; } @@ -185,9 +185,9 @@ impl UpdateStore { } } } - }); - error!("Update store loop exited."); + error!("Update store loop exited."); + }); Ok(update_store) } diff --git a/meilisearch-http/src/index_controller/updates.rs b/meilisearch-http/src/index_controller/updates.rs index 86b33e3f2..ea2ffb80d 100644 --- a/meilisearch-http/src/index_controller/updates.rs +++ b/meilisearch-http/src/index_controller/updates.rs @@ -24,7 +24,7 @@ pub enum UpdateMeta { }, ClearDocuments, DeleteDocuments { - documents: Vec + ids: Vec }, Settings(Settings), }