Implement TaskDeletion in the index scheduler

This commit is contained in:
Loïc Lecrenier 2022-10-06 16:53:21 +02:00 committed by Clément Renault
parent ee352b6c7c
commit 05753c663f
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
4 changed files with 248 additions and 14 deletions

View file

@ -19,7 +19,7 @@ use std::sync::{Arc, RwLock};
use file_store::{File, FileStore};
use meilisearch_types::error::ResponseError;
use roaring::RoaringBitmap;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
@ -34,7 +34,7 @@ use crate::task::Task;
const DEFAULT_LIMIT: fn() -> u32 = || 20;
#[derive(derive_builder::Builder, Debug, Clone, Deserialize)]
#[derive(derive_builder::Builder, Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Query {
#[serde(default = "DEFAULT_LIMIT")]
@ -319,7 +319,7 @@ impl IndexScheduler {
started_at: None,
finished_at: None,
error: None,
details: None,
details: task.default_details(),
status: Status::Enqueued,
kind: task,
};
@ -449,6 +449,8 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::os::unix::process;
use big_s::S;
use insta::*;
use milli::update::IndexDocumentsMethod::ReplaceDocuments;
@ -688,6 +690,100 @@ mod tests {
assert_eq!(tasks[3].status, Status::Succeeded);
}
#[test]
fn task_deletion() {
let (index_scheduler, handle) = IndexScheduler::test();
let to_enqueue = [
KindWithContent::IndexCreation {
index_uid: S("catto"),
primary_key: Some(S("mouse")),
},
KindWithContent::DocumentImport {
index_uid: S("catto"),
primary_key: None,
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
documents_count: 12,
allow_index_creation: true,
},
KindWithContent::DocumentImport {
index_uid: S("doggo"),
primary_key: Some(S("bone")),
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
documents_count: 5000,
allow_index_creation: true,
},
];
for task in to_enqueue {
let _ = index_scheduler.register(task).unwrap();
}
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]");
index_scheduler.register(KindWithContent::DeleteTasks {
query: "test_query".to_owned(),
tasks: vec![0, 1],
});
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
.get(&rtxn, &BEU32::new(3))
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
handle.wait_till(Breakpoint::BatchCreated);
// the last task, with uid = 3, should be marked as processing
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
.get(&rtxn, &BEU32::new(3))
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
// handle.wait_till(Breakpoint::AfterProcessing);
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
// let rtxn = index_scheduler.env.read_txn().unwrap();
// let mut all_tasks = Vec::new();
// for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
// all_tasks.push(ret.unwrap().0);
// }
// rtxn.commit().unwrap();
// assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
// index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 });
// index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] });
// index_scheduler.register(KindWithContendt::DeleteTasks { tasks: vec![0] });
}
#[test]
fn document_addition() {
let (index_scheduler, handle) = IndexScheduler::test(true);