MeiliSearch/index-scheduler/src/lib.rs

1343 lines
47 KiB
Rust
Raw Normal View History

/*!
This crate defines the index scheduler, which is responsible for:
1. Keeping references to meilisearch's indexes and mapping them to their
user-defined names.
2. Scheduling tasks given by the user and executing them, in batch if possible.

When an `IndexScheduler` is created, a new thread containing a reference to the
scheduler is created. This thread runs the scheduler's run loop, where the
scheduler waits to be woken up to process new tasks. It wakes up when:
1. it is launched for the first time
2. a new task is registered
3. a batch of tasks has been processed

It is only within this thread that the scheduler is allowed to process tasks.
On the other hand, the publicly accessible methods of the scheduler can be
called concurrently from any thread. These methods can either query the
content of the scheduler or enqueue new tasks.
*/
mod autobatcher;
mod batch;
pub mod error;
mod index_mapper;
#[cfg(test)]
mod snapshot;
mod utils;
2022-09-06 23:49:19 +02:00
/// The result type returned by the fallible operations of this crate.
pub type Result<T> = std::result::Result<T, Error>;
/// The identifier (`uid`) of a task.
pub type TaskId = u32;
2022-10-16 01:39:01 +02:00
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use utils::keep_tasks_within_datetimes;
2022-09-15 12:23:41 +02:00
use std::path::PathBuf;
2022-10-17 13:54:35 +02:00
use std::sync::atomic::{AtomicBool, Ordering::Relaxed};
use std::sync::{Arc, RwLock};
2022-10-16 01:39:01 +02:00
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::milli;
use roaring::RoaringBitmap;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{self, Database, Env};
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
use crate::index_mapper::IndexMapper;
/// A big-endian `i128`, used as the key type of the `enqueued_at`, `started_at` and
/// `finished_at` databases (presumably big-endian so that LMDB's byte-wise key order
/// matches the chronological order — confirm against `utils`).
type BEI128 = meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BE>;
/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
///
/// An empty/default query (where each field is set to `None`) matches all tasks.
/// Each non-null field restricts the set of tasks further.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct Query {
    /// The maximum number of tasks to be matched
    pub limit: Option<u32>,
    /// The minimum [task id](`meilisearch_types::tasks::Task::uid`) to be matched
    ///
    /// NOTE(review): `IndexScheduler::get_task_ids` does not read this field; confirm
    /// where this bound is enforced, and whether it is a lower or an upper bound
    /// (`get_tasks` returns the most recent tasks first).
    pub from: Option<u32>,
    /// The allowed [statuses](`meilisearch_types::tasks::Task::status`) of the matched tasks
    pub status: Option<Vec<Status>>,
    /// The allowed [kinds](meilisearch_types::tasks::Kind) of the matched tasks.
    ///
    /// The kind of a task is given by:
    /// ```
    /// # use meilisearch_types::tasks::{Task, Kind};
    /// # fn doc_func(task: Task) -> Kind {
    /// task.kind.as_kind()
    /// # }
    /// ```
    pub kind: Option<Vec<Kind>>,
    /// The allowed [index ids](meilisearch_types::tasks::Task::index_uid) of the matched tasks
    pub index_uid: Option<Vec<String>>,
    /// The [task ids](`meilisearch_types::tasks::Task::uid`) to be matched
    pub uid: Option<Vec<TaskId>>,

    /// Exclusive upper bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
    pub before_enqueued_at: Option<OffsetDateTime>,
    /// Exclusive lower bound of the matched tasks' [`enqueued_at`](meilisearch_types::tasks::Task::enqueued_at) field.
    pub after_enqueued_at: Option<OffsetDateTime>,
    /// Exclusive upper bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
    pub before_started_at: Option<OffsetDateTime>,
    /// Exclusive lower bound of the matched tasks' [`started_at`](meilisearch_types::tasks::Task::started_at) field.
    pub after_started_at: Option<OffsetDateTime>,
    /// Exclusive upper bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
    pub before_finished_at: Option<OffsetDateTime>,
    /// Exclusive lower bound of the matched tasks' [`finished_at`](meilisearch_types::tasks::Task::finished_at) field.
    pub after_finished_at: Option<OffsetDateTime>,
}
impl Query {
/// Return `true` iff every field of the query is set to `None`, such that the query
/// matches all tasks.
pub fn is_empty(&self) -> bool {
matches!(
self,
Query {
limit: None,
from: None,
status: None,
kind: None,
index_uid: None,
uid: None,
before_enqueued_at: None,
after_enqueued_at: None,
before_started_at: None,
after_started_at: None,
before_finished_at: None,
after_finished_at: None,
}
)
}
/// Add an [index id](meilisearch_types::tasks::Task::index_uid) to the list of permitted indexes.
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uid.unwrap_or_default();
index_vec.push(index_uid);
Self {
index_uid: Some(index_vec),
..self
}
}
}
2022-10-17 13:54:35 +02:00
#[derive(Debug, Clone)]
struct ProcessingTasks {
/// The date and time at which the indexation started.
started_at: OffsetDateTime,
/// The list of tasks ids that are currently running.
processing: RoaringBitmap,
}
impl ProcessingTasks {
/// Creates an empty `ProcessingAt` struct.
fn new() -> ProcessingTasks {
ProcessingTasks {
started_at: OffsetDateTime::now_utc(),
processing: RoaringBitmap::new(),
}
}
/// Stores the currently processing tasks, and the date time at which it started.
2022-10-17 13:54:35 +02:00
fn start_processing_at(&mut self, started_at: OffsetDateTime, processing: RoaringBitmap) {
self.started_at = started_at;
self.processing = processing;
}
/// Set the processing tasks to an empty list.
2022-10-17 13:54:35 +02:00
fn stop_processing_at(&mut self, stopped_at: OffsetDateTime) {
self.started_at = stopped_at;
self.processing = RoaringBitmap::new();
}
/// Returns `true` if there, at least, is one task that is currently processing we must stop.
fn must_cancel_processing_tasks(&self, canceled_tasks: &RoaringBitmap) -> bool {
!self.processing.is_disjoint(canceled_tasks)
}
}
/// A stop flag that can be shared between threads: every clone points to the same boolean.
#[derive(Default, Clone, Debug)]
struct MustStopProcessing(Arc<AtomicBool>);

impl MustStopProcessing {
    /// Return `true` if a stop was requested since the last [`reset`](Self::reset).
    fn get(&self) -> bool {
        let MustStopProcessing(flag) = self;
        flag.load(Relaxed)
    }

    /// Request the currently processing tasks to stop.
    fn must_stop(&self) {
        self.set(true);
    }

    /// Clear any pending stop request.
    fn reset(&self) {
        self.set(false);
    }

    /// Store `value` in the shared flag.
    fn set(&self, value: bool) {
        let MustStopProcessing(flag) = self;
        flag.store(value, Relaxed);
    }
}
/// Database const names for the `IndexScheduler`.
///
/// Each constant is the name under which one of the task databases is created in
/// `IndexScheduler::new`, inside the scheduler's LMDB environment.
mod db_name {
    /// Task id -> task.
    pub const ALL_TASKS: &str = "all-tasks";
    /// Status -> ids of the tasks with that status.
    pub const STATUS: &str = "status";
    /// Kind -> ids of the tasks of that kind.
    pub const KIND: &str = "kind";
    /// Index name -> ids of the tasks associated to that index.
    pub const INDEX_TASKS: &str = "index-tasks";
    /// Enqueue date -> ids of the tasks enqueued at that date.
    pub const ENQUEUED_AT: &str = "enqueued-at";
    /// Start date -> ids of the tasks that started being processed at that date.
    pub const STARTED_AT: &str = "started-at";
    /// Finish date -> ids of the tasks that finished at that date.
    pub const FINISHED_AT: &str = "finished-at";
}
/// Structure which holds meilisearch's indexes and schedules the tasks
/// to be performed on them.
pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
/// A boolean that can be set to true to stop the currently processing tasks.
pub(crate) must_stop_processing: MustStopProcessing,
/// The list of tasks currently processing
2022-10-17 13:54:35 +02:00
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
/// The list of files referenced by the tasks
2022-10-17 13:54:35 +02:00
pub(crate) file_store: FileStore,
// The main database, it contains all the tasks accessible by their Id.
pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
/// All the tasks ids grouped by their status.
// TODO we should not be able to serialize a `Status::Processing` in this database.
pub(crate) status: Database<SerdeBincode<Status>, RoaringBitmapCodec>,
/// All the tasks ids grouped by their kind.
pub(crate) kind: Database<SerdeBincode<Kind>, RoaringBitmapCodec>,
/// Store the tasks associated to an index.
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the task ids of tasks which were enqueued at a specific date
///
/// Note that since we store the date with nanosecond-level precision, it would be
/// reasonable to assume that there is only one task per key. However, it is not a
/// theoretical certainty, and we might want to make it possible to enqueue multiple
/// tasks at a time in the future.
pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of finished tasks which started being processed at a specific date
pub(crate) started_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// Store the task ids of tasks which finished at a specific date
pub(crate) finished_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
/// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper,
/// Get a signal when a batch needs to be processed.
pub(crate) wake_up: Arc<SignalEvent>,
2022-10-17 13:54:35 +02:00
/// Whether auto-batching is enabled or not.
pub(crate) autobatching_enabled: bool,
2022-10-13 15:02:59 +02:00
/// The path used to create the dumps.
pub(crate) dumps_path: PathBuf,
// ================= test
/// The next entry is dedicated to the tests.
/// It provide a way to break in multiple part of the scheduler.
#[cfg(test)]
test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
}
/// The points of the run loop (`IndexScheduler::tick`) at which the scheduler
/// reports its progress to the tests.
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Breakpoint {
    /// Sent at the very beginning of each tick, before looking for a batch.
    Start,
    /// Sent once the batch is created and registered as the processing tasks.
    BatchCreated,
    /// Sent right after `BatchCreated`, just before the batch is processed.
    BeforeProcessing,
    /// Sent at the end of a tick, once the results of the batch are committed.
    AfterProcessing,
}
impl IndexScheduler {
/// Create an index scheduler and start its run loop.
///
/// ## Arguments
/// - `tasks_path`: the path to the folder containing the task databases
/// - `update_file_path`: the path to the file store containing the files associated to the tasks
/// - `indexes_path`: the path to the folder containing meilisearch's indexes
/// - `dumps_path`: the path to the folder containing the dumps
/// - `index_size`: the maximum size, in bytes, of each meilisearch index
/// - `indexer_config`: configuration used during indexing for each meilisearch index
/// - `autobatching_enabled`: `true` iff the index scheduler is allowed to automatically batch tasks
/// together, to process multiple tasks at once.
pub fn new(
tasks_path: PathBuf,
update_file_path: PathBuf,
indexes_path: PathBuf,
2022-10-13 15:02:59 +02:00
dumps_path: PathBuf,
index_size: usize,
indexer_config: IndexerConfig,
autobatching_enabled: bool,
#[cfg(test)] test_breakpoint_sdr: crossbeam::channel::Sender<Breakpoint>,
) -> Result<Self> {
std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?;
std::fs::create_dir_all(&indexes_path)?;
2022-10-13 15:02:59 +02:00
std::fs::create_dir_all(&dumps_path)?;
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(9);
let env = options.open(tasks_path)?;
let file_store = FileStore::new(&update_file_path)?;
// allow unreachable_code to get rids of the warning in the case of a test build.
let this = Self {
must_stop_processing: MustStopProcessing::default(),
processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
file_store,
all_tasks: env.create_database(Some(db_name::ALL_TASKS))?,
status: env.create_database(Some(db_name::STATUS))?,
kind: env.create_database(Some(db_name::KIND))?,
index_tasks: env.create_database(Some(db_name::INDEX_TASKS))?,
enqueued_at: env.create_database(Some(db_name::ENQUEUED_AT))?,
started_at: env.create_database(Some(db_name::STARTED_AT))?,
finished_at: env.create_database(Some(db_name::FINISHED_AT))?,
index_mapper: IndexMapper::new(&env, indexes_path, index_size, indexer_config)?,
env,
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled,
2022-10-13 15:02:59 +02:00
dumps_path,
#[cfg(test)]
test_breakpoint_sdr,
};
this.run();
Ok(this)
}
/// Start the run loop for the given index scheduler.
///
/// This function will execute in a different thread and must be called
/// only once per index scheduler.
fn run(&self) {
let run = Self {
must_stop_processing: MustStopProcessing::default(),
processing_tasks: self.processing_tasks.clone(),
file_store: self.file_store.clone(),
env: self.env.clone(),
all_tasks: self.all_tasks,
status: self.status,
kind: self.kind,
index_tasks: self.index_tasks,
enqueued_at: self.enqueued_at,
started_at: self.started_at,
finished_at: self.finished_at,
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
2022-10-13 15:02:59 +02:00
dumps_path: self.dumps_path.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
};
std::thread::spawn(move || loop {
run.wake_up.wait();
match run.tick() {
Ok(0) => (),
Ok(_) => run.wake_up.signal(),
Err(e) => log::error!("{}", e),
}
});
}
2022-10-16 01:39:01 +02:00
pub fn indexer_config(&self) -> &IndexerConfig {
&self.index_mapper.indexer_config
}
/// Return the index corresponding to the name.
///
/// * If the index wasn't opened before, the index will be opened.
/// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown.
pub fn index(&self, name: &str) -> Result<Index> {
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name)
}
/// Return and open all the indexes.
pub fn indexes(&self) -> Result<Vec<(String, Index)>> {
let rtxn = self.env.read_txn()?;
self.index_mapper.indexes(&rtxn)
}
/// Return the task ids matched by the given query.
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
// This is the list of all the tasks.
let mut tasks = self.all_task_ids(&rtxn)?;
if let Some(uids) = &query.uid {
tasks &= RoaringBitmap::from_iter(uids);
}
if let Some(status) = &query.status {
let mut status_tasks = RoaringBitmap::new();
for status in status {
status_tasks |= self.get_status(&rtxn, *status)?;
}
tasks &= status_tasks;
}
if let Some(kind) = &query.kind {
let mut kind_tasks = RoaringBitmap::new();
for kind in kind {
kind_tasks |= self.get_kind(&rtxn, *kind)?;
}
tasks &= kind_tasks;
}
if let Some(index) = &query.index_uid {
let mut index_tasks = RoaringBitmap::new();
for index in index {
index_tasks |= self.index_tasks(&rtxn, &index)?;
}
tasks &= index_tasks;
}
2022-10-17 16:30:18 +02:00
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.enqueued_at,
query.after_enqueued_at,
query.before_enqueued_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.started_at,
query.after_started_at,
query.before_started_at,
)?;
keep_tasks_within_datetimes(
&rtxn,
&mut tasks,
self.finished_at,
query.after_finished_at,
query.before_finished_at,
)?;
Ok(tasks)
}
/// Returns the tasks matched by the given query.
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
let tasks = self.get_task_ids(&query)?;
let rtxn = self.env.read_txn()?;
let tasks = self.get_existing_tasks(
&rtxn,
tasks
.into_iter()
.rev()
.take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let ProcessingTasks {
started_at,
processing,
..
} = self
.processing_tasks
.read()
.map_err(|_| Error::CorruptedTaskQueue)?
.clone();
2022-10-12 03:21:25 +02:00
let ret = tasks.into_iter();
if processing.is_empty() {
Ok(ret.collect())
} else {
Ok(ret
.map(|task| match processing.contains(task.uid) {
2022-10-12 03:21:25 +02:00
true => Task {
status: Status::Processing,
started_at: Some(started_at),
..task
},
false => task,
})
.collect())
}
}
/// Register a new task in the scheduler.
///
/// If it fails and data was associated with the task, it tries to delete the associated data.
pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?;
let task = Task {
uid: self.next_task_id(&wtxn)?,
enqueued_at: time::OffsetDateTime::now_utc(),
started_at: None,
finished_at: None,
error: None,
canceled_by: None,
details: kind.default_details(),
status: Status::Enqueued,
kind: kind.clone(),
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
}
self.update_status(&mut wtxn, Status::Enqueued, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
utils::insert_task_datetime(&mut wtxn, self.enqueued_at, task.enqueued_at, task.uid)?;
if let Err(e) = wtxn.commit() {
self.delete_persisted_task_data(&task)?;
return Err(e.into());
}
// If the registered task is a task cancelation
// we inform the processing tasks to stop (if necessary).
if let KindWithContent::TaskCancelation { tasks, .. } = kind {
let tasks_to_cancel = RoaringBitmap::from_iter(tasks);
if self
.processing_tasks
.read()
.unwrap()
.must_cancel_processing_tasks(&tasks_to_cancel)
{
self.must_stop_processing.must_stop();
}
}
// notify the scheduler loop to execute a new tick
self.wake_up.signal();
2022-10-12 03:21:25 +02:00
Ok(task)
}
2022-10-16 01:39:01 +02:00
/// Register a new task comming from a dump in the scheduler.
/// By takinig a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumped_task(
2022-10-16 01:39:01 +02:00
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
) -> Result<Task> {
// Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
let mut wtxn = self.env.write_txn()?;
2022-10-16 02:38:36 +02:00
let content_uuid = match content_file {
Some(content_file) if task.status == Status::Enqueued => {
let (uuid, mut file) = self.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
2022-10-16 01:39:01 +02:00
}
2022-10-16 02:38:36 +02:00
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
// in case we try to open it later.
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
2022-10-16 02:38:36 +02:00
_ => None,
2022-10-16 01:39:01 +02:00
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
canceled_by: task.canceled_by,
2022-10-16 01:39:01 +02:00
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentImport {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings {
settings,
is_deletion,
allow_index_creation,
} => KindWithContent::Settings {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
},
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
2022-10-17 16:30:18 +02:00
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
KindDump::TaskCancelation { query, tasks } => {
KindWithContent::TaskCancelation { query, tasks }
}
2022-10-17 18:11:28 +02:00
KindDump::TasksDeletion { query, tasks } => {
2022-10-17 15:11:35 +02:00
KindWithContent::TaskDeletion { query, tasks }
2022-10-16 01:39:01 +02:00
}
KindDump::DumpExport {
2022-10-16 01:39:01 +02:00
dump_uid,
keys,
instance_uid,
} => KindWithContent::DumpExport {
dump_uid,
keys,
2022-10-16 01:39:01 +02:00
instance_uid,
},
KindDump::Snapshot => KindWithContent::Snapshot,
},
};
self.all_tasks
.put(&mut wtxn, &BEU32::new(task.uid), &task)?;
2022-10-16 01:39:01 +02:00
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
2022-10-17 17:14:44 +02:00
wtxn.commit()?;
self.wake_up.signal();
2022-10-17 17:14:44 +02:00
2022-10-16 01:39:01 +02:00
Ok(task)
}
/// Create a new index without any associated task.
pub fn create_raw_index(&self, name: &str) -> Result<Index> {
let mut wtxn = self.env.write_txn()?;
2022-10-16 03:14:01 +02:00
let index = self.index_mapper.create_index(&mut wtxn, name)?;
wtxn.commit()?;
Ok(index)
2022-10-16 01:39:01 +02:00
}
/// Create a file and register it in the index scheduler.
///
/// The returned file and uuid can be used to associate
/// some data to a task. The file will be kept until
/// the task has been fully processed.
2022-10-16 01:39:01 +02:00
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update()?)
}
#[cfg(test)]
2022-10-16 01:39:01 +02:00
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update_with_uuid(uuid)?)
}
/// Delete a file from the index scheduler.
///
/// Counterpart to the [`create_update_file`](IndexScheduler::create_update_file) method.
pub fn delete_update_file(&self, uuid: Uuid) -> Result<()> {
Ok(self.file_store.delete(uuid)?)
}
/// Perform one iteration of the run loop.
///
/// 1. Find the next batch of tasks to be processed.
/// 2. Update the information of these tasks following the start of their processing.
/// 3. Update the in-memory list of processed tasks accordingly.
/// 4. Process the batch:
/// - perform the actions of each batched task
/// - update the information of each batched task following the end
/// of their processing.
/// 5. Reset the in-memory list of processed tasks.
///
/// Returns the number of processed tasks.
fn tick(&self) -> Result<usize> {
#[cfg(test)]
self.test_breakpoint_sdr.send(Breakpoint::Start).unwrap();
let rtxn = self.env.read_txn()?;
let batch = match self.create_next_batch(&rtxn)? {
Some(batch) => batch,
None => return Ok(0),
};
drop(rtxn);
// 1. store the starting date with the bitmap of processing tasks.
let mut ids = batch.ids();
ids.sort_unstable();
let processed_tasks = ids.len();
let processing_tasks = RoaringBitmap::from_sorted_iter(ids.iter().copied()).unwrap();
let started_at = OffsetDateTime::now_utc();
// We reset the must_stop flag to be sure that we don't stop processing tasks
self.must_stop_processing.reset();
2022-10-17 13:54:35 +02:00
self.processing_tasks
.write()
.unwrap()
.start_processing_at(started_at, processing_tasks);
#[cfg(test)]
{
self.test_breakpoint_sdr
.send(Breakpoint::BatchCreated)
.unwrap();
self.test_breakpoint_sdr
.send(Breakpoint::BeforeProcessing)
.unwrap();
}
// 2. Process the tasks
let res = self.process_batch(batch);
let mut wtxn = self.env.write_txn()?;
2022-10-17 16:30:18 +02:00
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
for mut task in tasks {
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
self.update_task(&mut wtxn, &task)?;
self.delete_persisted_task_data(&task)?;
}
2022-10-13 15:02:59 +02:00
log::info!("A batch of tasks was successfully completed.");
}
// If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError(
milli::InternalError::AbortedIndexation,
))) => {
// TODO should we add a breakpoint here?
wtxn.abort()?;
return Ok(0);
}
// In case of a failure we must get back and patch all the tasks with the error.
Err(err) => {
let error: ResponseError = err.into();
for id in ids {
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
task.status = Status::Failed;
task.error = Some(error.clone());
self.update_task(&mut wtxn, &task)?;
}
}
}
2022-10-17 13:54:35 +02:00
self.processing_tasks
.write()
.unwrap()
.stop_processing_at(finished_at);
wtxn.commit()?;
#[cfg(test)]
self.test_breakpoint_sdr
.send(Breakpoint::AfterProcessing)
.unwrap();
Ok(processed_tasks)
}
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
match task.content_uuid() {
Some(content_file) => self.delete_update_file(*content_file),
None => Ok(()),
}
}
}
2022-09-15 12:23:41 +02:00
#[cfg(test)]
mod tests {
use big_s::S;
2022-10-17 19:24:06 +02:00
use file_store::File;
use meili_snap::snapshot;
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
use tempfile::TempDir;
use uuid::Uuid;
2022-10-10 17:02:28 +02:00
use crate::snapshot::snapshot_index_scheduler;
use super::*;
2022-10-10 17:02:28 +02:00
/// Build a `KindWithContent::IndexCreation` task creating `index` with the given primary key.
fn index_creation_task(index: &'static str, primary_key: &'static str) -> KindWithContent {
    let index_uid = S(index);
    let primary_key = Some(S(primary_key));
    KindWithContent::IndexCreation { index_uid, primary_key }
}
/// Create a `KindWithContent::DocumentImport` task that imports documents.
///
/// - `index_uid` is set to `index`
/// - `primary_key` is given as parameter
/// - `method` is set to `ReplaceDocuments`
/// - `content_file` is the uuid built from `content_file_uuid`
/// - `documents_count` is given as parameter
/// - `allow_index_creation` is set to `true`
fn replace_document_import_task(
    index: &'static str,
    primary_key: Option<&'static str>,
    content_file_uuid: u128,
    documents_count: u64,
) -> KindWithContent {
    KindWithContent::DocumentImport {
        index_uid: S(index),
        primary_key: primary_key.map(ToOwned::to_owned),
        method: ReplaceDocuments,
        content_file: Uuid::from_u128(content_file_uuid),
        documents_count,
        allow_index_creation: true,
    }
}
/// Create an update file with the given file uuid.
///
/// The update file contains just one simple document whose id is given by `document_id`.
///
/// The (not yet persisted) file and its documents count are returned; the caller must
/// `persist` the file for the scheduler to be able to read it.
fn sample_documents(
    index_scheduler: &IndexScheduler,
    file_uuid: u128,
    document_id: usize,
) -> (File, u64) {
    let content = format!(
        r#"
    {{
        "id" : "{document_id}"
    }}"#
    );

    let (_uuid, mut file) = index_scheduler
        .create_update_file_with_uuid(file_uuid)
        .unwrap();
    let documents_count =
        meilisearch_types::document_formats::read_json(content.as_bytes(), file.as_file_mut())
            .unwrap() as u64;
    (file, documents_count)
}
impl IndexScheduler {
pub fn test(autobatching: bool) -> (Self, IndexSchedulerHandle) {
let tempdir = TempDir::new().unwrap();
let (sender, receiver) = crossbeam::channel::bounded(0);
let index_scheduler = Self::new(
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
2022-10-13 15:02:59 +02:00
tempdir.path().join("dumps"),
1024 * 1024,
IndexerConfig::default(),
autobatching, // enable autobatching
sender,
)
.unwrap();
let index_scheduler_handle = IndexSchedulerHandle {
_tempdir: tempdir,
test_breakpoint_rcv: receiver,
};
(index_scheduler, index_scheduler_handle)
}
}
/// Test-side counterpart of an [`IndexScheduler`] built by `IndexScheduler::test`.
pub struct IndexSchedulerHandle {
    /// Keeps the temporary directory holding the scheduler's data alive.
    _tempdir: TempDir,
    /// Receives the breakpoints reached by the scheduler's run loop.
    test_breakpoint_rcv: crossbeam::channel::Receiver<Breakpoint>,
}
impl IndexSchedulerHandle {
    /// Wait until the provided breakpoint is reached.
    ///
    /// Every breakpoint received before `breakpoint` is consumed and discarded. Returns
    /// silently if the scheduler side of the channel is disconnected.
    fn wait_till(&self, breakpoint: Breakpoint) {
        self.test_breakpoint_rcv.iter().find(|b| *b == breakpoint);
    }

    /// Block until the scheduler reaches its next breakpoint, and return it.
    ///
    /// # Panics
    /// Panics if the scheduler side of the channel is disconnected.
    // Not used by every test suite configuration, hence the `allow`.
    #[allow(unused)]
    fn next_breakpoint(&self) -> Breakpoint {
        self.test_breakpoint_rcv.recv().unwrap()
    }

    /// The scheduler will not stop on breakpoints anymore.
    ///
    /// Spawns a thread that drains every breakpoint sent by the scheduler, and exits once
    /// the channel is disconnected instead of spinning on it.
    fn dont_block(self) {
        std::thread::spawn(move || {
            // Move the WHOLE handle into the thread: with the 2021 edition's disjoint
            // closure captures, naming only `self.test_breakpoint_rcv` would capture just that
            // field and drop (i.e. delete) the temporary directory the scheduler still uses.
            let handle = self;
            for _breakpoint in handle.test_breakpoint_rcv.iter() {}
        });
    }
}
#[test]
fn register() {
    // Nobody drives the handle, so the scheduler never gets past its first breakpoint:
    // this test only checks that the tasks are registered.
    let (index_scheduler, _handle) = IndexScheduler::test(true);

    let kinds = [
        index_creation_task("catto", "mouse"),
        replace_document_import_task("catto", None, 0, 12),
        replace_document_import_task("catto", None, 1, 50),
        replace_document_import_task("doggo", Some("bone"), 2, 5000),
    ];
    for (expected_uid, kind) in (0u32..).zip(kinds) {
        let expected_kind = kind.as_kind();
        let task = index_scheduler.register(kind).unwrap();

        assert_eq!(task.uid, expected_uid);
        assert_eq!(task.status, Status::Enqueued);
        assert_eq!(task.kind.as_kind(), expected_kind);
    }

    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
#[test]
fn insert_task_while_another_task_is_processing() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    index_scheduler.register(index_creation_task("index_a", "id")).unwrap();
    handle.wait_till(Breakpoint::BatchCreated);

    // The batch containing the first task now exists: registering more tasks must still work.
    let registered_while_processing = [
        index_creation_task("index_b", "id"),
        KindWithContent::IndexDeletion { index_uid: S("index_a") },
    ];
    for kind in registered_while_processing {
        index_scheduler.register(kind).unwrap();
    }

    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
/// Tasks are registered faster than the scheduler wakes up, so their wake-up signals
/// may collapse into a single one: we must make sure that they are all processed anyway.
#[test]
fn process_tasks_inserted_without_new_signal() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let kinds = [
        KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None },
        KindWithContent::IndexCreation { index_uid: S("cattos"), primary_key: None },
        KindWithContent::IndexDeletion { index_uid: S("doggos") },
    ];
    for kind in kinds {
        index_scheduler.register(kind).unwrap();
    }

    handle.wait_till(Breakpoint::Start);
    for _ in 0..3 {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    assert_eq!(tasks.len(), 3);
    for task in tasks.iter().rev() {
        assert_eq!(task.status, Status::Succeeded);
    }
}
#[test]
fn process_tasks_without_autobatching() {
    let (index_scheduler, handle) = IndexScheduler::test(false);

    index_scheduler
        .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
        .unwrap();
    for _ in 0..3 {
        index_scheduler
            .register(KindWithContent::DocumentClear { index_uid: S("doggos") })
            .unwrap();
    }

    // Autobatching is disabled: the test expects one processed batch per task.
    for _ in 0..4 {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let mut tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    tasks.reverse();
    assert_eq!(tasks.len(), 4);
    for task in &tasks {
        assert_eq!(task.status, Status::Succeeded);
    }
}
#[test]
fn task_deletion_undeleteable() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let to_enqueue = [
        index_creation_task("catto", "mouse"),
        replace_document_import_task("catto", None, 0, 12),
        replace_document_import_task("doggo", Some("bone"), 1, 5000),
    ];
    for kind in to_enqueue {
        index_scheduler.register(kind).unwrap();
    }

    // Every task is registered, but the scheduler has not made any progress yet.
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    let task_deletion = KindWithContent::TaskDeletion {
        query: "test_query".to_owned(),
        tasks: RoaringBitmap::from_iter(&[0, 1]),
    };
    index_scheduler.register(task_deletion).unwrap();
    // Still no progress, but one more task is registered.
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_enqueued");

    // Let the scheduler create its first batch: the task deletion is now "processing".
    handle.wait_till(Breakpoint::BatchCreated);
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processing");

    handle.wait_till(Breakpoint::AfterProcessing);
    // Tasks 0 and 1 were still "enqueued", hence undeleteable: nothing was deleted. The
    // task deletion itself is "succeeded" and reports 0 deleted tasks in its details.
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_done");
}
/// A task deletion targeting a task that has already been processed is expected to remove it.
/// The resulting state is captured in the `task_deletion_processed` snapshot.
#[test]
fn task_deletion_deleteable() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let (file0, documents_count0) = sample_documents(&index_scheduler, 0, 0);
    let (file1, documents_count1) = sample_documents(&index_scheduler, 1, 1);
    let to_enqueue = [
        replace_document_import_task("catto", None, 0, documents_count0),
        replace_document_import_task("doggo", Some("bone"), 1, documents_count1),
    ];
    for task in to_enqueue {
        let _ = index_scheduler.register(task).unwrap();
    }
    file0.persist().unwrap();
    file1.persist().unwrap();
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    handle.wait_till(Breakpoint::AfterProcessing);
    // first addition of documents should be successful
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");

    // Now we delete the first task
    index_scheduler
        .register(KindWithContent::TaskDeletion {
            query: "test_query".to_owned(),
            tasks: RoaringBitmap::from_iter(&[0]),
        })
        .unwrap();
    handle.wait_till(Breakpoint::AfterProcessing);
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
}
/// Registering two identical task deletions for the same (already processed) task must be
/// handled gracefully; the final state is captured in the `task_deletion_processed` snapshot.
#[test]
fn task_deletion_delete_same_task_twice() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    // Two document imports on two different indexes, each backed by its own update file.
    let (first_file, first_count) = sample_documents(&index_scheduler, 0, 0);
    let (second_file, second_count) = sample_documents(&index_scheduler, 1, 1);
    for import in [
        replace_document_import_task("catto", None, 0, first_count),
        replace_document_import_task("doggo", Some("bone"), 1, second_count),
    ] {
        index_scheduler.register(import).unwrap();
    }
    first_file.persist().unwrap();
    second_file.persist().unwrap();
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");

    handle.wait_till(Breakpoint::AfterProcessing);
    // first addition of documents should be successful
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");

    // Enqueue the same deletion of task 0 twice, then let both deletions run.
    const REPEATS: usize = 2;
    for _ in 0..REPEATS {
        let deletion = KindWithContent::TaskDeletion {
            query: "test_query".to_owned(),
            tasks: RoaringBitmap::from_iter(&[0]),
        };
        index_scheduler.register(deletion).unwrap();
    }
    for _ in 0..REPEATS {
        handle.wait_till(Breakpoint::AfterProcessing);
    }
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
}
/// Registers one JSON document import on `doggos` and snapshots the scheduler three times:
/// right after registration, once the batch is created, and after it has been processed.
#[test]
fn document_addition() {
    let (index_scheduler, handle) = IndexScheduler::test(true);
    let content = r#"
{
"id": 1,
"doggo": "bob"
}"#;

    // Write the documents into update file 0 and remember how many were read.
    let (content_uuid, mut update_file) =
        index_scheduler.create_update_file_with_uuid(0).unwrap();
    let parsed = meilisearch_types::document_formats::read_json(
        content.as_bytes(),
        update_file.as_file_mut(),
    )
    .unwrap();
    let documents_count = parsed as u64;

    let import = KindWithContent::DocumentImport {
        index_uid: S("doggos"),
        primary_key: Some(S("id")),
        method: ReplaceDocuments,
        content_file: content_uuid,
        documents_count,
        allow_index_creation: true,
    };
    index_scheduler.register(import).unwrap();
    update_file.persist().unwrap();

    snapshot!(snapshot_index_scheduler(&index_scheduler));
    handle.wait_till(Breakpoint::BatchCreated);
    snapshot!(snapshot_index_scheduler(&index_scheduler));
    handle.wait_till(Breakpoint::AfterProcessing);
    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
2022-10-19 16:44:42 +02:00
/// Enqueues, on the `doggos` index, an index creation, a document import and an index
/// deletion, then snapshots the scheduler before and after letting it run.
#[test]
fn document_addition_and_index_deletion() {
    let (index_scheduler, handle) = IndexScheduler::test(true);
    let content = r#"
{
"id": 1,
"doggo": "bob"
}"#;

    index_scheduler
        .register(KindWithContent::IndexCreation { index_uid: S("doggos"), primary_key: None })
        .unwrap();

    // Fill update file 0 and persist it before the import referencing it is registered.
    let (content_uuid, mut update_file) =
        index_scheduler.create_update_file_with_uuid(0).unwrap();
    let parsed = meilisearch_types::document_formats::read_json(
        content.as_bytes(),
        update_file.as_file_mut(),
    )
    .unwrap();
    let documents_count = parsed as u64;
    update_file.persist().unwrap();

    let follow_ups = [
        KindWithContent::DocumentImport {
            index_uid: S("doggos"),
            primary_key: Some(S("id")),
            method: ReplaceDocuments,
            content_file: content_uuid,
            documents_count,
            allow_index_creation: true,
        },
        KindWithContent::IndexDeletion { index_uid: S("doggos") },
    ];
    for kind in follow_ups {
        index_scheduler.register(kind).unwrap();
    }
    snapshot!(snapshot_index_scheduler(&index_scheduler));

    // Step through three `Start` breakpoints. The first tick handles the index creation. By the
    // third, the document import and the index deletion are expected to have been executed
    // together in a single batch.
    for _ in 0..3 {
        handle.wait_till(Breakpoint::Start);
    }
    snapshot!(snapshot_index_scheduler(&index_scheduler));
}
/// Tasks targeting different indexes must not be batched together. The test waits for one
/// processing round per task and then expects every task to have succeeded.
#[test]
fn do_not_batch_task_of_different_indexes() {
    let (index_scheduler, handle) = IndexScheduler::test(true);
    let index_names = ["doggos", "cattos", "girafos"];

    // All index creations first, then one document clear per index, in that order.
    let creations = index_names.iter().map(|&name| KindWithContent::IndexCreation {
        index_uid: name.to_string(),
        primary_key: None,
    });
    let clears = index_names
        .iter()
        .map(|&name| KindWithContent::DocumentClear { index_uid: name.to_string() });
    for kind in creations.chain(clears) {
        index_scheduler.register(kind).unwrap();
    }

    let task_count = index_names.len() * 2;
    for _ in 0..task_count {
        handle.wait_till(Breakpoint::AfterProcessing);
    }

    let tasks = index_scheduler.get_tasks(Query::default()).unwrap();
    assert_eq!(tasks.len(), 6);
    for task in tasks.iter().rev() {
        assert_eq!(task.status, Status::Succeeded);
    }
}
2022-10-17 16:30:18 +02:00
/// Creates indexes `a`, `b`, `c` and `d`, then processes two index-swap tasks, snapshotting
/// the scheduler after the creations and after each swap.
#[test]
fn swap_indexes() {
    let (index_scheduler, handle) = IndexScheduler::test(true);

    let creations = ["a", "b", "c", "d"].map(|uid| index_creation_task(uid, "id"));
    for creation in creations {
        index_scheduler.register(creation).unwrap();
    }
    // One processing round per index creation.
    for _ in 0..4 {
        handle.wait_till(Breakpoint::AfterProcessing);
    }
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");

    // First task swaps two pairs at once: a <-> b and c <-> d.
    let first_swaps = [("a", "b"), ("c", "d")]
        .iter()
        .map(|&(lhs, rhs)| (lhs.to_owned(), rhs.to_owned()))
        .collect();
    index_scheduler.register(KindWithContent::IndexSwap { swaps: first_swaps }).unwrap();
    handle.wait_till(Breakpoint::AfterProcessing);
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "first_swap_processed");

    // Second task swaps a single pair: a <-> c.
    let second_swaps = vec![("a".to_owned(), "c".to_owned())];
    index_scheduler.register(KindWithContent::IndexSwap { swaps: second_swaps }).unwrap();
    handle.wait_till(Breakpoint::AfterProcessing);
    snapshot!(snapshot_index_scheduler(&index_scheduler), name: "second_swap_processed");
}
2022-09-15 12:23:41 +02:00
/// Snapshot the `Debug` representation of a value.
///
/// `$value` is formatted with `{:?}`. The resulting `String` is passed to
/// `meili_snap::snapshot!` together with the `@"..."` literal. That literal is presumably an
/// inline snapshot, as with insta; confirm against meili_snap.
///
/// Because of `#[macro_export]`, the macro is exported at the crate root, so it is usable as
/// `crate::debug_snapshot!` regardless of the module it is defined in.
#[macro_export]
macro_rules! debug_snapshot {
($value:expr, @$snapshot:literal) => {{
// Binding name is kept as `value` on purpose; the snapshot macro may record it.
let value = format!("{:?}", $value);
meili_snap::snapshot!(value, @$snapshot);
}};
}
/// Smoke test: building a test `IndexScheduler` (and its breakpoint handle) must not panic.
#[test]
fn simple_new() {
    let (_index_scheduler, _handle) = crate::IndexScheduler::test(true);
}
2022-09-15 12:23:41 +02:00
}