Add some documentation to the index scheduler

This commit is contained in:
Loïc Lecrenier 2022-10-20 10:25:34 +02:00 committed by Clément Renault
parent 2f54dade04
commit 835745ac22
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
6 changed files with 180 additions and 64 deletions

View file

@ -1,3 +1,23 @@
/*!
This crate defines the index scheduler, which is responsible for:
1. Keeping references to meilisearch's indexes and mapping them to their
user-defined names.
2. Scheduling tasks given by the user and executing them, in batch if possible.
When an `IndexScheduler` is created, a new thread containing a reference to the
scheduler is created. This thread runs the scheduler's run loop, where the
scheduler waits to be woken up to process new tasks. It wakes up when:
1. it is launched for the first time
2. a new task is registered
3. a batch of tasks has been processed
It is only within this thread that the scheduler is allowed to process tasks.
On the other hand, the publicly accessible methods of the scheduler can be
called asynchronously from any thread. These methods can either query the
content of the scheduler or enqueue new tasks.
*/
mod autobatcher;
mod batch;
pub mod error;
@ -36,26 +56,50 @@ use crate::index_mapper::IndexMapper;
type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
///
/// An empty/default query (where each field is set to `None`) matches all tasks.
/// Each non-null field restricts the set of tasks further.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Query {
/// The maximum number of tasks to be matched
pub limit: Option<u32>,
/// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
pub from: Option<u32>,
/// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasls
pub status: Option<Vec<Status>>,
/// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks.
///
/// The kind of a task is given by:
/// ```
/// # use meilisearch_types::tasks::{Task, Kind};
/// # fn doc_func(task: Task) -> Kind {
/// task.kind.as_kind()
/// # }
/// ```
pub kind: Option<Vec<Kind>>,
/// The allowed [index ids](meilisearch_types::tasks::Task::index_uid) of the matched tasks
pub index_uid: Option<Vec<String>>,
/// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
pub uid: Option<Vec<TaskId>>,
/// Exclusive upper bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
pub before_enqueued_at: Option<OffsetDateTime>,
/// Exclusive lower bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
pub after_enqueued_at: Option<OffsetDateTime>,
/// Exclusive upper bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
pub before_started_at: Option<OffsetDateTime>,
/// Exclusive lower bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
pub after_started_at: Option<OffsetDateTime>,
/// Exclusive upper bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
pub before_finished_at: Option<OffsetDateTime>,
/// Exclusive lower bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
pub after_finished_at: Option<OffsetDateTime>,
}
impl Query {
/// Return `true` iff every field of the query is set to `None`, such that the query
/// would match all tasks.
/// matches all tasks.
pub fn is_empty(&self) -> bool {
matches!(
self,
@ -75,24 +119,8 @@ impl Query {
}
)
}
pub fn with_status(self, status: Status) -> Self {
let mut status_vec = self.status.unwrap_or_default();
status_vec.push(status);
Self {
status: Some(status_vec),
..self
}
}
pub fn with_kind(self, kind: Kind) -> Self {
let mut kind_vec = self.kind.unwrap_or_default();
kind_vec.push(kind);
Self {
kind: Some(kind_vec),
..self
}
}
/// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uid.unwrap_or_default();
index_vec.push(index_uid);
@ -101,22 +129,6 @@ impl Query {
..self
}
}
pub fn with_uid(self, uid: TaskId) -> Self {
let mut task_vec = self.uid.unwrap_or_default();
task_vec.push(uid);
Self {
uid: Some(task_vec),
..self
}
}
pub fn with_limit(self, limit: u32) -> Self {
Self {
limit: Some(limit),
..self
}
}
}
#[derive(Debug, Clone)]
@ -182,16 +194,19 @@ mod db_name {
pub const FINISHED_AT: &str = "finished-at";
}
/// This module is responsible for two things;
/// 1. Resolve the name of the indexes.
/// 2. Schedule the tasks.
/// Structure which holds meilisearch's indexes and schedules the tasks
/// to be performed on them.
pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
/// A boolean that can be set to true to stop the currently processing tasks.
pub(crate) must_stop_processing: MustStopProcessing,
/// The list of tasks currently processing
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
/// The list of files referenced by the tasks
pub(crate) file_store: FileStore,
// The main database, it contains all the tasks accessible by their Id.
@ -248,6 +263,17 @@ pub enum Breakpoint {
}
impl IndexScheduler {
/// Create an index scheduler and start its run loop.
///
/// ## Arguments
/// - `tasks_path`: the path to the folder containing the task databases
/// - `update_file_path`: the path to the file store containing the files associated to the tasks
/// - `indexes_path`: the path to the folder containing meilisearch's indexes
/// - `dumps_path`: the path to the folder containing the dumps
/// - `index_size`: the maximum size, in bytes, of each meilisearch index
/// - `indexer_config`: configuration used during indexing for each meilisearch index
/// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks
/// together, to process multiple tasks at once.
pub fn new(
tasks_path: PathBuf,
update_file_path: PathBuf,
@ -296,7 +322,10 @@ impl IndexScheduler {
Ok(this)
}
/// This function will execute in a different thread and must be called only once.
/// Start the run loop for the given index scheduler.
///
/// This function will execute in a different thread and must be called
/// only once per index scheduler.
fn run(&self) {
let run = Self {
must_stop_processing: MustStopProcessing::default(),
@ -334,9 +363,10 @@ impl IndexScheduler {
&self.index_mapper.indexer_config
}
/// Return the index corresponding to the name. If it wasn't opened before
/// it'll be opened. But if it doesn't exist on disk it'll throw an
/// `IndexNotFound` error.
/// Return the index corresponding to the name.
///
/// * If the index wasn't opened before, the index will be opened.
/// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown.
pub fn index(&self, name: &str) -> Result<Index> {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name)
@ -348,7 +378,7 @@ impl IndexScheduler {
self.index_mapper.indexes(&rtxn)
}
/// Return the task ids corresponding to the query
/// Return the task ids matched by the given query.
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
@ -410,7 +440,7 @@ impl IndexScheduler {
Ok(tasks)
}
/// Returns the tasks corresponding to the query.
/// Returns the tasks matched by the given query.
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
let tasks = self.get_task_ids(&query)?;
let rtxn = self.env.read_txn()?;
@ -450,8 +480,9 @@ impl IndexScheduler {
}
}
/// Register a new task in the scheduler. If it fails and data was associated with the task
/// it tries to delete the file.
/// Register a new task in the scheduler.
///
/// If it fails and data was associated with the task, it tries to delete the associated data.
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?;
@ -645,6 +676,11 @@ impl IndexScheduler {
Ok(index)
}
/// Create a file and register it in the index scheduler.
///
/// The returned file and uuid can be used to associate
/// some data to a task. The file will be kept until
/// the task has been fully processed.
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update()?)
}
@ -654,11 +690,23 @@ impl IndexScheduler {
Ok(self.file_store.new_update_with_uuid(uuid)?)
}
/// Delete a file from the index scheduler.
///
/// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.
pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
Ok(self.file_store.delete(uuid)?)
}
/// Create and execute and store the result of one batch of registered tasks.
/// Perform one iteration of the run loop.
///
/// 1. Find the next batch of tasks to be processed.
/// 2. Update the information of these tasks following the start of their processing.
/// 3. Update the in-memory list of processed tasks accordingly.
/// 4. Process the batch:
/// - perform the actions of each batched task
/// - update the information of each batched task following the end
/// of their processing.
/// 5. Reset the in-memory list of processed tasks.
///
/// Returns the number of processed tasks.
fn tick(&self) -> Result<usize> {