diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index 90c67e2b6..2bd373a5c 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -16,10 +16,12 @@ use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::sleep; +use uuid::Uuid; use crate::index::{Document, SearchQuery, SearchResult}; use crate::index::{Facets, Settings, UpdateResult}; pub use updates::{Failed, Processed, Processing}; +use uuid_resolver::UuidError; pub type UpdateStatus = updates::UpdateStatus; @@ -80,41 +82,51 @@ impl IndexController { uid: String, method: milli::update::IndexDocumentsMethod, format: milli::update::UpdateFormat, - mut payload: Payload, + payload: Payload, primary_key: Option, ) -> anyhow::Result { - let uuid = self.uuid_resolver.get_or_create(uid).await?; - let meta = UpdateMeta::DocumentsAddition { - method, - format, - primary_key, + let perform_update = |uuid| async move { + let meta = UpdateMeta::DocumentsAddition { + method, + format, + primary_key, + }; + let (sender, receiver) = mpsc::channel(10); + + // It is necessary to spawn a local task to send the payload to the update handle to + // prevent dead_locking between the update_handle::update that waits for the update to be + // registered and the update_actor that waits for the the payload to be sent to it. + tokio::task::spawn_local(async move { + payload + .map(|bytes| { + bytes.map_err(|e| { + Box::new(e) as Box + }) + }) + .for_each(|r| async { + let _ = sender.send(r).await; + }) + .await + }); + + // This must be done *AFTER* spawning the task. + self.update_handle.update(meta, receiver, uuid).await }; - let (sender, receiver) = mpsc::channel(10); - // It is necessary to spawn a local task to senf the payload to the update handle to - // prevent dead_locking between the update_handle::update that waits for the update to be - // registered and the update_actor that waits for the the payload to be sent to it. - tokio::task::spawn_local(async move { - while let Some(bytes) = payload.next().await { - match bytes { - Ok(bytes) => { - let _ = sender.send(Ok(bytes)).await; - } - Err(e) => { - let error: Box = Box::new(e); - let _ = sender.send(Err(error)).await; - } - } + match self.uuid_resolver.get(uid).await { + Ok(uuid) => Ok(perform_update(uuid).await?), + Err(UuidError::UnexistingIndex(name)) => { + let uuid = Uuid::new_v4(); + let status = perform_update(uuid).await?; + self.uuid_resolver.insert(name, uuid).await?; + Ok(status) } - }); - - // This must be done *AFTER* spawning the task. - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) + Err(e) => Err(e.into()), + } } pub async fn clear_documents(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::ClearDocuments; let (_, receiver) = mpsc::channel(1); let status = self.update_handle.update(meta, receiver, uuid).await?; @@ -126,7 +138,7 @@ impl IndexController { uid: String, document_ids: Vec, ) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let meta = UpdateMeta::DeleteDocuments; let (sender, receiver) = mpsc::channel(10); @@ -146,26 +158,23 @@ impl IndexController { settings: Settings, create: bool, ) -> anyhow::Result { - let uuid = if create { - let uuid = self.uuid_resolver.get_or_create(uid).await?; - // We need to create the index upfront, since it would otherwise only be created when - // the update is processed. This would make calls to GET index to fail until the update - // is complete. Since this is get or create, we ignore the error when the index already - // exists. - match self.index_handle.create_index(uuid, None).await { - Ok(_) | Err(index_actor::IndexError::IndexAlreadyExists) => (), - Err(e) => return Err(e.into()), - } - uuid - } else { - self.uuid_resolver.resolve(uid).await? + let perform_udpate = |uuid| async move { + let meta = UpdateMeta::Settings(settings); + // Nothing so send, drop the sender right away, as not to block the update actor. + let (_, receiver) = mpsc::channel(1); + self.update_handle.update(meta, receiver, uuid).await }; - let meta = UpdateMeta::Settings(settings); - // Nothing so send, drop the sender right away, as not to block the update actor. - let (_, receiver) = mpsc::channel(1); - let status = self.update_handle.update(meta, receiver, uuid).await?; - Ok(status) + match self.uuid_resolver.get(uid).await { + Ok(uuid) => Ok(perform_udpate(uuid).await?), + Err(UuidError::UnexistingIndex(name)) if create => { + let uuid = Uuid::new_v4(); + let status = perform_udpate(uuid).await?; + self.uuid_resolver.insert(name, uuid).await?; + Ok(status) + } + Err(e) => Err(e.into()), + } } pub async fn create_index( @@ -177,7 +186,11 @@ impl IndexController { let uuid = self.uuid_resolver.create(uid.clone()).await?; let meta = self.index_handle.create_index(uuid, primary_key).await?; let _ = self.update_handle.create(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } @@ -190,13 +203,13 @@ impl IndexController { } pub async fn update_status(&self, uid: String, id: u64) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.update_status(uuid, id).await?; Ok(result) } pub async fn all_update_status(&self, uid: String) -> anyhow::Result> { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.update_handle.get_all_updates_status(uuid).await?; Ok(result) } @@ -208,7 +221,11 @@ impl IndexController { for (uid, uuid) in uuids { let meta = self.index_handle.get_index_meta(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; ret.push(meta); } @@ -216,7 +233,7 @@ impl IndexController { } pub async fn settings(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let settings = self.index_handle.settings(uuid).await?; Ok(settings) } @@ -228,7 +245,7 @@ impl IndexController { limit: usize, attributes_to_retrieve: Option>, ) -> anyhow::Result> { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let documents = self .index_handle .documents(uuid, offset, limit, attributes_to_retrieve) @@ -242,7 +259,7 @@ impl IndexController { doc_id: String, attributes_to_retrieve: Option>, ) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let document = self .index_handle .document(uuid, doc_id, attributes_to_retrieve) @@ -259,22 +276,30 @@ impl IndexController { bail!("Can't change the index uid.") } - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.update_index(uuid, index_settings).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } pub async fn search(&self, uid: String, query: SearchQuery) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid).await?; + let uuid = self.uuid_resolver.get(uid).await?; let result = self.index_handle.search(uuid, query).await?; Ok(result) } pub async fn get_index(&self, uid: String) -> anyhow::Result { - let uuid = self.uuid_resolver.resolve(uid.clone()).await?; + let uuid = self.uuid_resolver.get(uid.clone()).await?; let meta = self.index_handle.get_index_meta(uuid).await?; - let meta = IndexMetadata { name: uid.clone(), uid, meta }; + let meta = IndexMetadata { + name: uid.clone(), + uid, + meta, + }; Ok(meta) } } diff --git a/meilisearch-http/src/index_controller/uuid_resolver.rs b/meilisearch-http/src/index_controller/uuid_resolver.rs index 2ee9c6b17..328080d90 100644 --- a/meilisearch-http/src/index_controller/uuid_resolver.rs +++ b/meilisearch-http/src/index_controller/uuid_resolver.rs @@ -13,11 +13,7 @@ pub type Result = std::result::Result; #[derive(Debug)] enum UuidResolveMsg { - Resolve { - uid: String, - ret: oneshot::Sender>, - }, - GetOrCreate { + Get { uid: String, ret: oneshot::Sender>, }, @@ -32,6 +28,11 @@ enum UuidResolveMsg { List { ret: oneshot::Sender>>, }, + Insert { + uuid: Uuid, + name: String, + ret: oneshot::Sender>, + } } struct UuidResolverActor { @@ -54,11 +55,8 @@ impl UuidResolverActor { Some(Create { uid: name, ret }) => { let _ = ret.send(self.handle_create(name).await); } - Some(GetOrCreate { uid: name, ret }) => { - let _ = ret.send(self.handle_get_or_create(name).await); - } - Some(Resolve { uid: name, ret }) => { - let _ = ret.send(self.handle_resolve(name).await); + Some(Get { uid: name, ret }) => { + let _ = ret.send(self.handle_get(name).await); } Some(Delete { uid: name, ret }) => { let _ = ret.send(self.handle_delete(name).await); @@ -66,6 +64,9 @@ impl UuidResolverActor { Some(List { ret }) => { let _ = ret.send(self.handle_list().await); } + Some(Insert { ret, uuid, name }) => { + let _ = ret.send(self.handle_insert(name, uuid).await); + } // all senders have been dropped, need to quit. None => break, } @@ -81,14 +82,7 @@ impl UuidResolverActor { self.store.create_uuid(uid, true).await } - async fn handle_get_or_create(&self, uid: String) -> Result { - if !is_index_uid_valid(&uid) { - return Err(UuidError::BadlyFormatted(uid)); - } - self.store.create_uuid(uid, false).await - } - - async fn handle_resolve(&self, uid: String) -> Result { + async fn handle_get(&self, uid: String) -> Result { self.store .get_uuid(uid.clone()) .await? @@ -106,6 +100,14 @@ impl UuidResolverActor { let result = self.store.list().await?; Ok(result) } + + async fn handle_insert(&self, uid: String, uuid: Uuid) -> Result<()> { + if !is_index_uid_valid(&uid) { + return Err(UuidError::BadlyFormatted(uid)); + } + self.store.insert(uid, uuid).await?; + Ok(()) + } } fn is_index_uid_valid(uid: &str) -> bool { @@ -127,18 +129,9 @@ impl UuidResolverHandle { Ok(Self { sender }) } - pub async fn resolve(&self, name: String) -> anyhow::Result { + pub async fn get(&self, name: String) -> Result { let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::Resolve { uid: name, ret }; - let _ = self.sender.send(msg).await; - Ok(receiver - .await - .expect("Uuid resolver actor has been killed")?) - } - - pub async fn get_or_create(&self, name: String) -> Result { - let (ret, receiver) = oneshot::channel(); - let msg = UuidResolveMsg::GetOrCreate { uid: name, ret }; + let msg = UuidResolveMsg::Get { uid: name, ret }; let _ = self.sender.send(msg).await; Ok(receiver .await @@ -171,6 +164,15 @@ impl UuidResolverHandle { .await .expect("Uuid resolver actor has been killed")?) } + + pub async fn insert(&self, name: String, uuid: Uuid) -> anyhow::Result<()> { + let (ret, receiver) = oneshot::channel(); + let msg = UuidResolveMsg::Insert { ret, name, uuid }; + let _ = self.sender.send(msg).await; + Ok(receiver + .await + .expect("Uuid resolver actor has been killed")?) + } } #[derive(Debug, Error)] @@ -197,6 +199,7 @@ trait UuidStore { async fn get_uuid(&self, uid: String) -> Result>; async fn delete(&self, uid: String) -> Result>; async fn list(&self) -> Result>; + async fn insert(&self, name: String, uuid: Uuid) -> Result<()>; } struct HeedUuidStore { @@ -292,4 +295,16 @@ impl UuidStore for HeedUuidStore { }) .await? } + + async fn insert(&self, name: String, uuid: Uuid) -> Result<()> { + let env = self.env.clone(); + let db = self.db; + tokio::task::spawn_blocking(move || { + let mut txn = env.write_txn()?; + db.put(&mut txn, &name, uuid.as_bytes())?; + txn.commit()?; + Ok(()) + }) + .await? + } }