mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 23:04:26 +01:00
fix race condition
This commit is contained in:
parent
25af262e79
commit
caa231aebe
@ -6,6 +6,7 @@ use std::time::Duration;
|
|||||||
use actix_web::web::{Bytes, Payload};
|
use actix_web::web::{Bytes, Payload};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
|
use log::error;
|
||||||
use log::info;
|
use log::info;
|
||||||
use milli::FieldsDistribution;
|
use milli::FieldsDistribution;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@ -256,8 +257,20 @@ impl IndexController {
|
|||||||
|
|
||||||
pub async fn delete_index(&self, uid: String) -> Result<()> {
|
pub async fn delete_index(&self, uid: String) -> Result<()> {
|
||||||
let uuid = self.uuid_resolver.delete(uid).await?;
|
let uuid = self.uuid_resolver.delete(uid).await?;
|
||||||
self.update_handle.delete(uuid).await?;
|
|
||||||
self.index_handle.delete(uuid).await?;
|
// We remove the index from the resolver synchronously, and effectively perform the index
|
||||||
|
// deletion as a background task.
|
||||||
|
let update_handle = self.update_handle.clone();
|
||||||
|
let index_handle = self.index_handle.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = update_handle.delete(uuid).await {
|
||||||
|
error!("Error while deleting index: {}", e);
|
||||||
|
}
|
||||||
|
if let Err(e) = index_handle.delete(uuid).await {
|
||||||
|
error!("Error while deleting index: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -428,7 +428,8 @@ impl UpdateStore {
|
|||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete all updates for an index from the update store.
|
/// Delete all updates for an index from the update store. If the currently processing update
|
||||||
|
/// is for `index_uuid`, the call will block until the update is terminated.
|
||||||
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
|
pub fn delete_all(&self, index_uuid: Uuid) -> Result<()> {
|
||||||
let mut txn = self.env.write_txn()?;
|
let mut txn = self.env.write_txn()?;
|
||||||
// Contains all the content file paths that we need to be removed if the deletion was successful.
|
// Contains all the content file paths that we need to be removed if the deletion was successful.
|
||||||
@ -469,8 +470,14 @@ impl UpdateStore {
|
|||||||
let _ = remove_file(path);
|
let _ = remove_file(path);
|
||||||
});
|
});
|
||||||
|
|
||||||
// We don't care about the currently processing update, since it will be removed by itself
|
// If the currently processing update is from our index, we wait until it is
|
||||||
// once its done processing, and we can't abort a running update.
|
// finished before returning. This ensure that no write to the index occurs after we delete it.
|
||||||
|
if let State::Processing(uuid, _) = *self.state.read() {
|
||||||
|
if uuid == index_uuid {
|
||||||
|
// wait for a write lock, do nothing with it.
|
||||||
|
self.state.write();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user