Finish first draft of the DELETE /tasks route

This commit is contained in:
Loïc Lecrenier 2022-10-13 12:48:23 +02:00 committed by Clément Renault
parent ef3e9e87f5
commit 012c3e986c
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
5 changed files with 81 additions and 53 deletions

View file

@ -20,6 +20,7 @@ use meilisearch_types::{
use roaring::RoaringBitmap;
use uuid::Uuid;
#[derive(Debug)]
pub(crate) enum Batch {
Cancel(Task),
TaskDeletion(Task),
@ -42,6 +43,7 @@ pub(crate) enum Batch {
},
}
#[derive(Debug)]
pub(crate) enum IndexOperation {
DocumentImport {
index_uid: String,
@ -381,7 +383,7 @@ impl IndexScheduler {
}
// 2. we get the next task to delete
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)?;
let to_delete = self.get_kind(rtxn, Kind::TaskDeletion)? & enqueued;
if let Some(task_id) = to_delete.min() {
let task = self
.get_task(rtxn, task_id)?

View file

@ -30,13 +30,10 @@ use meilisearch_types::milli::{Index, RoaringBitmapCodec, BEU32};
use crate::index_mapper::IndexMapper;
const DEFAULT_LIMIT: fn() -> u32 = || 20;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Query {
#[serde(default = "DEFAULT_LIMIT")]
pub limit: u32,
pub limit: Option<u32>,
pub from: Option<u32>,
pub status: Option<Vec<Status>>,
#[serde(rename = "type")]
@ -48,7 +45,7 @@ pub struct Query {
impl Default for Query {
fn default() -> Self {
Self {
limit: DEFAULT_LIMIT(),
limit: None,
from: None,
status: None,
kind: None,
@ -96,7 +93,10 @@ impl Query {
}
pub fn with_limit(self, limit: u32) -> Self {
Self { limit, ..self }
Self {
limit: Some(limit),
..self
}
}
}
@ -245,13 +245,20 @@ impl IndexScheduler {
/// Return the task ids corresponding to the query
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
let rtxn = self.env.read_txn()?;
let last_task_id = match self.last_task_id(&rtxn)? {
Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid),
None => return Ok(RoaringBitmap::new()),
};
// This is the list of all the tasks.
let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();
let mut tasks = {
let mut all_tasks = RoaringBitmap::new();
for status in [
Status::Enqueued,
Status::Processing,
Status::Succeeded,
Status::Failed,
] {
all_tasks |= self.get_status(&rtxn, status)?;
}
all_tasks
};
if let Some(uids) = &query.uid {
tasks &= RoaringBitmap::from_iter(uids);
@ -289,8 +296,14 @@ impl IndexScheduler {
let rtxn = self.env.read_txn()?;
let tasks = self.get_task_ids(&query)?;
let tasks =
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
let tasks = self.get_existing_tasks(
&rtxn,
tasks
.into_iter()
.rev()
.take(query.limit.unwrap_or(u32::MAX) as usize),
)?;
let (started_at, processing) = self
.processing_tasks
.read()