Continue implementation of task deletion

1. Matched tasks are a roaring bitmap
2. Start implementation in meilisearch-http
3. Snapshots use meili-snap
4. Rename to TaskDeletion
This commit is contained in:
Loïc Lecrenier 2022-10-13 11:09:00 +02:00 committed by Clément Renault
parent 7d4527728e
commit ef3e9e87f5
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
27 changed files with 316 additions and 366 deletions

View file

@ -82,6 +82,7 @@ tokio-stream = "0.1.10"
toml = "0.5.9"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
walkdir = "2.3.2"
yaup = "0.2.0"
prometheus = { version = "0.13.2", features = ["process"], optional = true }
lazy_static = "1.4.0"

View file

@ -1,11 +1,12 @@
use actix_web::web::Data;
use actix_web::{web, HttpRequest, HttpResponse};
use index_scheduler::{IndexScheduler, TaskId};
use env_logger::filter;
use index_scheduler::{IndexScheduler, Query, TaskId};
use meilisearch_types::error::ResponseError;
use meilisearch_types::index_uid::IndexUid;
use meilisearch_types::settings::{Settings, Unchecked};
use meilisearch_types::star_or::StarOr;
use meilisearch_types::tasks::{serialize_duration, Details, Kind, Status, Task};
use meilisearch_types::tasks::{serialize_duration, Details, Kind, KindWithContent, Status, Task};
use serde::{Deserialize, Serialize};
use serde_cs::vec::CS;
use serde_json::json;
@ -21,7 +22,8 @@ const DEFAULT_LIMIT: fn() -> u32 = || 20;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))))
.service(web::resource("").route(web::delete().to(SeqHandler(delete_tasks))));
}
#[derive(Debug, Clone, PartialEq, Serialize)]
@ -140,7 +142,7 @@ impl From<Details> for DetailsView {
deleted_documents: Some(deleted_documents),
..DetailsView::default()
},
Details::DeleteTasks {
Details::TaskDeletion {
matched_tasks,
deleted_tasks,
original_query,
@ -195,6 +197,29 @@ fn task_status_matches_events(status: &TaskStatus, events: &[TaskEvent]) -> bool
})
}
async fn delete_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_DELETE }>, Data<IndexScheduler>>,
params: web::Query<Query>,
_req: HttpRequest,
_analytics: web::Data<dyn Analytics>,
) -> Result<HttpResponse, ResponseError> {
let query = params.into_inner();
let filtered_query = filter_out_inaccessible_indexes_from_query(&index_scheduler, &query);
let tasks = index_scheduler.get_task_ids(&filtered_query)?;
let filtered_query_string = yaup::to_string(&filtered_query).unwrap();
let task_deletion = KindWithContent::TaskDeletion {
query: filtered_query_string,
tasks,
};
// TODO: Lo: analytics
let task = index_scheduler.register(task_deletion)?;
let task_view: TaskView = task.into();
Ok(HttpResponse::Ok().json(task_view))
}
async fn get_tasks(
index_scheduler: GuardedData<ActionPolicy<{ actions::TASKS_GET }>, Data<IndexScheduler>>,
params: web::Query<TasksFilterQuery>,
@ -318,3 +343,38 @@ async fn get_task(
Err(index_scheduler::Error::TaskNotFound(task_id).into())
}
}
fn filter_out_inaccessible_indexes_from_query<const ACTION: u8>(
index_scheduler: &GuardedData<ActionPolicy<ACTION>, Data<IndexScheduler>>,
query: &Query,
) -> Query {
let mut query = query.clone();
// First remove all indexes from the query, we will add them back later
let indexes = query.index_uid.take();
let search_rules = &index_scheduler.filters().search_rules;
let mut query = index_scheduler::Query::default();
// We filter on potential indexes and make sure that the search filter
// restrictions are also applied.
match indexes {
Some(indexes) => {
for name in indexes.iter() {
if search_rules.is_index_authorized(&name) {
query = query.with_index(name.to_string());
}
}
}
None => {
if !search_rules.is_index_authorized("*") {
for (index, _policy) in search_rules.clone() {
query = query.with_index(index.to_string());
}
}
}
};
query
}