takes only one read transaction per thread

Tamo 2024-02-26 10:43:04 +01:00
parent 91cdd502f8
commit 066a7a3cde
2 changed files with 36 additions and 25 deletions

View File

@@ -142,22 +142,28 @@ pub(crate) enum IndexOperation {
 impl Batch {
     /// Return the task ids associated with this batch.
-    pub fn ids(&self) -> Vec<TaskId> {
+    pub fn ids(&self) -> RoaringBitmap {
         match self {
             Batch::TaskCancelation { task, .. }
             | Batch::Dump(task)
             | Batch::IndexCreation { task, .. }
-            | Batch::IndexUpdate { task, .. } => vec![task.uid],
+            | Batch::IndexUpdate { task, .. } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
             Batch::SnapshotCreation(tasks)
             | Batch::TaskDeletions(tasks)
-            | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
+            | Batch::IndexDeletion { tasks, .. } => {
+                RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
+            }
             Batch::IndexOperation { op, .. } => match op {
                 IndexOperation::DocumentOperation { tasks, .. }
                 | IndexOperation::Settings { tasks, .. }
                 | IndexOperation::DocumentClear { tasks, .. } => {
-                    tasks.iter().map(|task| task.uid).collect()
+                    RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
                 }
-                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
+                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
+                    RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+                }
                 IndexOperation::SettingsAndDocumentOperation {
                     document_import_tasks: tasks,
                     settings_tasks: other,
@@ -167,9 +173,11 @@ impl Batch {
                     cleared_tasks: tasks,
                     settings_tasks: other,
                     ..
-                } => tasks.iter().chain(other).map(|task| task.uid).collect(),
+                } => RoaringBitmap::from_iter(tasks.iter().chain(other).map(|task| task.uid)),
             },
-            Batch::IndexSwap { task } => vec![task.uid],
+            Batch::IndexSwap { task } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
         }
     }
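
The two constructors this file now uses come straight from the `roaring` crate: `RoaringBitmap::from_sorted_iter` requires its input in ascending order and returns a `Result` (hence the `.unwrap()` on the single-id arms), while the `FromIterator` impl behind `RoaringBitmap::from_iter` accepts ids in any order. A minimal standalone sketch, independent of the Meilisearch types (the ids here are illustrative):

use roaring::RoaringBitmap;

fn main() {
    // from_sorted_iter is the cheap path but errors out when the input is
    // not sorted ascending; a one-element iterator is trivially sorted,
    // which is why the single-id arms above can unwrap().
    let single = RoaringBitmap::from_sorted_iter(std::iter::once(42u32)).unwrap();
    assert!(single.contains(42));

    // from_iter (via FromIterator) accepts ids in any order and collapses duplicates.
    let many = RoaringBitmap::from_iter([7u32, 3, 3, 11]);
    assert_eq!(many.len(), 3); // len() is a u64
}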

View File

@@ -37,8 +37,8 @@ use std::fs::File;
 use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::Ordering::{self, Relaxed};
+use std::sync::atomic::{AtomicBool, AtomicU32};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
@@ -60,6 +60,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::task_view::TaskView;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use puffin::FrameView;
+use rayon::current_num_threads;
 use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
@@ -1139,15 +1140,13 @@ impl IndexScheduler {
         drop(rtxn);

         // 1. store the starting date with the bitmap of processing tasks.
-        let mut ids = batch.ids();
-        ids.sort_unstable();
+        let ids = batch.ids();
         let processed_tasks = ids.len();
-        let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
         let started_at = OffsetDateTime::now_utc();

         // We reset the must_stop flag to be sure that we don't stop processing tasks
         self.must_stop_processing.reset();
-        self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks);
+        self.processing_tasks.write().unwrap().start_processing_at(started_at, ids.clone());

         #[cfg(test)]
         self.breakpoint(Breakpoint::BatchCreated);
@@ -1243,7 +1242,7 @@ impl IndexScheduler {
                 let error: ResponseError = err.into();
                 for id in ids.iter() {
                     let mut task = self
-                        .get_task(&wtxn, *id)
+                        .get_task(&wtxn, id)
                         .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
                         .ok_or(Error::CorruptedTaskQueue)?;
                     task.started_at = Some(started_at);
@@ -1273,17 +1272,21 @@ impl IndexScheduler {
         // Once the tasks are commited, we should delete all the update files associated ASAP to avoid leaking files in case of a restart
         tracing::debug!("Deleting the upadate files");

-        ids.into_par_iter().try_for_each(|id| -> Result<()> {
+        //We take one read transaction **per thread**. Then, every thread is going to pull out new IDs from the roaring bitmap with the help of an atomic shared index into the bitmap
+        let idx = AtomicU32::new(0);
+        (0..current_num_threads()).into_par_iter().try_for_each(|_| -> Result<()> {
             let rtxn = self.read_txn()?;
-            let task = self
-                .get_task(&rtxn, id)
-                .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
-                .ok_or(Error::CorruptedTaskQueue)?;
-            if let Err(e) = self.delete_persisted_task_data(&task) {
-                tracing::error!(
-                    "Failure to delete the content files associated with task {}. Error: {e}",
-                    task.uid
-                );
+            while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
+                let task = self
+                    .get_task(&rtxn, id)
+                    .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
+                    .ok_or(Error::CorruptedTaskQueue)?;
+                if let Err(e) = self.delete_persisted_task_data(&task) {
+                    tracing::error!(
+                        "Failure to delete the content files associated with task {}. Error: {e}",
+                        task.uid
+                    );
+                }
             }
             Ok(())
         })?;
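
The hunk above is the heart of the commit: instead of spawning one rayon job per task id, each opening its own read transaction, the scheduler now spawns one job per rayon thread; each job opens a single read transaction and keeps claiming the next id from the shared RoaringBitmap through an atomic cursor and `select()`. A self-contained sketch of that claiming pattern, with a hypothetical `process` function standing in for the real per-task work (`get_task` plus `delete_persisted_task_data`):

// Standalone sketch of the "one job per thread, ids claimed through an
// atomic cursor" pattern used above. `process` is a placeholder and not
// part of the Meilisearch code.
use std::sync::atomic::{AtomicU32, Ordering};

use rayon::current_num_threads;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;

fn process(id: u32) {
    // Placeholder for the per-task work; in the commit this is where the
    // task is fetched and its update file deleted.
    let _ = id;
}

fn main() {
    let ids = RoaringBitmap::from_iter(0u32..1_000);
    let idx = AtomicU32::new(0);

    (0..current_num_threads()).into_par_iter().for_each(|_| {
        // In the real code a read transaction is opened once here, per thread.
        // select(n) returns the n-th smallest id in the bitmap, so every
        // fetch_add hands a distinct id to exactly one thread until the
        // bitmap is exhausted.
        while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
            process(id);
        }
    });
}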
@@ -1696,7 +1699,7 @@ pub enum TickOutcome {
     /// The scheduler should immediately attempt another `tick`.
     ///
     /// The `usize` field contains the number of processed tasks.
-    TickAgain(usize),
+    TickAgain(u64),
     /// The scheduler should wait for an external signal before attempting another `tick`.
     WaitForSignal,
 }
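
The `TickOutcome` change in the final hunk follows from the new return type of `Batch::ids()`: `processed_tasks` is now the `len()` of a `RoaringBitmap`, and that method returns a `u64` rather than a `usize`. A minimal illustration, assuming only the `roaring` crate:

use roaring::RoaringBitmap;

fn main() {
    let ids = RoaringBitmap::from_iter([1u32, 2, 3]);
    let processed_tasks: u64 = ids.len(); // len() is u64, hence TickAgain(u64)
    assert_eq!(processed_tasks, 3);
}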