Add some documentation to the index scheduler

Loïc Lecrenier 2022-10-20 10:25:34 +02:00 committed by Clément Renault
parent 66c3b93ef1
commit 169f386418
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 180 additions and 64 deletions

View File

@@ -1,3 +1,10 @@
+/*!
+The autobatcher is responsible for combining the next enqueued
+tasks affecting a single index into a [batch](crate::batch::Batch).
+
+The main function of the autobatcher is [`autobatch`].
+*/
+
 use meilisearch_types::milli::update::IndexDocumentsMethod::{
     self, ReplaceDocuments, UpdateDocuments,
 };
@@ -6,8 +13,10 @@ use std::ops::ControlFlow::{self, Break, Continue};
 
 use crate::KindWithContent;
 
-/// This enum contain the minimal necessary informations
-/// to make the autobatcher works.
+/// Succinctly describes a task's [`Kind`](meilisearch_types::tasks::Kind)
+/// for the purpose of simplifying the implementation of the autobatcher.
+///
+/// Only the non-prioritised tasks that can be grouped in a batch have a corresponding [`AutobatchKind`].
 enum AutobatchKind {
     DocumentImport {
         method: IndexDocumentsMethod,
@@ -387,6 +396,16 @@ impl BatchKind {
     }
 }
 
+/// Create a batch from an ordered list of tasks.
+///
+/// ## Preconditions
+/// 1. The tasks must be enqueued and given in the order in which they were enqueued
+/// 2. The tasks must not be prioritised tasks (e.g. task cancellation, dump, snapshot, task deletion)
+/// 3. The tasks must all be related to the same index
+///
+/// ## Return
+/// `None` if the list of tasks is empty. Otherwise, a [`BatchKind`] that represents
+/// a subset of the given tasks.
 pub fn autobatch(enqueued: Vec<(TaskId, KindWithContent)>) -> Option<BatchKind> {
     let mut enqueued = enqueued.into_iter();
     let (id, kind) = enqueued.next()?;
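For context on how such a fold can work: the `ControlFlow` import above drives the accumulation, growing the batch one task at a time and stopping at the first incompatible one. Below is a minimal, self-contained sketch of that pattern; the `BatchAccumulator` type and its `try_add` method are hypothetical stand-ins for illustration, not this crate's actual API.

```rust
use std::ops::ControlFlow::{self, Break, Continue};

/// Hypothetical accumulator illustrating the autobatching pattern.
struct BatchAccumulator {
    task_ids: Vec<u32>,
}

impl BatchAccumulator {
    /// Try to absorb one more task; `Break` means "close the batch here".
    fn try_add(mut self, id: u32, compatible: bool) -> ControlFlow<Self, Self> {
        if compatible {
            self.task_ids.push(id);
            Continue(self)
        } else {
            Break(self)
        }
    }
}

/// Fold an ordered list of (task id, compatibility) pairs into one batch,
/// mirroring the shape of `autobatch`: `None` on an empty input.
fn fold_batch(tasks: Vec<(u32, bool)>) -> Option<BatchAccumulator> {
    let mut tasks = tasks.into_iter();
    let (first_id, _) = tasks.next()?;
    let mut acc = BatchAccumulator { task_ids: vec![first_id] };
    for (id, compatible) in tasks {
        acc = match acc.try_add(id, compatible) {
            Continue(acc) => acc,
            Break(acc) => return Some(acc),
        };
    }
    Some(acc)
}

fn main() {
    // Tasks 1 and 2 are compatible; task 3 is not, so the batch stops before it.
    let batch = fold_batch(vec![(1, true), (2, true), (3, false)]).unwrap();
    assert_eq!(batch.task_ids, vec![1, 2]);
}
```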

View File

@@ -1,3 +1,22 @@
+/*!
+This module handles the creation and processing of batch operations.
+
+A batch is a combination of multiple tasks that can be processed at once.
+Executing a batch operation should always be functionally equivalent to
+executing each of its tasks' operations individually and in order.
+
+For example, if the user sends two tasks:
+1. import documents X
+2. import documents Y
+
+We can combine the two tasks in a single batch:
+1. import documents X and Y
+
+Processing this batch is functionally equivalent to processing the two
+tasks individually, but should be much faster since we are only performing
+one indexing operation.
+*/
+
 use std::collections::HashSet;
 use std::fs::File;
 use std::io::BufWriter;
@@ -26,6 +45,11 @@ use roaring::RoaringBitmap;
 use time::OffsetDateTime;
 use uuid::Uuid;
 
+/// Represents a combination of tasks that can all be processed at the same time.
+///
+/// A batch contains the set of tasks that it represents (accessible through
+/// [`self.ids()`](Batch::ids)), as well as additional information on how it
+/// should be processed.
 #[derive(Debug)]
 pub(crate) enum Batch {
     TaskCancelation(Task),
@@ -49,6 +73,7 @@ pub(crate) enum Batch {
     },
 }
 
+/// A [batch](Batch) that combines multiple tasks operating on an index.
 #[derive(Debug)]
 pub(crate) enum IndexOperation {
     DocumentImport {
@@ -102,6 +127,7 @@ pub(crate) enum IndexOperation {
 }
 
 impl Batch {
+    /// Return the task ids associated with this batch.
     pub fn ids(&self) -> Vec<TaskId> {
         match self {
             Batch::TaskCancelation(task)
@@ -135,6 +161,12 @@ impl Batch {
 }
 
 impl IndexScheduler {
+    /// Convert a [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
+    ///
+    /// ## Arguments
+    /// - `rtxn`: read transaction
+    /// - `index_uid`: name of the index affected by the operations of the autobatch
+    /// - `batch`: the result of the autobatcher
     pub(crate) fn create_next_batch_index(
         &self,
         rtxn: &RoTxn,
@@ -456,6 +488,12 @@ impl IndexScheduler {
         Ok(None)
     }
 
+    /// Apply the operation associated with the given batch.
+    ///
+    /// ## Return
+    /// The list of tasks that were processed. The metadata of each task in the returned
+    /// list is updated accordingly, with the exception of its date fields
+    /// [`finished_at`](meilisearch_types::tasks::Task::finished_at) and [`started_at`](meilisearch_types::tasks::Task::started_at).
     pub(crate) fn process_batch(&self, batch: Batch) -> Result<Vec<Task>> {
         match batch {
             Batch::TaskCancelation(mut task) => {
@@ -741,6 +779,10 @@ impl IndexScheduler {
         }
     }
 
+    /// Process the index operation on the given index.
+    ///
+    /// ## Return
+    /// The list of processed tasks.
     fn apply_index_operation<'txn, 'i>(
         &self,
         index_wtxn: &'txn mut RwTxn<'i, '_>,
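One consequence of `process_batch` leaving `started_at` and `finished_at` untouched is that the caller (the run loop) must stamp those dates itself once the batch completes. A hedged sketch of that division of labour, using a stripped-down stand-in for the real `Task` from `meilisearch_types::tasks`:

```rust
use time::OffsetDateTime;

// Stripped-down stand-in for meilisearch_types::tasks::Task;
// the real type carries much more metadata.
struct Task {
    started_at: Option<OffsetDateTime>,
    finished_at: Option<OffsetDateTime>,
}

/// Stamp the two date fields that `process_batch` deliberately leaves unset.
fn stamp_dates(tasks: &mut [Task], started_at: OffsetDateTime) {
    let finished_at = OffsetDateTime::now_utc();
    for task in tasks.iter_mut() {
        // Everything else was already updated by `process_batch`.
        task.started_at = Some(started_at);
        task.finished_at = Some(finished_at);
    }
}
```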

View File

@@ -16,23 +16,29 @@ use crate::{Error, Result};
 
 const INDEX_MAPPING: &str = "index-mapping";
 
+/// Structure managing meilisearch's indexes.
+///
+/// It is responsible for:
+/// 1. Creating new indexes
+/// 2. Opening indexes and storing references to these opened indexes
+/// 3. Accessing indexes through their uuid
+/// 4. Mapping a user-defined name to each index uuid.
 #[derive(Clone)]
 pub struct IndexMapper {
-    // Keep track of the opened indexes and is used
-    // mainly by the index resolver.
+    /// Keep track of the opened indexes. Used mainly by the index resolver.
     index_map: Arc<RwLock<HashMap<Uuid, IndexStatus>>>,
 
     // TODO create a UUID Codec that uses the 16 bytes representation
-    // Map an index name with an index uuid currently available on disk.
+    /// Map an index name with an index uuid currently available on disk.
     index_mapping: Database<Str, SerdeBincode<Uuid>>,
 
+    /// Path to the folder where the LMDB environments of each index are.
     base_path: PathBuf,
     index_size: usize,
    pub indexer_config: Arc<IndexerConfig>,
 }
 
-/// Weither the index must not be inserted back
-/// or it is available for use.
+/// Whether the index is available for use or is forbidden to be inserted back in the index map.
 #[derive(Clone)]
 pub enum IndexStatus {
     /// Do not insert it back in the index map as it is currently being deleted.
@@ -167,6 +173,7 @@ impl IndexMapper {
         Ok(index)
     }
 
+    /// Return all indexes, opening them if they weren't already opened.
     pub fn indexes(&self, rtxn: &RoTxn) -> Result<Vec<(String, Index)>> {
         self.index_mapping
             .iter(rtxn)?
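Taken together, these responsibilities mean most callers only ever touch two lookups. Here is a usage sketch based on the signatures visible in this diff (`index` resolving one name, `indexes` listing everything); error conversion is simplified and the index name is illustrative:

```rust
use meilisearch_types::heed::{Env, RoTxn};
use crate::{IndexMapper, Result};

fn list_indexes(env: &Env, mapper: &IndexMapper) -> Result<()> {
    let rtxn: RoTxn = env.read_txn()?;

    // Resolve a user-facing name to its Index, opening it if necessary;
    // fails with the `IndexNotFound` error if it does not exist on disk.
    let _movies = mapper.index(&rtxn, "movies")?;

    // List every (name, Index) pair currently recorded in `index_mapping`.
    for (name, _index) in mapper.indexes(&rtxn)? {
        println!("index available: {name}");
    }
    Ok(())
}
```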

View File

@@ -1,3 +1,23 @@
+/*!
+This crate defines the index scheduler, which is responsible for:
+1. Keeping references to meilisearch's indexes and mapping them to their
+   user-defined names.
+2. Scheduling tasks given by the user and executing them, in batch if possible.
+
+When an `IndexScheduler` is created, a new thread containing a reference to the
+scheduler is created. This thread runs the scheduler's run loop, where the
+scheduler waits to be woken up to process new tasks. It wakes up when:
+
+1. it is launched for the first time
+2. a new task is registered
+3. a batch of tasks has been processed
+
+It is only within this thread that the scheduler is allowed to process tasks.
+On the other hand, the publicly accessible methods of the scheduler can be
+called asynchronously from any thread. These methods can either query the
+content of the scheduler or enqueue new tasks.
+*/
+
 mod autobatcher;
 mod batch;
 pub mod error;
@@ -36,26 +56,50 @@ use crate::index_mapper::IndexMapper;
 
 type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
 
+/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
+///
+/// An empty/default query (where each field is set to `None`) matches all tasks.
+/// Each non-null field restricts the set of tasks further.
 #[derive(Default, Debug, Clone, PartialEq, Eq)]
 pub struct Query {
+    /// The maximum number of tasks to be matched
     pub limit: Option<u32>,
+    /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
     pub from: Option<u32>,
+    /// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasks
     pub status: Option<Vec<Status>>,
+    /// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks.
+    ///
+    /// The kind of a task is given by:
+    /// ```
+    /// # use meilisearch_types::tasks::{Task, Kind};
+    /// # fn doc_func(task: Task) -> Kind {
+    /// task.kind.as_kind()
+    /// # }
+    /// ```
     pub kind: Option<Vec<Kind>>,
+    /// The allowed [index ids](meilisearch_types::tasks::Task::index_uid) of the matched tasks
     pub index_uid: Option<Vec<String>>,
+    /// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
    pub uid: Option<Vec<TaskId>>,
+    /// Exclusive upper bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
     pub before_enqueued_at: Option<OffsetDateTime>,
+    /// Exclusive lower bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
     pub after_enqueued_at: Option<OffsetDateTime>,
+    /// Exclusive upper bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
     pub before_started_at: Option<OffsetDateTime>,
+    /// Exclusive lower bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
     pub after_started_at: Option<OffsetDateTime>,
+    /// Exclusive upper bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
     pub before_finished_at: Option<OffsetDateTime>,
+    /// Exclusive lower bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
     pub after_finished_at: Option<OffsetDateTime>,
 }
 
 impl Query {
-    /// Return `true` iff every field of the query is set to `None`, such that the query
-    /// would match all tasks.
+    /// Return `true` iff every field of the query is set to `None`, such that the query
+    /// matches all tasks.
     pub fn is_empty(&self) -> bool {
         matches!(
             self,
@@ -75,24 +119,8 @@ impl Query {
             }
         )
     }
-
-    pub fn with_status(self, status: Status) -> Self {
-        let mut status_vec = self.status.unwrap_or_default();
-        status_vec.push(status);
-        Self {
-            status: Some(status_vec),
-            ..self
-        }
-    }
-
-    pub fn with_kind(self, kind: Kind) -> Self {
-        let mut kind_vec = self.kind.unwrap_or_default();
-        kind_vec.push(kind);
-        Self {
-            kind: Some(kind_vec),
-            ..self
-        }
-    }
+    /// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
     pub fn with_index(self, index_uid: String) -> Self {
         let mut index_vec = self.index_uid.unwrap_or_default();
         index_vec.push(index_uid);
@@ -101,22 +129,6 @@ impl Query {
             ..self
         }
     }
-
-    pub fn with_uid(self, uid: TaskId) -> Self {
-        let mut task_vec = self.uid.unwrap_or_default();
-        task_vec.push(uid);
-        Self {
-            uid: Some(task_vec),
-            ..self
-        }
-    }
-
-    pub fn with_limit(self, limit: u32) -> Self {
-        Self {
-            limit: Some(limit),
-            ..self
-        }
-    }
 }
 
 #[derive(Debug, Clone)]
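With the builder methods gone, call sites build a `Query` with struct-update syntax instead, exactly as the route changes at the end of this diff do. For instance, to match at most one currently-processing task of a given index (wrapped in a helper here purely for illustration):

```rust
use meilisearch_types::tasks::Status;

/// Build a query matching at most one processing task of `index_uid`.
fn processing_query(index_uid: &str) -> Query {
    Query {
        status: Some(vec![Status::Processing]),
        index_uid: Some(vec![index_uid.to_string()]),
        limit: Some(1),
        // All remaining fields stay `None`, so they do not restrict the match.
        ..Query::default()
    }
}
```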
@@ -182,16 +194,19 @@ mod db_name {
     pub const FINISHED_AT: &str = "finished-at";
 }
 
-/// This module is responsible for two things;
-/// 1. Resolve the name of the indexes.
-/// 2. Schedule the tasks.
+/// Structure which holds meilisearch's indexes and schedules the tasks
+/// to be performed on them.
 pub struct IndexScheduler {
     /// The LMDB environment which the DBs are associated with.
     pub(crate) env: Env,
 
     /// A boolean that can be set to true to stop the currently processing tasks.
     pub(crate) must_stop_processing: MustStopProcessing,
+
+    /// The list of tasks currently processing
     pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
+
+    /// The list of files referenced by the tasks
     pub(crate) file_store: FileStore,
 
     // The main database, it contains all the tasks accessible by their Id.
@@ -248,6 +263,17 @@ pub enum Breakpoint {
 }
 
 impl IndexScheduler {
+    /// Create an index scheduler and start its run loop.
+    ///
+    /// ## Arguments
+    /// - `tasks_path`: the path to the folder containing the task databases
+    /// - `update_file_path`: the path to the file store containing the files associated to the tasks
+    /// - `indexes_path`: the path to the folder containing meilisearch's indexes
+    /// - `dumps_path`: the path to the folder containing the dumps
+    /// - `index_size`: the maximum size, in bytes, of each meilisearch index
+    /// - `indexer_config`: configuration used during indexing for each meilisearch index
+    /// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks
+    ///   together, to process multiple tasks at once.
     pub fn new(
         tasks_path: PathBuf,
         update_file_path: PathBuf,
@@ -296,7 +322,10 @@ impl IndexScheduler {
         Ok(this)
     }
 
-    /// This function will execute in a different thread and must be called only once.
+    /// Start the run loop for the given index scheduler.
+    ///
+    /// This function will execute in a different thread and must be called
+    /// only once per index scheduler.
     fn run(&self) {
         let run = Self {
             must_stop_processing: MustStopProcessing::default(),
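Put together, the documented arguments suggest a construction call of the following shape. This is a hedged sketch only: the argument order follows the documented list above, the paths and size are placeholders, and the exact signature at this commit may differ.

```rust
use std::path::PathBuf;
use meilisearch_types::milli::update::IndexerConfig;

// Hypothetical invocation; paths and the index size are placeholders.
let scheduler = IndexScheduler::new(
    PathBuf::from("data.ms/tasks"),        // tasks_path
    PathBuf::from("data.ms/update_files"), // update_file_path
    PathBuf::from("data.ms/indexes"),      // indexes_path
    PathBuf::from("data.ms/dumps"),        // dumps_path
    100 * 1024 * 1024 * 1024,              // index_size: 100 GiB per index
    IndexerConfig::default(),              // indexer_config
    true,                                  // autobatching_enabled
)?;
```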
@@ -334,9 +363,10 @@ impl IndexScheduler {
         &self.index_mapper.indexer_config
     }
 
-    /// Return the index corresponding to the name. If it wasn't opened before
-    /// it'll be opened. But if it doesn't exist on disk it'll throw an
-    /// `IndexNotFound` error.
+    /// Return the index corresponding to the name.
+    ///
+    /// * If the index wasn't opened before, the index will be opened.
+    /// * If the index doesn't exist on disk, the `IndexNotFound` error is returned.
     pub fn index(&self, name: &str) -> Result<Index> {
         let rtxn = self.env.read_txn()?;
         self.index_mapper.index(&rtxn, name)
@@ -348,7 +378,7 @@ impl IndexScheduler {
         self.index_mapper.indexes(&rtxn)
     }
 
-    /// Return the task ids corresponding to the query
+    /// Return the task ids matched by the given query.
     pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
         let rtxn = self.env.read_txn()?;
@@ -410,7 +440,7 @@ impl IndexScheduler {
         Ok(tasks)
     }
 
-    /// Returns the tasks corresponding to the query.
+    /// Returns the tasks matched by the given query.
     pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
         let tasks = self.get_task_ids(&query)?;
         let rtxn = self.env.read_txn()?;
@@ -450,8 +480,9 @@ impl IndexScheduler {
         }
     }
 
-    /// Register a new task in the scheduler. If it fails and data was associated with the task
-    /// it tries to delete the file.
+    /// Register a new task in the scheduler.
+    ///
+    /// If it fails and data was associated with the task, it tries to delete the associated data.
     pub fn register(&self, kind: KindWithContent) -> Result<Task> {
         let mut wtxn = self.env.write_txn()?;
@@ -645,6 +676,11 @@ impl IndexScheduler {
         Ok(index)
     }
 
+    /// Create a file and register it in the index scheduler.
+    ///
+    /// The returned file and uuid can be used to associate
+    /// some data to a task. The file will be kept until
+    /// the task has been fully processed.
     pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
         Ok(self.file_store.new_update()?)
     }
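The lifecycle implied here pairs `create_update_file` with either a successfully processed task or an explicit cleanup. A minimal sketch using only the two signatures shown in this diff; registering a real task in between is elided, since the exact `KindWithContent` payload is not shown here:

```rust
use uuid::Uuid;

fn update_file_roundtrip(scheduler: &IndexScheduler) -> Result<()> {
    // The uuid is what a task stores to reference its payload on disk.
    let (uuid, _file): (Uuid, file_store::File) = scheduler.create_update_file()?;

    // ... normally a task referencing `uuid` is registered here, and the
    // file is kept until that task has been fully processed ...

    // Manual counterpart if no task ends up owning the file.
    scheduler.delete_update_file(uuid)?;
    Ok(())
}
```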
@@ -654,11 +690,23 @@ impl IndexScheduler {
         Ok(self.file_store.new_update_with_uuid(uuid)?)
     }
 
+    /// Delete a file from the index scheduler.
+    ///
+    /// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.
     pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
         Ok(self.file_store.delete(uuid)?)
     }
 
-    /// Create and execute and store the result of one batch of registered tasks.
+    /// Perform one iteration of the run loop.
+    ///
+    /// 1. Find the next batch of tasks to be processed.
+    /// 2. Update the information of these tasks following the start of their processing.
+    /// 3. Update the in-memory list of processed tasks accordingly.
+    /// 4. Process the batch:
+    ///    - perform the actions of each batched task
+    ///    - update the information of each batched task following the end
+    ///      of their processing.
+    /// 5. Reset the in-memory list of processed tasks.
     ///
     /// Returns the number of processed tasks.
     fn tick(&self) -> Result<usize> {
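Those five steps give `tick` a simple external contract: one call, one batch, a count of processed tasks. The toy below is self-contained and only illustrates the loop shape; the `ToyScheduler` type is invented for this sketch, and the real loop additionally blocks between iterations until it is woken up.

```rust
// Invented stand-in for IndexScheduler, just to show the loop shape.
struct ToyScheduler {
    pending: Vec<&'static str>,
}

impl ToyScheduler {
    /// One "tick": process the next batch and report how many tasks ran.
    fn tick(&mut self) -> usize {
        match self.pending.pop() {
            Some(task) => {
                println!("processed: {task}");
                1
            }
            None => 0,
        }
    }
}

fn main() {
    let mut scheduler = ToyScheduler {
        pending: vec!["import documents Y", "import documents X"],
    };
    // The real run loop sleeps here until woken; the toy just drains the queue.
    while scheduler.tick() > 0 {}
}
```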

View File

@@ -215,12 +215,12 @@ impl IndexStats {
         index_uid: String,
     ) -> Result<Self, ResponseError> {
         // we check if there is currently a task processing associated with this index.
-        let processing_task = index_scheduler.get_tasks(
-            Query::default()
-                .with_status(Status::Processing)
-                .with_index(index_uid.clone())
-                .with_limit(1),
-        )?;
+        let processing_task = index_scheduler.get_tasks(Query {
+            status: Some(vec![Status::Processing]),
+            index_uid: Some(vec![index_uid.clone()]),
+            limit: Some(1),
+            ..Query::default()
+        })?;
         let is_processing = !processing_task.is_empty();
 
         let index = index_scheduler.index(&index_uid)?;

View File

@@ -293,11 +293,11 @@ pub fn create_all_stats(
     let mut last_task: Option<OffsetDateTime> = None;
     let mut indexes = BTreeMap::new();
     let mut database_size = 0;
-    let processing_task = index_scheduler.get_tasks(
-        Query::default()
-            .with_status(Status::Processing)
-            .with_limit(1),
-    )?;
+    let processing_task = index_scheduler.get_tasks(Query {
+        status: Some(vec![Status::Processing]),
+        limit: Some(1),
+        ..Query::default()
+    })?;
     let processing_index = processing_task
         .first()
         .and_then(|task| task.index_uid().clone());