Extract the must_stop flag out of the RwLock

This commit is contained in:
Kerollmops 2022-10-19 11:22:59 +02:00 committed by Clément Renault
parent 3cbfacb616
commit b373d19831
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 43 additions and 22 deletions

View File

@ -680,10 +680,10 @@ impl IndexScheduler {
self.index_mapper.indexer_config(),
);
builder.set_primary_key(primary_key);
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
let must_stop_processing = self.must_stop_processing.clone();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
|| must_stop_processing.get(),
)?;
index_wtxn.commit()?;
}
@ -762,7 +762,7 @@ impl IndexScheduler {
content_files,
mut tasks,
} => {
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
let must_stop_processing = self.must_stop_processing.clone();
let indexer_config = self.index_mapper.indexer_config();
// TODO use the code from the IndexCreate operation
if let Some(primary_key) = primary_key {
@ -772,7 +772,7 @@ impl IndexScheduler {
builder.set_primary_key(primary_key);
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.clone().load(Relaxed),
|| must_stop_processing.clone().get(),
)?;
}
}
@ -788,7 +788,7 @@ impl IndexScheduler {
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
|| must_stop_processing.get(),
)?;
let mut results = Vec::new();
@ -882,10 +882,10 @@ impl IndexScheduler {
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
apply_settings_to_builder(&checked_settings, &mut builder);
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
let must_stop_processing = self.must_stop_processing.clone();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
|| must_stop_processing.get(),
)?;
task.status = Status::Succeeded;

View File

@ -125,31 +125,41 @@ struct ProcessingTasks {
started_at: OffsetDateTime,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
/// A boolean that can be set to true to stop the currently processing tasks.
must_stop: Arc<AtomicBool>,
}
impl ProcessingTasks {
/// Stores the currently processing tasks, the date time at which it started
/// and resets the _must stop_ flag.
/// Stores the currently processing tasks, and the date time at which it started.
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
self.started_at = started_at;
self.processing = processing;
self.must_stop.store(false, Relaxed);
}
/// Resets the processing tasks to an empty list.
/// Set the processing tasks to an empty list.
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
self.started_at = stopped_at;
self.processing = RoaringBitmap::new();
}
/// Forces the currently processing tasks to stop running if necessary.
fn cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) {
// If there, at least, is one task that is currently processing we must stop.
if !self.processing.is_disjoint(canceled_tasks) {
self.must_stop.store(true, Relaxed);
}
/// Returns `true` if there, at least, is one task that is currently processing we must stop.
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
!self.processing.is_disjoint(canceled_tasks)
}
}
#[derive(Default, Clone, Debug)]
struct MustStopProcessing(Arc<AtomicBool>);
impl MustStopProcessing {
fn get(&self) -> bool {
self.0.load(Relaxed)
}
fn must_stop(&self) {
self.0.store(true, Relaxed);
}
fn reset(&self) {
self.0.store(false, Relaxed);
}
}
@ -168,6 +178,8 @@ pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
/// A boolean that can be set to true to stop the currently processing tasks.
pub(crate) must_stop_processing: MustStopProcessing,
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
pub(crate) file_store: FileStore,
@ -233,12 +245,12 @@ impl IndexScheduler {
let processing_tasks = ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
must_stop: Arc::new(AtomicBool::new(false)),
};
let file_store = FileStore::new(&update_file_path)?;
// allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self {
must_stop_processing: MustStopProcessing::default(),
processing_tasks: Arc::new(RwLock::new(processing_tasks)),
file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
@ -263,6 +275,7 @@ impl IndexScheduler {
/// This function will execute in a different thread and must be called only once.
fn run(&self) {
let run = Self {
must_stop_processing: MustStopProcessing::default(),
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
env: self.env.clone(),
@ -433,10 +446,14 @@ impl IndexScheduler {
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
self.processing_tasks
if self
.processing_tasks
.read()
.unwrap()
.cancel_processing_tasks(&tasks_to_cancel);
.must_cancel_processing_tasks(&tasks_to_cancel)
{
self.must_stop_processing.must_stop();
}
}
// notify the scheduler loop to execute a new tick
@ -612,6 +629,9 @@ impl IndexScheduler {
let processed_tasks = ids.len();
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
let started_at = OffsetDateTime::now_utc();
// We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset();
self.processing_tasks
.write()
.unwrap()

View File

@ -14,6 +14,7 @@ use crate::{index_mapper::IndexMapper, IndexScheduler, Kind, Status};
pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
autobatching_enabled,
must_stop_processing,
processing_tasks,
file_store,
env,