Introduce the task cancelation task type

This commit is contained in:
Kerollmops 2022-10-17 17:19:17 +02:00 committed by Clément Renault
parent 4a7b5c7836
commit bcb7d1744a
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
7 changed files with 131 additions and 41 deletions

View file

@ -1,3 +1,4 @@
use std::sync::atomic::Ordering::Relaxed;
use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
@ -5,10 +6,8 @@ use std::io::BufWriter;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use log::{debug, info};
use meilisearch_types::milli::update::IndexDocumentsConfig;
use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod,
@ -16,7 +15,9 @@ use meilisearch_types::milli::update::{
use meilisearch_types::milli::{
self, documents::DocumentsBatchReader, update::Settings as MilliSettings, BEU32,
};
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use meilisearch_types::{
heed::{RoTxn, RwTxn},
Index,
@ -27,7 +28,7 @@ use uuid::Uuid;
#[derive(Debug)]
pub(crate) enum Batch {
Cancel(Task),
TaskCancelation(Task),
TaskDeletion(Task),
Snapshot(Vec<Task>),
Dump(Task),
@ -103,7 +104,7 @@ pub(crate) enum IndexOperation {
impl Batch {
pub fn ids(&self) -> Vec<TaskId> {
match self {
Batch::Cancel(task)
Batch::TaskCancelation(task)
| Batch::TaskDeletion(task)
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
@ -378,11 +379,11 @@ impl IndexScheduler {
/// 5. We get the *next* tasks to process for a specific index.
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::CancelTask)? & enqueued;
let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
// 1. we get the last task to cancel.
if let Some(task_id) = to_cancel.max() {
return Ok(Some(Batch::Cancel(
return Ok(Some(Batch::TaskCancelation(
self.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?,
)));
@ -457,7 +458,33 @@ impl IndexScheduler {
pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
match batch {
Batch::Cancel(_) => todo!(),
Batch::TaskCancelation(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks =
if let KindWithContent::TaskCancelation { tasks, query: _ } = &task.kind {
tasks
} else {
unreachable!()
};
let mut wtxn = self.env.write_txn()?;
let nbr_canceled_tasks = self.cancel_matched_tasks(&mut wtxn, matched_tasks)?;
task.status = Status::Succeeded;
match &mut task.details {
Some(Details::TaskCancelation {
matched_tasks: _,
canceled_tasks,
original_query: _,
}) => {
*canceled_tasks = Some(nbr_canceled_tasks);
}
_ => unreachable!(),
}
wtxn.commit()?;
Ok(vec![task])
}
Batch::TaskDeletion(mut task) => {
// 1. Retrieve the tasks that matched the query at enqueue-time.
let matched_tasks =
@ -652,7 +679,11 @@ impl IndexScheduler {
self.index_mapper.indexer_config(),
);
builder.set_primary_key(primary_key);
builder.execute(|_| ())?;
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?;
index_wtxn.commit()?;
}
@ -730,6 +761,7 @@ impl IndexScheduler {
content_files,
mut tasks,
} => {
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
let indexer_config = self.index_mapper.indexer_config();
// TODO use the code from the IndexCreate operation
if let Some(primary_key) = primary_key {
@ -737,7 +769,10 @@ impl IndexScheduler {
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.set_primary_key(primary_key);
builder.execute(|_| ())?;
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.clone().load(Relaxed),
)?;
}
}
@ -752,6 +787,7 @@ impl IndexScheduler {
indexer_config,
config,
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?;
let mut results = Vec::new();
@ -845,9 +881,11 @@ impl IndexScheduler {
let mut builder =
milli::update::Settings::new(index_wtxn, index, indexer_config);
apply_settings_to_builder(&checked_settings, &mut builder);
builder.execute(|indexing_step| {
debug!("update: {:?}", indexing_step);
})?;
let must_stop = self.processing_tasks.read().unwrap().must_stop.clone();
builder.execute(
|indexing_step| debug!("update: {:?}", indexing_step),
|| must_stop.load(Relaxed),
)?;
task.status = Status::Succeeded;
}