add the batch_id to the tasks

Tamo 2024-11-13 11:27:12 +01:00
parent 057fcb3993
commit 6062914654
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
126 changed files with 755 additions and 158 deletions


@@ -24,6 +24,7 @@ use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
use meilisearch_types::batches::BatchId;
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@@ -279,18 +280,22 @@ impl IndexScheduler {
rtxn: &RoTxn,
index_uid: String,
batch: BatchKind,
batch_id: BatchId,
must_create_index: bool,
) -> Result<Option<Batch>> {
match batch {
BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentClear {
tasks: self.get_existing_tasks(rtxn, ids)?,
tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
index_uid,
},
must_create_index,
})),
BatchKind::DocumentEdition { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let task = self
.get_task(rtxn, id)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id);
match &task.kind {
KindWithContent::DocumentEdition { index_uid, .. } => {
Ok(Some(Batch::IndexOperation {
@@ -305,7 +310,7 @@ impl IndexScheduler {
}
}
BatchKind::DocumentOperation { method, operation_ids, .. } => {
let tasks = self.get_existing_tasks(rtxn, operation_ids)?;
let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, operation_ids)?;
let primary_key = tasks
.iter()
.find_map(|task| match task.kind {
@@ -352,7 +357,7 @@ impl IndexScheduler {
}))
}
BatchKind::DocumentDeletion { deletion_ids, includes_by_filter: _ } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, deletion_ids)?;
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentDeletion { index_uid, tasks },
@@ -360,7 +365,7 @@ impl IndexScheduler {
}))
}
BatchKind::Settings { settings_ids, .. } => {
let tasks = self.get_existing_tasks(rtxn, settings_ids)?;
let tasks = self.get_existing_tasks_with_batch_id(rtxn, batch_id, settings_ids)?;
let mut settings = Vec::new();
for task in &tasks {
@@ -383,6 +388,7 @@ impl IndexScheduler {
rtxn,
index_uid,
BatchKind::Settings { settings_ids, allow_index_creation },
batch_id,
must_create_index,
)?
.unwrap()
@@ -398,6 +404,7 @@ impl IndexScheduler {
rtxn,
index_uid,
BatchKind::DocumentClear { ids: other },
batch_id,
must_create_index,
)?
.unwrap()
@@ -430,6 +437,7 @@ impl IndexScheduler {
rtxn,
index_uid.clone(),
BatchKind::Settings { settings_ids, allow_index_creation },
batch_id,
must_create_index,
)?;
@@ -442,6 +450,7 @@ impl IndexScheduler {
primary_key,
operation_ids,
},
batch_id,
must_create_index,
)?;
@@ -479,7 +488,10 @@ impl IndexScheduler {
}
}
BatchKind::IndexCreation { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let task = self
.get_task(rtxn, id)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id);
let (index_uid, primary_key) = match &task.kind {
KindWithContent::IndexCreation { index_uid, primary_key } => {
(index_uid.clone(), primary_key.clone())
@@ -489,7 +501,10 @@ impl IndexScheduler {
Ok(Some(Batch::IndexCreation { index_uid, primary_key, task }))
}
BatchKind::IndexUpdate { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let task = self
.get_task(rtxn, id)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id);
let primary_key = match &task.kind {
KindWithContent::IndexUpdate { primary_key, .. } => primary_key.clone(),
_ => unreachable!(),
@@ -499,10 +514,13 @@ impl IndexScheduler {
BatchKind::IndexDeletion { ids } => Ok(Some(Batch::IndexDeletion {
index_uid,
index_has_been_created: must_create_index,
tasks: self.get_existing_tasks(rtxn, ids)?,
tasks: self.get_existing_tasks_with_batch_id(rtxn, batch_id, ids)?,
})),
BatchKind::IndexSwap { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
let task = self
.get_task(rtxn, id)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id);
Ok(Some(Batch::IndexSwap { task }))
}
}
@@ -515,10 +533,11 @@ impl IndexScheduler {
/// 4. We get the *next* dump to process.
/// 5. We get the *next* tasks to process for a specific index.
#[tracing::instrument(level = "trace", skip(self, rtxn), target = "indexing::scheduler")]
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<Batch>> {
pub(crate) fn create_next_batch(&self, rtxn: &RoTxn) -> Result<Option<(Batch, BatchId)>> {
#[cfg(test)]
self.maybe_fail(crate::tests::FailureLocation::InsideCreateBatch)?;
let batch_id = self.next_batch_id(rtxn)?;
let enqueued = &self.get_status(rtxn, Status::Enqueued)?;
let to_cancel = self.get_kind(rtxn, Kind::TaskCancelation)? & enqueued;
@@ -526,39 +545,65 @@ impl IndexScheduler {
if let Some(task_id) = to_cancel.max() {
// We retrieve the tasks that were processing before this task cancelation started.
// We must *not* reset the processing tasks before calling this method.
let ProcessingTasks { started_at, processing } =
// Displaying the `batch_id` would make a strange error message since this task cancelation is going to
// replace the canceled batch. It's better to avoid mentioning it in the error message.
let ProcessingTasks { started_at, batch_id: _, processing } =
&*self.processing_tasks.read().unwrap();
return Ok(Some(Batch::TaskCancelation {
task: self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?,
previous_started_at: *started_at,
previous_processing_tasks: processing.clone(),
}));
return Ok(Some((
Batch::TaskCancelation {
task: self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id),
previous_started_at: *started_at,
previous_processing_tasks: processing.clone(),
},
batch_id,
)));
}
// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if !to_delete.is_empty() {
let tasks = self.get_existing_tasks(rtxn, to_delete)?;
return Ok(Some(Batch::TaskDeletions(tasks)));
let tasks = self
.get_existing_tasks(rtxn, to_delete)?
.into_iter()
.map(|task| task.with_batch_id(batch_id))
.collect();
return Ok(Some((Batch::TaskDeletions(tasks), batch_id)));
}
// 3. we batch the snapshot.
let to_snapshot = self.get_kind(rtxn, Kind::SnapshotCreation)? & enqueued;
if !to_snapshot.is_empty() {
return Ok(Some(Batch::SnapshotCreation(self.get_existing_tasks(rtxn, to_snapshot)?)));
return Ok(Some((
Batch::SnapshotCreation(
self.get_existing_tasks(rtxn, to_snapshot)?
.into_iter()
.map(|task| task.with_batch_id(batch_id))
.collect(),
),
batch_id,
)));
}
// 4. we batch the dumps.
let to_dump = self.get_kind(rtxn, Kind::DumpCreation)? & enqueued;
if let Some(to_dump) = to_dump.min() {
return Ok(Some(Batch::Dump(
self.get_task(rtxn, to_dump)?.ok_or(Error::CorruptedTaskQueue)?,
return Ok(Some((
Batch::Dump(
self.get_task(rtxn, to_dump)?
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id),
),
batch_id,
)));
}
// 5. We make a batch from the unprioritised tasks. Start by taking the next enqueued task.
let task_id = if let Some(task_id) = enqueued.min() { task_id } else { return Ok(None) };
let task = self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
let task =
self.get_task(rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?.with_batch_id(batch_id);
// If the task is not associated with any index, verify that it is an index swap and
// create the batch directly. Otherwise, get the index name associated with the task
@@ -568,7 +613,7 @@ impl IndexScheduler {
index_name
} else {
assert!(matches!(&task.kind, KindWithContent::IndexSwap { swaps } if swaps.is_empty()));
return Ok(Some(Batch::IndexSwap { task }));
return Ok(Some((Batch::IndexSwap { task }, batch_id)));
};
let index_already_exists = self.index_mapper.exists(rtxn, index_name)?;
@@ -599,12 +644,15 @@ impl IndexScheduler {
if let Some((batchkind, create_index)) =
autobatcher::autobatch(enqueued, index_already_exists, primary_key.as_deref())
{
return self.create_next_batch_index(
rtxn,
index_name.to_string(),
batchkind,
create_index,
);
return Ok(self
.create_next_batch_index(
rtxn,
index_name.to_string(),
batchkind,
batch_id,
create_index,
)?
.map(|batch| (batch, batch_id)));
}
// If we found no tasks then we were notified for something that got autobatched
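
The `with_batch_id` helper used throughout this hunk is not shown in the diff; presumably it is a small builder-style setter on `Task`. A minimal sketch of what it could look like, assuming `Task` gained a `batch_uid: Option<BatchId>` field (visible in the snapshot and registration hunks below):

impl Task {
    /// Stamp the task with the id of the batch it was grouped into, returning
    /// the task so the call can be chained after `get_task(...)?.ok_or(...)?`.
    pub fn with_batch_id(mut self, batch_id: BatchId) -> Task {
        self.batch_uid = Some(batch_id);
        self
    }
}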


@@ -79,7 +79,9 @@ pub enum Error {
)]
InvalidTaskDate { field: DateField, date: String },
#[error("Task uid `{task_uid}` is invalid. It should only contain numeric characters.")]
InvalidTaskUids { task_uid: String },
InvalidTaskUid { task_uid: String },
#[error("Batch uid `{batch_uid}` is invalid. It should only contain numeric characters.")]
InvalidBatchUid { batch_uid: String },
#[error(
"Task status `{status}` is invalid. Available task statuses are {}.",
enum_iterator::all::<Status>()
@@ -172,7 +174,8 @@ impl Error {
| Error::SwapIndexesNotFound(_)
| Error::CorruptedDump
| Error::InvalidTaskDate { .. }
| Error::InvalidTaskUids { .. }
| Error::InvalidTaskUid { .. }
| Error::InvalidBatchUid { .. }
| Error::InvalidTaskStatuses { .. }
| Error::InvalidTaskTypes { .. }
| Error::InvalidTaskCanceledBy { .. }
@@ -216,7 +219,8 @@ impl ErrorCode for Error {
Error::SwapIndexNotFound(_) => Code::IndexNotFound,
Error::SwapIndexesNotFound(_) => Code::IndexNotFound,
Error::InvalidTaskDate { field, .. } => (*field).into(),
Error::InvalidTaskUids { .. } => Code::InvalidTaskUids,
Error::InvalidTaskUid { .. } => Code::InvalidTaskUids,
Error::InvalidBatchUid { .. } => Code::InvalidBatchUids,
Error::InvalidTaskStatuses { .. } => Code::InvalidTaskStatuses,
Error::InvalidTaskTypes { .. } => Code::InvalidTaskTypes,
Error::InvalidTaskCanceledBy { .. } => Code::InvalidTaskCanceledBy,
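
Neither hunk shows where `InvalidBatchUid` is raised; presumably it mirrors the task-uid path when a batch uid query parameter fails to parse as an unsigned integer. A minimal sketch under that assumption (the helper name is hypothetical):

fn parse_batch_uid(s: &str) -> Result<BatchId, Error> {
    // Like a task uid, a batch uid must be a plain unsigned integer.
    s.parse::<BatchId>().map_err(|_| Error::InvalidBatchUid { batch_uid: s.to_owned() })
}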


@@ -24,6 +24,8 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
file_store,
env,
all_tasks,
all_batches,
// task reverse index
status,
kind,
index_tasks,
@@ -31,6 +33,16 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
enqueued_at,
started_at,
finished_at,
// batch reverse index
batch_status,
batch_kind,
batch_index_tasks,
batch_canceled_by,
batch_enqueued_at,
batch_started_at,
batch_finished_at,
index_mapper,
features: _,
max_number_of_tasks: _,
@@ -145,6 +157,7 @@ pub fn snapshot_task(task: &Task) -> String {
let mut snap = String::new();
let Task {
uid,
batch_uid,
enqueued_at: _,
started_at: _,
finished_at: _,
@@ -156,6 +169,9 @@ pub fn snapshot_task(task: &Task) -> String {
} = task;
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
if let Some(batch_uid) = batch_uid {
snap.push_str(&format!("batch_uid: {batch_uid}, "));
}
snap.push_str(&format!("status: {status}, "));
if let Some(canceled_by) = canceled_by {
snap.push_str(&format!("canceled_by: {canceled_by}, "));


@@ -48,6 +48,7 @@ pub use features::RoFeatures;
use file_store::FileStore;
use flate2::bufread::GzEncoder;
use flate2::Compression;
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
@@ -162,6 +163,8 @@ impl Query {
struct ProcessingTasks {
/// The date and time at which the indexation started.
started_at: OffsetDateTime,
/// The id of the batch currently being processed
batch_id: Option<BatchId>,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
}
@@ -169,17 +172,28 @@ struct ProcessingTasks {
impl ProcessingTasks {
/// Creates an empty `ProcessingTasks` struct.
fn new() -> ProcessingTasks {
ProcessingTasks { started_at: OffsetDateTime::now_utc(), processing: RoaringBitmap::new() }
ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
batch_id: None,
processing: RoaringBitmap::new(),
}
}
/// Stores the currently processing tasks, the id of the batch they belong to, and the date and time at which processing started.
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
fn start_processing(
&mut self,
started_at: OffsetDateTime,
batch_id: BatchId,
processing: RoaringBitmap,
) {
self.started_at = started_at;
self.batch_id = Some(batch_id);
self.processing = processing;
}
/// Set the processing tasks to an empty list, clear the batch id, and return the tasks that were processing.
fn stop_processing(&mut self) -> RoaringBitmap {
self.batch_id = None;
std::mem::take(&mut self.processing)
}
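
Taken with the `tick` hunks further down, the intended lifecycle is: `start_processing` pins the batch id and the task set when a batch is picked up, and `stop_processing` clears both once it completes. A condensed sketch of that call sequence, simplified from the scheduler code in this diff:

// `started_at`, `batch_id`, and `ids` come from the batch being started.
let processing_tasks = Arc::new(RwLock::new(ProcessingTasks::new()));
processing_tasks.write().unwrap().start_processing(started_at, batch_id, ids.clone());
// ... the batch runs ...
// Drain the task set and reset the batch id once the batch is done.
let processed: RoaringBitmap = processing_tasks.write().unwrap().stop_processing();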
@@ -209,6 +223,7 @@ impl MustStopProcessing {
/// Database const names for the `IndexScheduler`.
mod db_name {
pub const ALL_TASKS: &str = "all-tasks";
pub const ALL_BATCHES: &str = "all-batches";
pub const STATUS: &str = "status";
pub const KIND: &str = "kind";
pub const INDEX_TASKS: &str = "index-tasks";
@@ -216,6 +231,14 @@ mod db_name {
pub const ENQUEUED_AT: &str = "enqueued-at";
pub const STARTED_AT: &str = "started-at";
pub const FINISHED_AT: &str = "finished-at";
pub const BATCH_STATUS: &str = "batch-status";
pub const BATCH_KIND: &str = "batch-kind";
pub const BATCH_INDEX_TASKS: &str = "batch-index-tasks";
pub const BATCH_CANCELED_BY: &str = "batch-canceled_by";
pub const BATCH_ENQUEUED_AT: &str = "batch-enqueued-at";
pub const BATCH_STARTED_AT: &str = "batch-started-at";
pub const BATCH_FINISHED_AT: &str = "batch-finished-at";
}
#[cfg(test)]
@ -300,6 +323,28 @@ pub struct IndexScheduler {
// The main database; it contains all the tasks accessible by their Id.
pub(crate) all_tasks: Database<BEU32, SerdeJson<Task>>,
// Contains all the batches accessible by their Id.
pub(crate) all_batches: Database<BEU32, SerdeJson<Batch>>,
/// All the batches containing a task matching the selected status.
pub(crate) batch_status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
/// All the batches ids grouped by the kind of their task.
pub(crate) batch_kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
/// Store the batches associated to an index.
pub(crate) batch_index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the batches containing a task canceled by a task uid
pub(crate) batch_canceled_by: Database<BEU32, RoaringBitmapCodec>,
/// Store the batches containing tasks which were enqueued at a specific date
pub(crate) batch_enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
/// Store the batches containing finished tasks started at a specific date
pub(crate) batch_started_at: Database<BEI128, CboRoaringBitmapCodec>,
/// Store the batches containing tasks finished at a specific date
pub(crate) batch_finished_at: Database<BEI128, CboRoaringBitmapCodec>,
/// All the tasks ids grouped by their status.
// TODO we should not be able to serialize a `Status::Processing` in this database.
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
@@ -388,6 +433,9 @@ impl IndexScheduler {
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
all_tasks: self.all_tasks,
all_batches: self.all_batches,
// Tasks reverse index
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
@@ -395,6 +443,16 @@ impl IndexScheduler {
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
// Batches reverse index
batch_status: self.batch_status,
batch_kind: self.batch_kind,
batch_index_tasks: self.batch_index_tasks,
batch_canceled_by: self.batch_canceled_by,
batch_enqueued_at: self.batch_enqueued_at,
batch_started_at: self.batch_started_at,
batch_finished_at: self.batch_finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
@@ -454,7 +512,7 @@ impl IndexScheduler {
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(11)
.max_dbs(19)
.map_size(budget.task_db_size)
.open(options.tasks_path)
}?;
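
The `max_dbs` bump from 11 to 19 accounts for the eight new databases: `all_batches` plus the seven batch reverse indexes created below.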
@@ -465,6 +523,7 @@ impl IndexScheduler {
let mut wtxn = env.write_txn()?;
let all_tasks = env.create_database(&mut wtxn, Some(db_name::ALL_TASKS))?;
let all_batches = env.create_database(&mut wtxn, Some(db_name::ALL_BATCHES))?;
let status = env.create_database(&mut wtxn, Some(db_name::STATUS))?;
let kind = env.create_database(&mut wtxn, Some(db_name::KIND))?;
let index_tasks = env.create_database(&mut wtxn, Some(db_name::INDEX_TASKS))?;
@@ -472,6 +531,14 @@ impl IndexScheduler {
let enqueued_at = env.create_database(&mut wtxn, Some(db_name::ENQUEUED_AT))?;
let started_at = env.create_database(&mut wtxn, Some(db_name::STARTED_AT))?;
let finished_at = env.create_database(&mut wtxn, Some(db_name::FINISHED_AT))?;
let batch_status = env.create_database(&mut wtxn, Some(db_name::BATCH_STATUS))?;
let batch_kind = env.create_database(&mut wtxn, Some(db_name::BATCH_KIND))?;
let batch_index_tasks = env.create_database(&mut wtxn, Some(db_name::BATCH_INDEX_TASKS))?;
let batch_canceled_by = env.create_database(&mut wtxn, Some(db_name::BATCH_CANCELED_BY))?;
let batch_enqueued_at = env.create_database(&mut wtxn, Some(db_name::BATCH_ENQUEUED_AT))?;
let batch_started_at = env.create_database(&mut wtxn, Some(db_name::BATCH_STARTED_AT))?;
let batch_finished_at = env.create_database(&mut wtxn, Some(db_name::BATCH_FINISHED_AT))?;
wtxn.commit()?;
// allow unreachable_code to get rids of the warning in the case of a test build.
@@ -480,6 +547,8 @@ impl IndexScheduler {
processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
file_store,
all_tasks,
all_batches,
// Task reverse indexes
status,
kind,
index_tasks,
@@ -487,6 +556,16 @@ impl IndexScheduler {
enqueued_at,
started_at,
finished_at,
// Batch reverse indexes
batch_status,
batch_kind,
batch_index_tasks,
batch_canceled_by,
batch_enqueued_at,
batch_started_at,
batch_finished_at,
index_mapper: IndexMapper::new(
&env,
options.indexes_path,
@@ -946,6 +1025,52 @@ impl IndexScheduler {
Ok((tasks, total_tasks.len()))
}
/// Return the batch ids matching the query along with the total number of batches
/// by ignoring the from and limit parameters from the user's point of view.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_batch_ids_from_authorized_indexes(
&self,
rtxn: &RoTxn,
query: &Query,
filters: &meilisearch_auth::AuthFilter,
) -> Result<(RoaringBitmap, u64)> {
// compute all batches matching the filter by ignoring the limits, to find the number of batches matching
// the filter.
// As this causes us to compute the filter twice it is slightly inefficient, but doing it this way spares
// us from modifying the underlying implementation, and the performance remains sufficient.
// Should this change, we would modify `get_batch_ids` to directly return the number of matching batches.
let total_batches = self.get_batch_ids(rtxn, &query.clone().without_limits())?;
let mut batches = self.get_batch_ids(rtxn, query)?;
// If the query contains a list of index uids or there is a finite list of authorized indexes,
// then we must exclude all the kinds that aren't associated with one and only one index.
if query.index_uids.is_some() || !filters.all_indexes_authorized() {
for kind in enum_iterator::all::<Kind>().filter(|kind| !kind.related_to_one_index()) {
batches -= self.batch_kind.get(rtxn, &kind)?.unwrap_or_default();
}
}
// Any batch containing a task internally associated with a non-authorized index
// must be discarded.
if !filters.all_indexes_authorized() {
let all_indexes_iter = self.batch_index_tasks.iter(rtxn)?;
for result in all_indexes_iter {
let (index, index_batches) = result?;
if !filters.is_index_authorized(index) {
batches -= index_batches;
}
}
}
Ok((batches, total_batches.len()))
}
/// Return the tasks matching the query from the user's point of view along
/// with the total number of tasks matching the query, ignoring from and limit.
///
@@ -971,7 +1096,7 @@ impl IndexScheduler {
let tasks = self.get_existing_tasks(
&rtxn,
tasks.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let ProcessingTasks { started_at, processing, .. } =
let ProcessingTasks { started_at, batch_id, processing } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
let ret = tasks.into_iter();
@@ -981,7 +1106,60 @@ impl IndexScheduler {
Ok((
ret.map(|task| {
if processing.contains(task.uid) {
Task { status: Status::Processing, started_at: Some(started_at), ..task }
Task {
status: Status::Processing,
batch_uid: batch_id,
started_at: Some(started_at),
..task
}
} else {
task
}
})
.collect(),
total,
))
}
}
/// Return the batches matching the query from the user's point of view along
/// with the total number of batches matching the query, ignoring from and limit.
///
/// There are two differences between an internal query and a query executed by
/// the user.
///
/// 1. IndexSwap tasks are not publicly associated with any index, but they are associated
/// with many indexes internally.
/// 2. The user may not have the rights to access the tasks (internally) associated with all indexes.
pub fn get_batches_from_authorized_indexes(
&self,
query: Query,
filters: &meilisearch_auth::AuthFilter,
) -> Result<(Vec<Batch>, u64)> {
let rtxn = self.env.read_txn()?;
let (batches, total) = self.get_batch_ids_from_authorized_indexes(&rtxn, &query, filters)?;
let batches = self.get_existing_batches(
&rtxn,
batches.into_iter().rev().take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let ProcessingTasks { started_at, batch_id, processing } =
self.processing_tasks.read().map_err(|_| Error::CorruptedTaskQueue)?.clone();
let ret = batches.into_iter();
if processing.is_empty() {
Ok((ret.collect(), total))
} else {
Ok((
ret.map(|batch| {
// The batch currently being processed has no finished_at yet;
// report the live started_at from the processing state instead.
if batch_id == Some(batch.uid) {
Batch { started_at, finished_at: None, ..batch }
} else {
batch
}
@@ -1020,6 +1198,8 @@ impl IndexScheduler {
let mut task = Task {
uid: task_id.unwrap_or(next_task_id),
// The batch is defined once we start processing the task
batch_uid: None,
enqueued_at: OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
@@ -1155,7 +1335,7 @@ impl IndexScheduler {
}
let rtxn = self.env.read_txn().map_err(Error::HeedTransaction)?;
let batch =
let (batch, batch_id) =
match self.create_next_batch(&rtxn).map_err(|e| Error::CreateBatch(Box::new(e)))? {
Some(batch) => batch,
None => return Ok(TickOutcome::WaitForSignal),
@@ -1170,7 +1350,7 @@ impl IndexScheduler {
// We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset();
self.processing_tasks.write().unwrap().start_processing_at(started_at, ids.clone());
self.processing_tasks.write().unwrap().start_processing(started_at, batch_id, ids.clone());
#[cfg(test)]
self.breakpoint(Breakpoint::BatchCreated);
@@ -1268,7 +1448,8 @@ impl IndexScheduler {
let mut task = self
.get_task(&wtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?
.ok_or(Error::CorruptedTaskQueue)?;
.ok_or(Error::CorruptedTaskQueue)?
.with_batch_id(batch_id);
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
task.status = Status::Failed;
@@ -1286,6 +1467,12 @@ impl IndexScheduler {
}
}
self.all_batches.put(
&mut wtxn,
&batch_id,
&Batch { uid: batch_id, started_at, finished_at: Some(finished_at) },
)?;
let processed = self.processing_tasks.write().unwrap().stop_processing();
#[cfg(test)]
@@ -1600,6 +1787,7 @@ impl<'a> Dump<'a> {
let task = Task {
uid: task.uid,
batch_uid: task.batch_uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,


@@ -3,6 +3,7 @@
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::types::DecodeIgnore;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
@@ -25,10 +26,37 @@ impl IndexScheduler {
Ok(self.last_task_id(rtxn)?.unwrap_or_default())
}
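/// Return the next available batch id, i.e. one more than the id of the last
/// batch stored in `all_batches`, or 0 when no batch exists yet.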
pub(crate) fn next_batch_id(&self, rtxn: &RoTxn) -> Result<BatchId> {
Ok(self
.all_batches
.remap_data_type::<DecodeIgnore>()
.last(rtxn)?
.map(|(k, _)| k + 1)
.unwrap_or_default())
}
pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
Ok(self.all_tasks.get(rtxn, &task_id)?)
}
/// Convert an iterator to a `Vec` of tasks attached to the given batch id. The tasks MUST exist or a
/// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks_with_batch_id(
&self,
rtxn: &RoTxn,
batch_id: BatchId,
tasks: impl IntoIterator<Item = TaskId>,
) -> Result<Vec<Task>> {
tasks
.into_iter()
.map(|task_id| {
self.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
.map(|task| task.with_batch_id(batch_id))
})
.collect::<Result<_>>()
}
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
/// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks(
@@ -342,6 +370,8 @@ impl IndexScheduler {
let Task {
uid,
// We should iterate over the list of batches to ensure this task is effectively in the right batch
batch_uid,
enqueued_at,
started_at,
finished_at,