When processing tasks, make the update file deletion atomic

This commit is contained in:
Tamo 2024-02-22 14:56:22 +01:00
parent 5ee6aaddc4
commit 91cdd502f8
5 changed files with 56 additions and 16 deletions

10
Cargo.lock generated
View File

@ -1728,6 +1728,7 @@ dependencies = [
"faux",
"tempfile",
"thiserror",
"tracing",
"uuid",
]
@ -2393,6 +2394,7 @@ dependencies = [
"meilisearch-types",
"page_size 0.5.0",
"puffin",
"rayon",
"roaring",
"serde",
"serde_json",
@ -4077,9 +4079,9 @@ dependencies = [
[[package]]
name = "rayon"
version = "1.8.0"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c27db03db7734835b3f53954b534c91069375ce6ccaa2e065441e07d9b6cdb1"
checksum = "fa7237101a77a10773db45d62004a272517633fbcc3df19d96455ede1122e051"
dependencies = [
"either",
"rayon-core",
@ -4098,9 +4100,9 @@ dependencies = [
[[package]]
name = "rayon-core"
version = "1.12.0"
version = "1.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ce3fb6ad83f861aac485e76e1985cd109d9a3713802152be56c3b1f0e0658ed"
checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
dependencies = [
"crossbeam-deque",
"crossbeam-utils",

View File

@ -13,6 +13,7 @@ license.workspace = true
[dependencies]
tempfile = "3.9.0"
thiserror = "1.0.56"
tracing = "0.1.40"
uuid = { version = "1.6.1", features = ["serde", "v4"] }
[dev-dependencies]

View File

@ -75,7 +75,13 @@ impl FileStore {
/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.get_update_path(uuid);
let file = StdFile::open(path)?;
let file = match StdFile::open(path) {
Ok(file) => file,
Err(e) => {
tracing::error!("Can't access update file {uuid}: {e}");
return Err(e.into());
}
};
Ok(file)
}
@ -110,8 +116,12 @@ impl FileStore {
pub fn delete(&self, uuid: Uuid) -> Result<()> {
let path = self.path.join(uuid.to_string());
std::fs::remove_file(path)?;
Ok(())
if let Err(e) = std::fs::remove_file(path) {
tracing::error!("Can't delete file {uuid}: {e}");
Err(e.into())
} else {
Ok(())
}
}
/// List the Uuids of the files in the FileStore

View File

@ -23,6 +23,7 @@ meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0"
puffin = { version = "0.16.0", features = ["serialization"] }
rayon = "1.8.1"
roaring = { version = "0.10.2", features = ["serde"] }
serde = { version = "1.0.195", features = ["derive"] }
serde_json = { version = "1.0.111", features = ["preserve_order"] }

View File

@ -60,6 +60,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use puffin::FrameView;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
use synchronoise::SignalEvent;
use time::format_description::well_known::Rfc3339;
@ -1175,6 +1176,9 @@ impl IndexScheduler {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchSucceeded);
let mut success = 0;
let mut failure = 0;
#[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
task.started_at = Some(started_at);
@ -1187,13 +1191,15 @@ impl IndexScheduler {
},
)?;
match task.error {
Some(_) => failure += 1,
None => success += 1,
}
self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
if let Err(e) = self.delete_persisted_task_data(&task) {
tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
}
}
tracing::info!("A batch of tasks was successfully completed.");
tracing::info!("A batch of tasks was successfully completed with {success} successful tasks and {failure} failed tasks.");
}
// If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError(
@ -1204,6 +1210,7 @@ impl IndexScheduler {
self.breakpoint(Breakpoint::AbortedIndexation);
wtxn.abort();
tracing::info!("A batch of tasks was aborted.");
// We make sure that we don't call `stop_processing` on the `processing_tasks`,
// this is because we want to let the next tick call `create_next_batch` and keep
// the `started_at` date times and `processings` of the current processing tasks.
@ -1225,6 +1232,8 @@ impl IndexScheduler {
self.index_mapper.resize_index(&wtxn, &index_uid)?;
wtxn.abort();
tracing::info!("The max database size was reached. Resizing the index.");
return Ok(TickOutcome::TickAgain(0));
}
// In case of a failure we must get back and patch all the tasks with the error.
@ -1232,9 +1241,9 @@ impl IndexScheduler {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchFailed);
let error: ResponseError = err.into();
for id in ids {
for id in ids.iter() {
let mut task = self
.get_task(&wtxn, id)
.get_task(&wtxn, *id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?;
task.started_at = Some(started_at);
@ -1246,9 +1255,8 @@ impl IndexScheduler {
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::UpdatingTaskAfterProcessBatchFailure)?;
if let Err(e) = self.delete_persisted_task_data(&task) {
tracing::error!("Failure to delete the content files associated with task {}. Error: {e}", task.uid);
}
tracing::info!("Batch failed {}", error);
self.update_task(&mut wtxn, &task)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?;
}
@ -1262,6 +1270,24 @@ impl IndexScheduler {
wtxn.commit().map_err(Error::HeedTransaction)?;
// Once the tasks are commited, we should delete all the update files associated ASAP to avoid leaking files in case of a restart
tracing::debug!("Deleting the upadate files");
ids.into_par_iter().try_for_each(|id| -> Result<()> {
let rtxn = self.read_txn()?;
let task = self
.get_task(&rtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?;
if let Err(e) = self.delete_persisted_task_data(&task) {
tracing::error!(
"Failure to delete the content files associated with task {}. Error: {e}",
task.uid
);
}
Ok(())
})?;
// We shouldn't crash the tick function if we can't send data to the webhook.
let _ = self.notify_webhook(&processed);