inital implementation of the progress

This commit is contained in:
Tamo 2024-12-10 16:30:48 +01:00
parent 8c19cb0a0b
commit df9b68f8ed
No known key found for this signature in database
GPG key ID: 20CD8020AFA88D69
29 changed files with 585 additions and 414 deletions

View file

@ -26,6 +26,7 @@ mod index_mapper;
#[cfg(test)]
mod insta_snapshot;
mod lru;
mod processing;
mod utils;
pub mod uuid_codec;
@ -56,12 +57,12 @@ use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::new::indexer::document_changes::Progress;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use meilisearch_types::task_view::TaskView;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task, TaskProgress};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use processing::ProcessingTasks;
use rayon::current_num_threads;
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use roaring::RoaringBitmap;
@ -72,7 +73,8 @@ use utils::{filter_out_references_to_newer_tasks, keep_ids_within_datetimes, map
use uuid::Uuid;
use crate::index_mapper::IndexMapper;
use crate::utils::{check_index_swap_validity, clamp_to_page_size, ProcessingBatch};
use crate::processing::{AtomicTaskStep, BatchProgress};
use crate::utils::{check_index_swap_validity, clamp_to_page_size};
pub(crate) type BEI128 = I128<BE>;
@ -163,48 +165,6 @@ impl Query {
}
}
#[derive(Debug, Clone)]
pub struct ProcessingTasks {
batch: Option<ProcessingBatch>,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
/// The progress on processing tasks
progress: Option<TaskProgress>,
}
impl ProcessingTasks {
/// Creates an empty `ProcessingAt` struct.
fn new() -> ProcessingTasks {
ProcessingTasks { batch: None, processing: RoaringBitmap::new(), progress: None }
}
/// Stores the currently processing tasks, and the date time at which it started.
fn start_processing(&mut self, processing_batch: ProcessingBatch, processing: RoaringBitmap) {
self.batch = Some(processing_batch);
self.processing = processing;
}
fn update_progress(&mut self, progress: Progress) -> TaskProgress {
self.progress.get_or_insert_with(TaskProgress::default).update(progress)
}
/// Set the processing tasks to an empty list
fn stop_processing(&mut self) -> Self {
self.progress = None;
Self {
batch: std::mem::take(&mut self.batch),
processing: std::mem::take(&mut self.processing),
progress: None,
}
}
/// Returns `true` if there, at least, is one task that is currently processing that we must stop.
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
!self.processing.is_disjoint(canceled_tasks)
}
}
#[derive(Default, Clone, Debug)]
struct MustStopProcessing(Arc<AtomicBool>);
@ -813,7 +773,7 @@ impl IndexScheduler {
let mut batch_tasks = RoaringBitmap::new();
for batch_uid in batch_uids {
if processing_batch.as_ref().map_or(false, |batch| batch.uid == *batch_uid) {
batch_tasks |= &processing_tasks;
batch_tasks |= &*processing_tasks;
} else {
batch_tasks |= self.tasks_in_batch(rtxn, *batch_uid)?;
}
@ -827,13 +787,13 @@ impl IndexScheduler {
match status {
// special case for Processing tasks
Status::Processing => {
status_tasks |= &processing_tasks;
status_tasks |= &*processing_tasks;
}
status => status_tasks |= &self.get_status(rtxn, *status)?,
};
}
if !status.contains(&Status::Processing) {
tasks -= &processing_tasks;
tasks -= &*processing_tasks;
}
tasks &= status_tasks;
}
@ -882,7 +842,7 @@ impl IndexScheduler {
// Once we have filtered the two subsets, we put them back together and assign it back to `tasks`.
tasks = {
let (mut filtered_non_processing_tasks, mut filtered_processing_tasks) =
(&tasks - &processing_tasks, &tasks & &processing_tasks);
(&tasks - &*processing_tasks, &tasks & &*processing_tasks);
// special case for Processing tasks
// A closure that clears the filtered_processing_tasks if their started_at date falls outside the given bounds
@ -1090,7 +1050,7 @@ impl IndexScheduler {
// Once we have filtered the two subsets, we put them back together and assign it back to `batches`.
batches = {
let (mut filtered_non_processing_batches, mut filtered_processing_batches) =
(&batches - &processing.processing, &batches & &processing.processing);
(&batches - &*processing.processing, &batches & &*processing.processing);
// special case for Processing batches
// A closure that clears the filtered_processing_batches if their started_at date falls outside the given bounds
@ -1606,7 +1566,8 @@ impl IndexScheduler {
// We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset();
self.processing_tasks
let progress = self
.processing_tasks
.write()
.unwrap()
// We can clone the processing batch here because we don't want its modification to affect the view of the processing batches
@ -1619,11 +1580,12 @@ impl IndexScheduler {
let res = {
let cloned_index_scheduler = self.private_clone();
let processing_batch = &mut processing_batch;
let progress = progress.clone();
std::thread::scope(|s| {
let handle = std::thread::Builder::new()
.name(String::from("batch-operation"))
.spawn_scoped(s, move || {
cloned_index_scheduler.process_batch(batch, processing_batch)
cloned_index_scheduler.process_batch(batch, processing_batch, progress)
})
.unwrap();
handle.join().unwrap_or(Err(Error::ProcessBatchPanicked))
@ -1636,6 +1598,7 @@ impl IndexScheduler {
#[cfg(test)]
self.maybe_fail(tests::FailureLocation::AcquiringWtxn)?;
progress.update_progress(BatchProgress::WritingTasksToDisk);
processing_batch.finished();
let mut wtxn = self.env.write_txn().map_err(Error::HeedTransaction)?;
let mut canceled = RoaringBitmap::new();
@ -1645,12 +1608,15 @@ impl IndexScheduler {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchSucceeded);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(tasks.len() as u32);
progress.update_progress(task_progress_obj);
let mut success = 0;
let mut failure = 0;
let mut canceled_by = None;
#[allow(unused_variables)]
for (i, mut task) in tasks.into_iter().enumerate() {
task_progress.fetch_add(1, Ordering::Relaxed);
processing_batch.update(&mut task);
if task.status == Status::Canceled {
canceled.insert(task.uid);
@ -1718,8 +1684,12 @@ impl IndexScheduler {
Err(err) => {
#[cfg(test)]
self.breakpoint(Breakpoint::ProcessBatchFailed);
let (task_progress, task_progress_obj) = AtomicTaskStep::new(ids.len() as u32);
progress.update_progress(task_progress_obj);
let error: ResponseError = err.into();
for id in ids.iter() {
task_progress.fetch_add(1, Ordering::Relaxed);
let mut task = self
.get_task(&wtxn, id)
.map_err(|e| Error::TaskDatabaseUpdate(Box::new(e)))?