From caa231aebe6692a79bb3ef5c2c03c887cba5e558 Mon Sep 17 00:00:00 2001 From: marin postma Date: Wed, 16 Jun 2021 14:52:06 +0200 Subject: [PATCH] fix race condition --- meilisearch-http/src/index_controller/mod.rs | 17 +++++++++++++++-- .../index_controller/update_actor/store/mod.rs | 13 ++++++++++--- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/meilisearch-http/src/index_controller/mod.rs b/meilisearch-http/src/index_controller/mod.rs index b51a6c5b0..313709e21 100644 --- a/meilisearch-http/src/index_controller/mod.rs +++ b/meilisearch-http/src/index_controller/mod.rs @@ -6,6 +6,7 @@ use std::time::Duration; use actix_web::web::{Bytes, Payload}; use chrono::{DateTime, Utc}; use futures::stream::StreamExt; +use log::error; use log::info; use milli::FieldsDistribution; use serde::{Deserialize, Serialize}; @@ -256,8 +257,20 @@ impl IndexController { pub async fn delete_index(&self, uid: String) -> Result<()> { let uuid = self.uuid_resolver.delete(uid).await?; - self.update_handle.delete(uuid).await?; - self.index_handle.delete(uuid).await?; + + // We remove the index from the resolver synchronously, and effectively perform the index + // deletion as a background task. + let update_handle = self.update_handle.clone(); + let index_handle = self.index_handle.clone(); + tokio::spawn(async move { + if let Err(e) = update_handle.delete(uuid).await { + error!("Error while deleting index: {}", e); + } + if let Err(e) = index_handle.delete(uuid).await { + error!("Error while deleting index: {}", e); + } + }); + Ok(()) } diff --git a/meilisearch-http/src/index_controller/update_actor/store/mod.rs b/meilisearch-http/src/index_controller/update_actor/store/mod.rs index 0bdf53aee..1ee30c99d 100644 --- a/meilisearch-http/src/index_controller/update_actor/store/mod.rs +++ b/meilisearch-http/src/index_controller/update_actor/store/mod.rs @@ -428,7 +428,8 @@ impl UpdateStore { Ok(None) } - /// Delete all updates for an index from the update store. + /// Delete all updates for an index from the update store. If the currently processing update + /// is for `index_uuid`, the call will block until the update is terminated. pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> { let mut txn = self.env.write_txn()?; // Contains all the content file paths that we need to be removed if the deletion was successful. @@ -469,8 +470,14 @@ impl UpdateStore { let _ = remove_file(path); }); - // We don't care about the currently processing update, since it will be removed by itself - // once its done processing, and we can't abort a running update. + // If the currently processing update is from our index, we wait until it is + // finished before returning. This ensure that no write to the index occurs after we delete it. + if let State::Processing(uuid, _) = *self.state.read() { + if uuid == index_uuid { + // wait for a write lock, do nothing with it. + self.state.write(); + } + } Ok(()) }