diff --git a/src/index_controller/index_actor.rs b/src/index_controller/index_actor.rs index 360f3a3c0..2556e224a 100644 --- a/src/index_controller/index_actor.rs +++ b/src/index_controller/index_actor.rs @@ -94,11 +94,16 @@ pub enum IndexError { Error(#[from] anyhow::Error), #[error("index already exists")] IndexAlreadyExists, + #[error("Index doesn't exists")] + UnexistingIndex, } #[async_trait::async_trait] trait IndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result; + async fn update_index(&self, uuid: Uuid, f: F) -> Result + where F: FnOnce(Index) -> Result + Send + Sync + 'static, + R: Sync + Send + 'static; async fn get_or_create(&self, uuid: Uuid) -> Result; async fn get(&self, uuid: Uuid) -> Result>; async fn delete(&self, uuid: &Uuid) -> Result>; @@ -238,13 +243,16 @@ impl IndexActor { ) { info!("Processing update {}", meta.id()); let uuid = meta.index_uuid().clone(); - let index = self.store.get_or_create(uuid).await.unwrap(); let update_handler = self.update_handler.clone(); - tokio::task::spawn_blocking(move || { - let result = update_handler.handle_update(meta, data, index); - let _ = ret.send(result); - }) - .await; + let handle = self.store.update_index(uuid, |index| { + let handle = tokio::task::spawn_blocking(move || { + let result = update_handler.handle_update(meta, data, index); + let _ = ret.send(result); + }); + Ok(handle) + }); + + handle.await; } async fn handle_settings(&self, uuid: Uuid, ret: oneshot::Sender>) { @@ -432,10 +440,11 @@ impl IndexStore for MapIndexStore { async fn create_index(&self, uuid: Uuid, primary_key: Option) -> Result { let meta = match self.meta_store.write().await.entry(uuid.clone()) { Entry::Vacant(entry) => { + let now = Utc::now(); let meta = IndexMeta{ uuid, - created_at: Utc::now(), - updated_at: Utc::now(), + created_at: now.clone(), + updated_at: now, primary_key, }; entry.insert(meta).clone() @@ -494,6 +503,23 @@ impl IndexStore for MapIndexStore { async fn get_meta(&self, uuid: &Uuid) -> Result> { Ok(self.meta_store.read().await.get(uuid).cloned()) } + + async fn update_index(&self, uuid: Uuid, f: F) -> Result + where F: FnOnce(Index) -> Result + Send + Sync + 'static, + R: Sync + Send + 'static, + { + let index = self.get_or_create(uuid.clone()).await?; + let mut meta = self.get_meta(&uuid).await? + .ok_or(IndexError::UnexistingIndex)?; + match f(index) { + Ok(r) => { + meta.updated_at = Utc::now(); + self.meta_store.write().await.insert(uuid, meta); + Ok(r) + } + Err(e) => Err(e) + } + } } impl MapIndexStore {