938 lines
36 KiB
Rust
Raw Normal View History

//! Utility functions on the DBs. Mainly getter and setters.
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
2024-11-19 18:19:41 +01:00
use meilisearch_types::batches::{Batch, BatchId, BatchStats};
use meilisearch_types::heed::types::DecodeIgnore;
2022-10-20 18:00:07 +02:00
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
2024-11-19 17:06:00 +01:00
use meilisearch_types::task_view::DetailsView;
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status};
2022-10-18 11:02:46 +02:00
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
2024-11-19 19:40:31 +01:00
use crate::{Error, IndexScheduler, ProcessingTasks, Result, Task, TaskId, BEI128};
/// This structure contains all the information required to write a batch in the database without reading the tasks.
/// It'll stay in RAM so it must be small.
2024-11-19 18:19:41 +01:00
/// The usage is the following:
/// 1. Create the structure with its batch id.
/// 2. Call `processing` on all the task that we know are currently processing in the batch (it can change in the future)
/// 3. Call `finished` once the batch has been processed.
/// 4. Call `update` on all the tasks.
#[derive(Debug, Clone)]
pub(crate) struct ProcessingBatch {
pub uid: BatchId,
2024-11-19 17:06:00 +01:00
pub details: DetailsView,
2024-11-19 18:19:41 +01:00
pub stats: BatchStats,
2024-11-19 17:06:00 +01:00
pub statuses: HashSet<Status>,
pub kinds: HashSet<Kind>,
pub indexes: HashSet<String>,
pub canceled_by: HashSet<TaskId>,
pub oldest_enqueued_at: Option<OffsetDateTime>,
pub earliest_enqueued_at: Option<OffsetDateTime>,
pub started_at: OffsetDateTime,
2024-11-19 18:19:41 +01:00
pub finished_at: Option<OffsetDateTime>,
}
impl ProcessingBatch {
pub fn new(uid: BatchId) -> Self {
// At the beginning, all the tasks are processing
let mut statuses = HashSet::default();
statuses.insert(Status::Processing);
Self {
uid,
2024-11-19 17:06:00 +01:00
details: DetailsView::default(),
2024-11-19 18:19:41 +01:00
stats: BatchStats::default(),
2024-11-19 17:06:00 +01:00
statuses,
kinds: HashSet::default(),
indexes: HashSet::default(),
canceled_by: HashSet::default(),
oldest_enqueued_at: None,
earliest_enqueued_at: None,
started_at: OffsetDateTime::now_utc(),
2024-11-19 18:19:41 +01:00
finished_at: None,
}
}
/// Update itself with the content of the task and update the batch id in the task.
pub fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
for task in tasks.into_iter() {
self.stats.total_nb_tasks += 1;
task.batch_uid = Some(self.uid);
// We don't store the statuses in the map since they're all enqueued but we must
// still store them in the stats since that can be displayed.
*self.stats.status.entry(task.status).or_default() += 1;
self.kinds.insert(task.kind.as_kind());
*self.stats.types.entry(task.kind.as_kind()).or_default() += 1;
self.indexes.extend(task.indexes().iter().map(|s| s.to_string()));
if let Some(index_uid) = task.index_uid() {
*self.stats.index_uids.entry(index_uid.to_string()).or_default() += 1;
}
if let Some(ref details) = task.details {
self.details.accumulate(&DetailsView::from(details.clone()));
}
if let Some(canceled_by) = task.canceled_by {
self.canceled_by.insert(canceled_by);
}
self.oldest_enqueued_at =
Some(self.oldest_enqueued_at.map_or(task.enqueued_at, |oldest_enqueued_at| {
task.enqueued_at.min(oldest_enqueued_at)
}));
self.earliest_enqueued_at =
Some(self.earliest_enqueued_at.map_or(task.enqueued_at, |earliest_enqueued_at| {
task.enqueued_at.max(earliest_enqueued_at)
}));
}
}
2024-11-19 18:19:41 +01:00
/// Must be called once the batch has finished processing.
pub fn finished(&mut self) {
self.details = DetailsView::default();
self.stats = BatchStats::default();
2024-11-19 18:19:41 +01:00
self.finished_at = Some(OffsetDateTime::now_utc());
// Initially we inserted ourselves as a processing batch, that's not the case anymore.
self.statuses.clear();
// We're going to recount the number of tasks AFTER processing the batch because
// tasks may add themselves to a batch while its processing.
self.stats.total_nb_tasks = 0;
}
/// Update the timestamp of the tasks and the inner structure of this sturcture.
pub fn update(&mut self, task: &mut Task) {
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state
// We must re-set this value in case we're dealing with a task that has been added between
// the `processing` and `finished` state or that failed.
task.batch_uid = Some(self.uid);
// Same
task.started_at = Some(self.started_at);
task.finished_at = self.finished_at;
self.statuses.insert(task.status);
// Craft an aggregation of the details of all the tasks encountered in this batch.
2024-11-19 19:40:31 +01:00
if let Some(ref details) = task.details {
self.details.accumulate(&DetailsView::from(details.clone()));
2024-11-19 18:19:41 +01:00
}
self.stats.total_nb_tasks += 1;
*self.stats.status.entry(task.status).or_default() += 1;
*self.stats.types.entry(task.kind.as_kind()).or_default() += 1;
if let Some(index_uid) = task.index_uid() {
*self.stats.index_uids.entry(index_uid.to_string()).or_default() += 1;
}
}
2024-11-19 19:40:31 +01:00
pub fn to_batch(&self) -> Batch {
Batch {
uid: self.uid,
details: self.details.clone(),
stats: self.stats.clone(),
started_at: self.started_at,
finished_at: self.finished_at,
}
}
}
impl IndexScheduler {
pub(crate) fn all_task_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
2022-10-22 16:35:42 +02:00
enum_iterator::all().map(|s| self.get_status(rtxn, s)).union()
}
2022-10-18 11:02:46 +02:00
pub(crate) fn all_batch_ids(&self, rtxn: &RoTxn) -> Result<RoaringBitmap> {
enum_iterator::all().map(|s| self.get_batch_status(rtxn, s)).union()
}
2022-09-07 11:21:53 +02:00
pub(crate) fn last_task_id(&self, rtxn: &RoTxn) -> Result<Option<TaskId>> {
Ok(self.all_tasks.remap_data_type::<DecodeIgnore>().last(rtxn)?.map(|(k, _)| k + 1))
2022-09-07 11:21:53 +02:00
}
pub(crate) fn next_task_id(&self, rtxn: &RoTxn) -> Result<TaskId> {
Ok(self.last_task_id(rtxn)?.unwrap_or_default())
}
2024-11-13 11:27:12 +01:00
pub(crate) fn next_batch_id(&self, rtxn: &RoTxn) -> Result<BatchId> {
Ok(self
.all_batches
.remap_data_type::<DecodeIgnore>()
.last(rtxn)?
.map(|(k, _)| k + 1)
.unwrap_or_default())
}
pub(crate) fn get_task(&self, rtxn: &RoTxn, task_id: TaskId) -> Result<Option<Task>> {
Ok(self.all_tasks.get(rtxn, &task_id)?)
}
pub(crate) fn get_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<Option<Batch>> {
Ok(self.all_batches.get(rtxn, &batch_id)?)
}
pub(crate) fn write_batch(
&self,
wtxn: &mut RwTxn,
batch: ProcessingBatch,
tasks: &RoaringBitmap,
) -> Result<()> {
self.all_batches.put(
wtxn,
&batch.uid,
2024-11-19 17:06:00 +01:00
&Batch {
uid: batch.uid,
details: batch.details,
2024-11-19 18:19:41 +01:00
stats: batch.stats,
2024-11-19 17:06:00 +01:00
started_at: batch.started_at,
2024-11-19 18:19:41 +01:00
finished_at: batch.finished_at,
2024-11-19 17:06:00 +01:00
},
)?;
self.batch_to_tasks_mapping.put(wtxn, &batch.uid, tasks)?;
for status in batch.statuses {
self.update_batch_status(wtxn, status, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
for kind in batch.kinds {
self.update_batch_kind(wtxn, kind, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
for index in batch.indexes {
self.update_batch_index(wtxn, &index, |bitmap| {
bitmap.insert(batch.uid);
})?;
}
2024-11-19 01:47:42 +01:00
if let Some(enqueued_at) = batch.oldest_enqueued_at {
insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
}
if let Some(enqueued_at) = batch.earliest_enqueued_at {
insert_task_datetime(wtxn, self.batch_enqueued_at, enqueued_at, batch.uid)?;
}
insert_task_datetime(wtxn, self.batch_started_at, batch.started_at, batch.uid)?;
2024-11-19 18:19:41 +01:00
insert_task_datetime(wtxn, self.batch_finished_at, batch.finished_at.unwrap(), batch.uid)?;
Ok(())
}
2024-11-20 14:40:36 +01:00
/// Convert an iterator to a `Vec` of tasks and edit the `ProcessingBatch` to add the given tasks.
///
/// The tasks MUST exist, or a `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks_for_processing_batch(
2024-11-13 11:27:12 +01:00
&self,
rtxn: &RoTxn,
processing_batch: &mut ProcessingBatch,
2024-11-13 11:27:12 +01:00
tasks: impl IntoIterator<Item = TaskId>,
) -> Result<Vec<Task>> {
tasks
.into_iter()
.map(|task_id| {
let mut task = self
.get_task(rtxn, task_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue));
processing_batch.processing(&mut task);
task
2024-11-13 11:27:12 +01:00
})
.collect::<Result<_>>()
}
/// Convert an iterator to a `Vec` of tasks. The tasks MUST exist or a
2024-11-20 14:40:36 +01:00
/// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_tasks(
&self,
rtxn: &RoTxn,
tasks: impl IntoIterator<Item = TaskId>,
) -> Result<Vec<Task>> {
tasks
.into_iter()
.map(|task_id| {
2022-10-20 18:00:07 +02:00
self.get_task(rtxn, task_id).and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
})
.collect::<Result<_>>()
}
/// Convert an iterator to a `Vec` of batches. The batches MUST exist or a
2024-11-20 14:40:36 +01:00
/// `CorruptedTaskQueue` error will be thrown.
pub(crate) fn get_existing_batches(
&self,
rtxn: &RoTxn,
2024-11-19 19:40:31 +01:00
processing: &ProcessingTasks,
tasks: impl IntoIterator<Item = BatchId>,
) -> Result<Vec<Batch>> {
tasks
.into_iter()
.map(|batch_id| {
2024-11-19 19:40:31 +01:00
if Some(batch_id) == processing.batch.as_ref().map(|batch| batch.uid) {
Ok(processing.batch.as_ref().unwrap().to_batch())
} else {
self.get_batch(rtxn, batch_id)
.and_then(|task| task.ok_or(Error::CorruptedTaskQueue))
}
})
.collect::<Result<_>>()
}
2022-09-16 01:58:08 +02:00
pub(crate) fn update_task(&self, wtxn: &mut RwTxn, task: &Task) -> Result<()> {
2022-10-20 18:00:07 +02:00
let old_task = self.get_task(wtxn, task.uid)?.ok_or(Error::CorruptedTaskQueue)?;
2024-11-20 10:23:45 +01:00
debug_assert!(old_task != *task);
debug_assert_eq!(old_task.uid, task.uid);
debug_assert!(old_task.batch_uid.is_none() && task.batch_uid.is_some());
if old_task.status != task.status {
self.update_status(wtxn, old_task.status, |bitmap| {
bitmap.remove(task.uid);
})?;
self.update_status(wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
}
if old_task.kind.as_kind() != task.kind.as_kind() {
self.update_kind(wtxn, old_task.kind.as_kind(), |bitmap| {
bitmap.remove(task.uid);
})?;
self.update_kind(wtxn, task.kind.as_kind(), |bitmap| {
bitmap.insert(task.uid);
})?;
}
assert_eq!(
old_task.enqueued_at, task.enqueued_at,
2022-10-19 16:18:00 +02:00
"Cannot update a task's enqueued_at time"
);
if old_task.started_at != task.started_at {
2022-10-20 18:00:07 +02:00
assert!(old_task.started_at.is_none(), "Cannot update a task's started_at time");
if let Some(started_at) = task.started_at {
insert_task_datetime(wtxn, self.started_at, started_at, task.uid)?;
}
}
if old_task.finished_at != task.finished_at {
2022-10-20 18:00:07 +02:00
assert!(old_task.finished_at.is_none(), "Cannot update a task's finished_at time");
if let Some(finished_at) = task.finished_at {
insert_task_datetime(wtxn, self.finished_at, finished_at, task.uid)?;
}
}
self.all_tasks.put(wtxn, &task.uid, task)?;
Ok(())
}
/// Returns the whole set of tasks that belongs to this batch.
pub(crate) fn tasks_in_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<RoaringBitmap> {
Ok(self.batch_to_tasks_mapping.get(rtxn, &batch_id)?.unwrap_or_default())
}
/// Returns the whole set of tasks that belongs to this index.
pub(crate) fn index_tasks(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
2022-10-03 15:29:37 +02:00
Ok(self.index_tasks.get(rtxn, index)?.unwrap_or_default())
}
pub(crate) fn update_index(
&self,
wtxn: &mut RwTxn,
index: &str,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut tasks = self.index_tasks(wtxn, index)?;
f(&mut tasks);
if tasks.is_empty() {
self.index_tasks.delete(wtxn, index)?;
} else {
self.index_tasks.put(wtxn, index, &tasks)?;
}
Ok(())
}
/// Returns the whole set of batches that belongs to this index.
pub(crate) fn index_batches(&self, rtxn: &RoTxn, index: &str) -> Result<RoaringBitmap> {
Ok(self.batch_index_tasks.get(rtxn, index)?.unwrap_or_default())
}
pub(crate) fn update_batch_index(
&self,
wtxn: &mut RwTxn,
index: &str,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut batches = self.index_batches(wtxn, index)?;
f(&mut batches);
if batches.is_empty() {
self.batch_index_tasks.delete(wtxn, index)?;
} else {
self.batch_index_tasks.put(wtxn, index, &batches)?;
}
Ok(())
}
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.status.get(rtxn, &status)?.unwrap_or_default())
}
pub(crate) fn put_status(
&self,
wtxn: &mut RwTxn,
status: Status,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.status.put(wtxn, &status, bitmap)?)
}
pub(crate) fn update_status(
&self,
wtxn: &mut RwTxn,
status: Status,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
2022-10-03 15:29:37 +02:00
let mut tasks = self.get_status(wtxn, status)?;
f(&mut tasks);
self.put_status(wtxn, status, &tasks)?;
Ok(())
}
pub(crate) fn get_batch_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.batch_status.get(rtxn, &status)?.unwrap_or_default())
}
pub(crate) fn put_batch_status(
&self,
wtxn: &mut RwTxn,
status: Status,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.batch_status.put(wtxn, &status, bitmap)?)
}
pub(crate) fn update_batch_status(
&self,
wtxn: &mut RwTxn,
status: Status,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut tasks = self.get_batch_status(wtxn, status)?;
f(&mut tasks);
self.put_batch_status(wtxn, status, &tasks)?;
Ok(())
}
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
2022-10-03 15:29:37 +02:00
Ok(self.kind.get(rtxn, &kind)?.unwrap_or_default())
}
pub(crate) fn put_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.kind.put(wtxn, &kind, bitmap)?)
}
pub(crate) fn update_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
2022-10-03 15:29:37 +02:00
let mut tasks = self.get_kind(wtxn, kind)?;
f(&mut tasks);
self.put_kind(wtxn, kind, &tasks)?;
Ok(())
}
pub(crate) fn get_batch_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
Ok(self.batch_kind.get(rtxn, &kind)?.unwrap_or_default())
}
pub(crate) fn put_batch_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.batch_kind.put(wtxn, &kind, bitmap)?)
}
pub(crate) fn update_batch_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
f: impl Fn(&mut RoaringBitmap),
) -> Result<()> {
let mut tasks = self.get_batch_kind(wtxn, kind)?;
f(&mut tasks);
self.put_batch_kind(wtxn, kind, &tasks)?;
Ok(())
}
}
pub(crate) fn insert_task_datetime(
wtxn: &mut RwTxn,
database: Database<BEI128, CboRoaringBitmapCodec>,
time: OffsetDateTime,
task_id: TaskId,
) -> Result<()> {
let timestamp = time.unix_timestamp_nanos();
2022-10-22 16:35:42 +02:00
let mut task_ids = database.get(wtxn, &timestamp)?.unwrap_or_default();
task_ids.insert(task_id);
2022-10-25 15:30:36 +02:00
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(task_ids))?;
Ok(())
}
pub(crate) fn remove_task_datetime(
wtxn: &mut RwTxn,
database: Database<BEI128, CboRoaringBitmapCodec>,
time: OffsetDateTime,
task_id: TaskId,
) -> Result<()> {
let timestamp = time.unix_timestamp_nanos();
2022-10-22 16:35:42 +02:00
if let Some(mut existing) = database.get(wtxn, &timestamp)? {
existing.remove(task_id);
if existing.is_empty() {
database.delete(wtxn, &timestamp)?;
} else {
2022-10-25 15:30:36 +02:00
database.put(wtxn, &timestamp, &RoaringBitmap::from_iter(existing))?;
}
}
Ok(())
}
2024-11-20 10:23:45 +01:00
pub(crate) fn keep_ids_within_datetimes(
rtxn: &RoTxn,
2024-11-20 10:23:45 +01:00
ids: &mut RoaringBitmap,
database: Database<BEI128, CboRoaringBitmapCodec>,
after: Option<OffsetDateTime>,
before: Option<OffsetDateTime>,
) -> Result<()> {
let (start, end) = match (&after, &before) {
(None, None) => return Ok(()),
(None, Some(before)) => (Bound::Unbounded, Bound::Excluded(*before)),
(Some(after), None) => (Bound::Excluded(*after), Bound::Unbounded),
(Some(after), Some(before)) => (Bound::Excluded(*after), Bound::Excluded(*before)),
};
2024-11-20 10:23:45 +01:00
let mut collected_ids = RoaringBitmap::new();
let start = map_bound(start, |b| b.unix_timestamp_nanos());
let end = map_bound(end, |b| b.unix_timestamp_nanos());
2022-10-22 16:35:42 +02:00
let iter = database.range(rtxn, &(start, end))?;
for r in iter {
2024-11-20 10:23:45 +01:00
let (_timestamp, ids) = r?;
collected_ids |= ids;
}
2024-11-20 10:23:45 +01:00
*ids &= collected_ids;
Ok(())
}
// TODO: remove when Bound::map ( https://github.com/rust-lang/rust/issues/86026 ) is available on stable
pub(crate) fn map_bound<T, U>(bound: Bound<T>, map: impl FnOnce(T) -> U) -> Bound<U> {
match bound {
Bound::Included(x) => Bound::Included(map(x)),
Bound::Excluded(x) => Bound::Excluded(map(x)),
Bound::Unbounded => Bound::Unbounded,
}
}
2022-10-17 16:30:18 +02:00
pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
use KindWithContent as K;
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid),
K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid),
2022-10-17 16:30:18 +02:00
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid),
2022-10-17 16:30:18 +02:00
K::DocumentClear { index_uid } => index_uids.push(index_uid),
K::SettingsUpdate { index_uid, .. } => index_uids.push(index_uid),
2022-10-17 16:30:18 +02:00
K::IndexDeletion { index_uid } => index_uids.push(index_uid),
K::IndexCreation { index_uid, .. } => index_uids.push(index_uid),
K::IndexUpdate { index_uid, .. } => index_uids.push(index_uid),
K::IndexSwap { swaps } => {
for IndexSwap { indexes: (lhs, rhs) } in swaps.iter_mut() {
2022-10-22 16:35:42 +02:00
if lhs == swap.0 || lhs == swap.1 {
2022-10-17 16:30:18 +02:00
index_uids.push(lhs);
}
2022-10-22 16:35:42 +02:00
if rhs == swap.0 || rhs == swap.1 {
2022-10-17 16:30:18 +02:00
index_uids.push(rhs);
}
}
}
2022-10-24 19:08:15 +02:00
K::TaskCancelation { .. }
| K::TaskDeletion { .. }
| K::DumpCreation { .. }
| K::SnapshotCreation => (),
2022-10-17 16:30:18 +02:00
};
2022-10-25 10:58:55 +02:00
if let Some(Details::IndexSwap { swaps }) = &mut task.details {
for IndexSwap { indexes: (lhs, rhs) } in swaps.iter_mut() {
2022-10-25 10:58:55 +02:00
if lhs == swap.0 || lhs == swap.1 {
index_uids.push(lhs);
}
if rhs == swap.0 || rhs == swap.1 {
index_uids.push(rhs);
}
2022-10-25 09:48:51 +02:00
}
}
2022-10-17 16:30:18 +02:00
for index_uid in index_uids {
2022-10-22 16:35:42 +02:00
if index_uid == swap.0 {
2024-05-14 17:20:57 +02:00
swap.1.clone_into(index_uid);
2022-10-22 16:35:42 +02:00
} else if index_uid == swap.1 {
2024-05-14 17:20:57 +02:00
swap.0.clone_into(index_uid);
2022-10-17 16:30:18 +02:00
}
}
}
/// Remove references to task ids that are greater than the id of the given task.
pub(crate) fn filter_out_references_to_newer_tasks(task: &mut Task) {
let new_nbr_of_matched_tasks = match &mut task.kind {
KindWithContent::TaskCancelation { tasks, .. }
| KindWithContent::TaskDeletion { tasks, .. } => {
tasks.remove_range(task.uid..);
tasks.len()
}
_ => return,
};
2022-10-26 18:03:48 +02:00
if let Some(
Details::TaskCancelation { matched_tasks, .. }
| Details::TaskDeletion { matched_tasks, .. },
) = &mut task.details
{
*matched_tasks = new_nbr_of_matched_tasks;
}
}
pub(crate) fn check_index_swap_validity(task: &Task) -> Result<()> {
let swaps =
if let KindWithContent::IndexSwap { swaps } = &task.kind { swaps } else { return Ok(()) };
let mut all_indexes = HashSet::new();
let mut duplicate_indexes = BTreeSet::new();
for IndexSwap { indexes: (lhs, rhs) } in swaps {
for name in [lhs, rhs] {
let is_new = all_indexes.insert(name);
if !is_new {
duplicate_indexes.insert(name);
}
}
}
if !duplicate_indexes.is_empty() {
if duplicate_indexes.len() == 1 {
return Err(Error::SwapDuplicateIndexFound(
duplicate_indexes.into_iter().next().unwrap().clone(),
));
} else {
return Err(Error::SwapDuplicateIndexesFound(
duplicate_indexes.into_iter().cloned().collect(),
));
}
}
Ok(())
}
/// Clamp the provided value to be a multiple of system page size.
pub fn clamp_to_page_size(size: usize) -> usize {
size / page_size::get() * page_size::get()
}
#[cfg(test)]
impl IndexScheduler {
/// Asserts that the index scheduler's content is internally consistent.
pub fn assert_internally_consistent(&self) {
let rtxn = self.env.read_txn().unwrap();
for task in self.all_tasks.iter(&rtxn).unwrap() {
let (task_id, task) = task.unwrap();
let task_index_uid = task.index_uid().map(ToOwned::to_owned);
let Task {
uid,
2024-11-13 11:27:12 +01:00
batch_uid,
enqueued_at,
started_at,
finished_at,
error: _,
canceled_by,
details,
status,
kind,
2022-10-25 15:35:06 +02:00
} = task;
assert_eq!(uid, task.uid);
if let Some(ref batch) = batch_uid {
assert!(self
.batch_to_tasks_mapping
.get(&rtxn, batch)
.unwrap()
.unwrap()
.contains(uid));
}
if let Some(task_index_uid) = &task_index_uid {
assert!(self
.index_tasks
.get(&rtxn, task_index_uid.as_str())
.unwrap()
.unwrap()
.contains(task.uid));
}
2023-11-23 12:07:35 +01:00
let db_enqueued_at =
self.enqueued_at.get(&rtxn, &enqueued_at.unix_timestamp_nanos()).unwrap().unwrap();
assert!(db_enqueued_at.contains(task_id));
if let Some(started_at) = started_at {
let db_started_at = self
.started_at
2023-11-23 12:07:35 +01:00
.get(&rtxn, &started_at.unix_timestamp_nanos())
.unwrap()
.unwrap();
assert!(db_started_at.contains(task_id));
}
if let Some(finished_at) = finished_at {
let db_finished_at = self
.finished_at
2023-11-23 12:07:35 +01:00
.get(&rtxn, &finished_at.unix_timestamp_nanos())
.unwrap()
.unwrap();
assert!(db_finished_at.contains(task_id));
}
if let Some(canceled_by) = canceled_by {
let db_canceled_tasks = self.get_status(&rtxn, Status::Canceled).unwrap();
assert!(db_canceled_tasks.contains(uid));
let db_canceling_task = self.get_task(&rtxn, canceled_by).unwrap().unwrap();
assert_eq!(db_canceling_task.status, Status::Succeeded);
match db_canceling_task.kind {
KindWithContent::TaskCancelation { query: _, tasks } => {
assert!(tasks.contains(uid));
}
_ => panic!(),
}
}
2022-10-26 18:54:15 +02:00
if let Some(details) = details {
match details {
2022-10-26 18:27:43 +02:00
Details::IndexSwap { swaps: sw1 } => {
if let KindWithContent::IndexSwap { swaps: sw2 } = &kind {
assert_eq!(&sw1, sw2);
}
2022-10-26 18:27:43 +02:00
}
Details::DocumentAdditionOrUpdate { received_documents, indexed_documents } => {
assert_eq!(kind.as_kind(), Kind::DocumentAdditionOrUpdate);
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
match indexed_documents {
Some(indexed_documents) => {
assert!(matches!(
status,
Status::Succeeded | Status::Failed | Status::Canceled
));
match status {
Status::Succeeded => assert!(indexed_documents <= received_documents),
Status::Failed | Status::Canceled => assert_eq!(indexed_documents, 0),
status => panic!("DocumentAddition can't have an indexed_documents set if it's {}", status),
}
}
None => {
assert!(matches!(status, Status::Enqueued | Status::Processing))
}
}
}
Details::DocumentEdition { edited_documents, .. } => {
assert_eq!(kind.as_kind(), Kind::DocumentEdition);
match edited_documents {
Some(edited_documents) => {
assert!(matches!(
status,
Status::Succeeded | Status::Failed | Status::Canceled
));
match status {
Status::Succeeded => (),
Status::Failed | Status::Canceled => assert_eq!(edited_documents, 0),
status => panic!("DocumentEdition can't have an edited_documents set if it's {}", status),
}
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
}
None => {
assert!(matches!(status, Status::Enqueued | Status::Processing))
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
}
}
}
Details::SettingsUpdate { settings: _ } => {
assert_eq!(kind.as_kind(), Kind::SettingsUpdate);
}
Details::IndexInfo { primary_key: pk1 } => match &kind {
KindWithContent::IndexCreation { index_uid, primary_key: pk2 }
| KindWithContent::IndexUpdate { index_uid, primary_key: pk2 } => {
self.index_tasks
.get(&rtxn, index_uid.as_str())
.unwrap()
.unwrap()
.contains(uid);
assert_eq!(&pk1, pk2);
}
_ => panic!(),
},
Details::DocumentDeletion {
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
provided_ids: received_document_ids,
deleted_documents,
} => {
assert_eq!(kind.as_kind(), Kind::DocumentDeletion);
2023-02-14 17:45:46 +01:00
let (index_uid, documents_ids) =
if let KindWithContent::DocumentDeletion {
ref index_uid,
ref documents_ids,
} = kind
{
(index_uid, documents_ids)
} else {
unreachable!()
};
assert_eq!(&task_index_uid.unwrap(), index_uid);
match status {
Status::Enqueued | Status::Processing => (),
Status::Succeeded => {
assert!(deleted_documents.unwrap() <= received_document_ids as u64);
assert!(documents_ids.len() == received_document_ids);
}
Status::Failed | Status::Canceled => {
assert!(deleted_documents == Some(0));
assert!(documents_ids.len() == received_document_ids);
}
}
}
Details::DocumentDeletionByFilter { deleted_documents, original_filter: _ } => {
assert_eq!(kind.as_kind(), Kind::DocumentDeletion);
let (index_uid, _) = if let KindWithContent::DocumentDeletionByFilter {
ref index_uid,
ref filter_expr,
} = kind
{
(index_uid, filter_expr)
} else {
unreachable!()
};
assert_eq!(&task_index_uid.unwrap(), index_uid);
match status {
Status::Enqueued | Status::Processing => (),
Status::Succeeded => {
assert!(deleted_documents.is_some());
}
Status::Failed | Status::Canceled => {
assert!(deleted_documents == Some(0));
}
}
}
Details::ClearAll { deleted_documents } => {
assert!(matches!(
kind.as_kind(),
Kind::DocumentDeletion | Kind::IndexDeletion
));
if deleted_documents.is_some() {
assert_eq!(status, Status::Succeeded);
} else {
assert_ne!(status, Status::Succeeded);
}
}
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
Details::TaskCancelation { matched_tasks, canceled_tasks, original_filter } => {
if let Some(canceled_tasks) = canceled_tasks {
assert_eq!(status, Status::Succeeded);
assert!(canceled_tasks <= matched_tasks);
match &kind {
KindWithContent::TaskCancelation { query, tasks } => {
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
assert_eq!(query, &original_filter);
assert_eq!(tasks.len(), matched_tasks);
}
_ => panic!(),
}
} else {
assert_ne!(status, Status::Succeeded);
}
}
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
Details::TaskDeletion { matched_tasks, deleted_tasks, original_filter } => {
if let Some(deleted_tasks) = deleted_tasks {
assert_eq!(status, Status::Succeeded);
assert!(deleted_tasks <= matched_tasks);
match &kind {
KindWithContent::TaskDeletion { query, tasks } => {
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
assert_eq!(query, &original_filter);
assert_eq!(tasks.len(), matched_tasks);
}
_ => panic!(),
}
} else {
assert_ne!(status, Status::Succeeded);
}
}
Bring back `release-v0.30.0` into `release-v0.30.0-temp` (final: into `main`) (#3145) * Fix error code of the "duplicate index found" error * Use the content of the ProcessingTasks in the tasks cancelation system * Change the missing_filters error code into missing_task_filters * WIP Introduce the invalid_task_uid error code * Use more precise error codes/message for the task routes + Allow star operator in delete/cancel tasks + rename originalQuery to originalFilters + Display error/canceled_by in task view even when they are = null + Rename task filter fields by using their plural forms + Prepare an error code for canceledBy filter + Only return global tasks if the API key action `index.*` is there * Add canceledBy task filter * Update tests following task API changes * Rename original_query to original_filters everywhere * Update more insta-snap tests * Make clippy happy They're a happy clip now. * Make rustfmt happy >:-( * Fix Index name parsing error message to fit the specification * Bump milli version to 0.35.1 * Fix the new error messages * fix the error messages and add tests * rename the error codes for the sake of consistency * refactor the way we send the cli informations + add the analytics for the config file and ssl usage * Apply suggestions from code review Co-authored-by: Clément Renault <clement@meilisearch.com> * add a comment over the new infos structure * reformat, sorry @kero * Store analytics for the documents deletions * Add analytics on all the settings * Spawn threads with names * Spawn rayon threads with names * update the distinct attributes to the spec update * update the analytics on the search route * implements the analytics on the health and version routes * Fix task details serialization * Add the question mark to the task deletion query filter * Add the question mark to the task cancelation query filter * Fix tests * add analytics on the task route * Add all the missing fields of the new task query type * Create a new analytics for the task deletion * Create a new analytics for the task creation * batch the tasks seen events * Update the finite pagination analytics * add the analytics of the swap-indexes route * Stop removing the DB when failing to read it * Rename originalFilters into originalFilters * Rename matchedDocuments into providedIds * Add `workflow_dispatch` to flaky.yml * Bump grenad to 0.4.4 * Bump milli to version v0.37.0 * Don't multiply total memory returned by sysinfo anymore sysinfo now returns bytes rather than KB * Add a dispatch to the publish binaries workflow * Fix publish release CI * Don't use gold but the default linker * Always display details for the indexDeletion task * Fix the insta tests * refactorize the whole test suite 1. Make a call to assert_internally_consistent automatically when snapshoting the scheduler. There is no point in snapshoting something broken and expect the dumb humans to notice. 2. Replace every possible call to assert_internally_consistent by a snapshot of the scheduler. It takes as many lines and ensure we never change something without noticing in any tests ever. 3. Name every snapshots: it's easier to debug when something goes wrong and easier to review in general. 4. Stop skipping breakpoints, it's too easy to miss something. Now you must explicitely show which path is the scheduler supposed to use. 5. Add a timeout on the channel.recv, it eases the process of writing tests, now when something file you get a failure instead of a deadlock. * rebase on release-v0.30 * makes clippy happy * update the snapshots after a rebase * try to remove the flakyness of the failing test * Add more analytics on the ranking rules positions * Update the dump test to check for the dumpUid dumpCreation task details * send the ranking rules as a string because amplitude is too dumb to process an array as a single value * Display a null dumpUid until we computed the dump itself on disk * Update tests * Check if the master key is missing before returning an error Co-authored-by: Loïc Lecrenier <loic.lecrenier@me.com> Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com> Co-authored-by: Kerollmops <clement@meilisearch.com> Co-authored-by: ManyTheFish <many@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com> Co-authored-by: Louis Dureuil <louis@meilisearch.com>
2022-11-28 16:27:41 +01:00
Details::Dump { dump_uid: _ } => {
assert_eq!(kind.as_kind(), Kind::DumpCreation);
}
2022-10-26 18:54:15 +02:00
}
}
assert!(self.get_status(&rtxn, status).unwrap().contains(uid));
assert!(self.get_kind(&rtxn, kind.as_kind()).unwrap().contains(uid));
2022-10-26 18:27:43 +02:00
if let KindWithContent::DocumentAdditionOrUpdate { content_file, .. } = kind {
match status {
Status::Enqueued | Status::Processing => {
assert!(self
2023-01-24 18:34:36 +01:00
.file_store
.all_uuids()
.unwrap()
.any(|uuid| uuid.as_ref().unwrap() == &content_file),
"Could not find uuid `{content_file}` in the file_store. Available uuids are {:?}.",
2023-01-25 11:20:15 +01:00
self.file_store.all_uuids().unwrap().collect::<std::result::Result<Vec<_>, file_store::Error>>().unwrap(),
);
}
Status::Succeeded | Status::Failed | Status::Canceled => {
assert!(self
.file_store
.all_uuids()
.unwrap()
2023-01-24 18:34:36 +01:00
.all(|uuid| uuid.as_ref().unwrap() != &content_file));
}
2022-10-26 18:27:43 +02:00
}
}
}
}
}
2023-02-15 12:29:36 +01:00
pub fn dichotomic_search(start_point: usize, mut is_good: impl FnMut(usize) -> bool) -> usize {
let mut biggest_good = None;
let mut smallest_bad = None;
let mut current = start_point;
loop {
let is_good = is_good(current);
(biggest_good, smallest_bad, current) = match (biggest_good, smallest_bad, is_good) {
(None, None, false) => (None, Some(current), current / 2),
(None, None, true) => (Some(current), None, current * 2),
(None, Some(smallest_bad), true) => {
(Some(current), Some(smallest_bad), (current + smallest_bad) / 2)
}
(None, Some(_), false) => (None, Some(current), current / 2),
(Some(_), None, true) => (Some(current), None, current * 2),
(Some(biggest_good), None, false) => {
(Some(biggest_good), Some(current), (biggest_good + current) / 2)
}
(Some(_), Some(smallest_bad), true) => {
(Some(current), Some(smallest_bad), (smallest_bad + current) / 2)
}
(Some(biggest_good), Some(_), false) => {
(Some(biggest_good), Some(current), (biggest_good + current) / 2)
}
};
if current == 0 {
return current;
}
if smallest_bad.is_some() && biggest_good.is_some() && biggest_good >= Some(current) {
return current;
}
}
}