diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index 8e2eb26a0..b7e31c136 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -142,22 +142,28 @@ pub(crate) enum IndexOperation {
 
 impl Batch {
     /// Return the task ids associated with this batch.
-    pub fn ids(&self) -> Vec<TaskId> {
+    pub fn ids(&self) -> RoaringBitmap {
         match self {
             Batch::TaskCancelation { task, .. }
             | Batch::Dump(task)
             | Batch::IndexCreation { task, .. }
-            | Batch::IndexUpdate { task, .. } => vec![task.uid],
+            | Batch::IndexUpdate { task, .. } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
             Batch::SnapshotCreation(tasks)
             | Batch::TaskDeletions(tasks)
-            | Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
+            | Batch::IndexDeletion { tasks, .. } => {
+                RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
+            }
             Batch::IndexOperation { op, .. } => match op {
                 IndexOperation::DocumentOperation { tasks, .. }
                 | IndexOperation::Settings { tasks, .. }
                 | IndexOperation::DocumentClear { tasks, .. } => {
-                    tasks.iter().map(|task| task.uid).collect()
+                    RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
+                }
+                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
+                    RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
                 }
-                IndexOperation::IndexDocumentDeletionByFilter { task, .. } => vec![task.uid],
                 IndexOperation::SettingsAndDocumentOperation {
                     document_import_tasks: tasks,
                     settings_tasks: other,
@@ -167,9 +173,11 @@ impl Batch {
                     cleared_tasks: tasks,
                     settings_tasks: other,
                     ..
-                } => tasks.iter().chain(other).map(|task| task.uid).collect(),
+                } => RoaringBitmap::from_iter(tasks.iter().chain(other).map(|task| task.uid)),
             },
-            Batch::IndexSwap { task } => vec![task.uid],
+            Batch::IndexSwap { task } => {
+                RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
+            }
         }
     }
 }
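In the `batch.rs` hunks above, `RoaringBitmap::from_sorted_iter` is fallible: it returns an error unless the input iterator is strictly ascending. A one-element iterator always satisfies that, so the single-task arms can safely `.unwrap()`, while the multi-task arms (whose uids may arrive in any order) go through the infallible `FromIterator` path instead. A minimal standalone sketch of the distinction, using only the `roaring` crate with made-up uids:

    use roaring::RoaringBitmap;

    fn main() {
        // A one-element iterator is trivially sorted, so `from_sorted_iter`
        // cannot fail here and the `unwrap()` is safe.
        let single = RoaringBitmap::from_sorted_iter(std::iter::once(42u32)).unwrap();
        assert_eq!(single.len(), 1);

        // Uids gathered from several tasks are not guaranteed to be sorted,
        // so the multi-task arms build the bitmap through `FromIterator`,
        // which accepts any order.
        let uids = [7u32, 3, 19, 3]; // hypothetical task uids
        let many = RoaringBitmap::from_iter(uids);
        assert_eq!(many.len(), 3); // a bitmap is a set: duplicates collapse

        // `RoaringBitmap::len()` returns a u64, which is why `TickOutcome::TickAgain`
        // switches from usize to u64 in the lib.rs diff below.
        let processed_tasks: u64 = many.len();
        assert_eq!(processed_tasks, 3);
    }
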
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 535b5a36e..38a999ad7 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -37,8 +37,8 @@ use std::fs::File;
 use std::io::{self, BufReader, Read};
 use std::ops::{Bound, RangeBounds};
 use std::path::{Path, PathBuf};
-use std::sync::atomic::AtomicBool;
-use std::sync::atomic::Ordering::Relaxed;
+use std::sync::atomic::Ordering::{self, Relaxed};
+use std::sync::atomic::{AtomicBool, AtomicU32};
 use std::sync::{Arc, RwLock};
 use std::time::Duration;
 
@@ -60,6 +60,7 @@ use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmap
 use meilisearch_types::task_view::TaskView;
 use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
 use puffin::FrameView;
+use rayon::current_num_threads;
 use rayon::prelude::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 use synchronoise::SignalEvent;
@@ -1139,15 +1140,13 @@ impl IndexScheduler {
         drop(rtxn);
 
         // 1. store the starting date with the bitmap of processing tasks.
-        let mut ids = batch.ids();
-        ids.sort_unstable();
+        let ids = batch.ids();
         let processed_tasks = ids.len();
-        let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
         let started_at = OffsetDateTime::now_utc();
 
         // We reset the must_stop flag to be sure that we don't stop processing tasks
         self.must_stop_processing.reset();
-        self.processing_tasks.write().unwrap().start_processing_at(started_at, processing_tasks);
+        self.processing_tasks.write().unwrap().start_processing_at(started_at, ids.clone());
 
         #[cfg(test)]
         self.breakpoint(Breakpoint::BatchCreated);
@@ -1243,7 +1242,7 @@ impl IndexScheduler {
                 let error: ResponseError = err.into();
                 for id in ids.iter() {
                     let mut task = self
-                        .get_task(&wtxn, *id)
+                        .get_task(&wtxn, id)
                        .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
                        .ok_or(Error::CorruptedTaskQueue)?;
                     task.started_at = Some(started_at);
@@ -1273,17 +1272,21 @@ impl IndexScheduler {
         // Once the tasks are committed, we should delete all the associated update files ASAP to avoid leaking files in case of a restart
         tracing::debug!("Deleting the update files");
 
-        ids.into_par_iter().try_for_each(|id| -> Result<()> {
+        // We take one read transaction **per thread**. Then, every thread pulls new ids out of the roaring bitmap with the help of a shared atomic index into the bitmap.
+        let idx = AtomicU32::new(0);
+        (0..current_num_threads()).into_par_iter().try_for_each(|_| -> Result<()> {
             let rtxn = self.read_txn()?;
-            let task = self
-                .get_task(&rtxn, id)
-                .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
-                .ok_or(Error::CorruptedTaskQueue)?;
-            if let Err(e) = self.delete_persisted_task_data(&task) {
-                tracing::error!(
-                    "Failure to delete the content files associated with task {}. Error: {e}",
-                    task.uid
-                );
+            while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
+                let task = self
+                    .get_task(&rtxn, id)
+                    .map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
+                    .ok_or(Error::CorruptedTaskQueue)?;
+                if let Err(e) = self.delete_persisted_task_data(&task) {
+                    tracing::error!(
+                        "Failure to delete the content files associated with task {}. Error: {e}",
+                        task.uid
+                    );
+                }
             }
             Ok(())
         })?;
@@ -1696,7 +1699,7 @@ pub enum TickOutcome {
     /// The scheduler should immediately attempt another `tick`.
     ///
-    /// The `usize` field contains the number of processed tasks.
-    TickAgain(usize),
+    /// The `u64` field contains the number of processed tasks.
+    TickAgain(u64),
     /// The scheduler should wait for an external signal before attempting another `tick`.
     WaitForSignal,
 }
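The rewritten deletion loop in `lib.rs` distributes work through a shared atomic cursor instead of letting rayon split the id set: each worker pays the per-thread setup (one read transaction) exactly once, then repeatedly claims the next rank with `fetch_add` and resolves it to a task id with `RoaringBitmap::select`, which returns `None` once the rank runs past the end of the bitmap. A self-contained sketch of the same idiom, with the scheduler's transaction and task lookup replaced by a hypothetical `process_one` stub:

    use std::sync::atomic::{AtomicU32, Ordering};

    use rayon::prelude::{IntoParallelIterator, ParallelIterator};
    use roaring::RoaringBitmap;

    // Hypothetical stand-in for the per-id work (deleting a task's update file).
    fn process_one(id: u32) {
        std::hint::black_box(id);
    }

    fn main() {
        // Stand-in for `batch.ids()`: any set of task uids works.
        let ids: RoaringBitmap = (0..10_000u32).collect();

        // Shared cursor over the *ranks* of the bitmap. `select(n)` returns
        // the n-th smallest id, so out-of-range ranks stop the loop.
        let idx = AtomicU32::new(0);

        (0..rayon::current_num_threads()).into_par_iter().for_each(|_| {
            // Per-thread setup goes here (the scheduler opens one read
            // transaction at this point).
            while let Some(id) = ids.select(idx.fetch_add(1, Ordering::Relaxed)) {
                process_one(id);
            }
        });

        // Each rank was claimed exactly once; every thread then saw one `None`.
        assert!(u64::from(idx.load(Ordering::Relaxed)) >= ids.len());
    }

`Ordering::Relaxed` suffices here because the only guarantee the loop needs is that each rank is claimed by exactly one thread, and the atomicity of `fetch_add` alone provides that.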