mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Continue implementation of task deletion
1. Matched tasks are a roaring bitmap 2. Start implementation in meilisearch-http 3. Snapshots use meili-snap 4. Rename to TaskDeletion
This commit is contained in:
parent
e4d461ecba
commit
9522b75454
27 changed files with 290 additions and 456 deletions
|
@ -242,44 +242,52 @@ impl IndexScheduler {
|
|||
self.index_mapper.indexes(&rtxn)
|
||||
}
|
||||
|
||||
/// Returns the tasks corresponding to the query.
|
||||
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
|
||||
/// Return the task ids corresponding to the query
|
||||
pub fn get_task_ids(&self, query: &Query) -> Result<RoaringBitmap> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let last_task_id = match self.last_task_id(&rtxn)? {
|
||||
Some(tid) => query.from.map(|from| from.min(tid)).unwrap_or(tid),
|
||||
None => return Ok(Vec::new()),
|
||||
None => return Ok(RoaringBitmap::new()),
|
||||
};
|
||||
|
||||
// This is the list of all the tasks.
|
||||
let mut tasks = RoaringBitmap::from_sorted_iter(0..last_task_id).unwrap();
|
||||
|
||||
if let Some(uids) = query.uid {
|
||||
if let Some(uids) = &query.uid {
|
||||
tasks &= RoaringBitmap::from_iter(uids);
|
||||
}
|
||||
|
||||
if let Some(status) = query.status {
|
||||
if let Some(status) = &query.status {
|
||||
let mut status_tasks = RoaringBitmap::new();
|
||||
for status in status {
|
||||
status_tasks |= self.get_status(&rtxn, status)?;
|
||||
status_tasks |= self.get_status(&rtxn, *status)?;
|
||||
}
|
||||
tasks &= status_tasks;
|
||||
}
|
||||
|
||||
if let Some(kind) = query.kind {
|
||||
if let Some(kind) = &query.kind {
|
||||
let mut kind_tasks = RoaringBitmap::new();
|
||||
for kind in kind {
|
||||
kind_tasks |= self.get_kind(&rtxn, kind)?;
|
||||
kind_tasks |= self.get_kind(&rtxn, *kind)?;
|
||||
}
|
||||
tasks &= kind_tasks;
|
||||
}
|
||||
|
||||
if let Some(index) = query.index_uid {
|
||||
if let Some(index) = &query.index_uid {
|
||||
let mut index_tasks = RoaringBitmap::new();
|
||||
for index in index {
|
||||
index_tasks |= self.index_tasks(&rtxn, &index)?;
|
||||
}
|
||||
tasks &= index_tasks;
|
||||
}
|
||||
rtxn.commit().unwrap();
|
||||
Ok(tasks)
|
||||
}
|
||||
|
||||
/// Returns the tasks corresponding to the query.
|
||||
pub fn get_tasks(&self, query: Query) -> Result<Vec<Task>> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
let tasks = self.get_task_ids(&query)?;
|
||||
|
||||
let tasks =
|
||||
self.get_existing_tasks(&rtxn, tasks.into_iter().rev().take(query.limit as usize))?;
|
||||
|
@ -442,7 +450,7 @@ impl IndexScheduler {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use big_s::S;
|
||||
use insta::*;
|
||||
use meili_snap::snapshot;
|
||||
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
|
||||
use tempfile::TempDir;
|
||||
use uuid::Uuid;
|
||||
|
@ -580,7 +588,7 @@ mod tests {
|
|||
assert_eq!(task.kind.as_kind(), k);
|
||||
}
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -597,7 +605,7 @@ mod tests {
|
|||
})
|
||||
.unwrap();
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
}
|
||||
|
||||
/// We send a lot of tasks but notify the tasks scheduler only once as
|
||||
|
@ -692,23 +700,23 @@ mod tests {
|
|||
|
||||
// here we have registered all the tasks, but the index scheduler
|
||||
// has not progressed at all
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
|
||||
|
||||
index_scheduler
|
||||
.register(KindWithContent::DeleteTasks {
|
||||
.register(KindWithContent::TaskDeletion {
|
||||
query: "test_query".to_owned(),
|
||||
tasks: vec![0, 1],
|
||||
tasks: RoaringBitmap::from_iter(&[0, 1]),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
// again, no progress made at all, but one more task is registered
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_enqueued");
|
||||
|
||||
// now we create the first batch
|
||||
handle.wait_till(Breakpoint::BatchCreated);
|
||||
|
||||
// the task deletion should now be "processing"
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processing");
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
|
||||
|
@ -716,7 +724,7 @@ mod tests {
|
|||
// because the tasks with ids 0 and 1 were still "enqueued", and thus undeleteable
|
||||
// the "task deletion" task should be marked as "succeeded" and, in its details, the
|
||||
// number of deleted tasks should be 0
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_done");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -737,25 +745,25 @@ mod tests {
|
|||
file0.persist().unwrap();
|
||||
file1.persist().unwrap();
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_enqueued");
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
// first addition of documents should be successful
|
||||
// TODO: currently the result of this operation is incorrect!
|
||||
// only the first task should be successful, because it should not be batched with
|
||||
// the second task, that operates on a different index!
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "initial_tasks_processed");
|
||||
|
||||
// Now we delete the first task
|
||||
index_scheduler
|
||||
.register(KindWithContent::DeleteTasks {
|
||||
.register(KindWithContent::TaskDeletion {
|
||||
query: "test_query".to_owned(),
|
||||
tasks: vec![0],
|
||||
tasks: RoaringBitmap::from_iter(&[0]),
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "task_deletion_processed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -784,15 +792,15 @@ mod tests {
|
|||
.unwrap();
|
||||
file.persist().unwrap();
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
|
||||
handle.wait_till(Breakpoint::BatchCreated);
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
|
||||
assert_snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
snapshot!(snapshot_index_scheduler(&index_scheduler));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -833,18 +841,10 @@ mod tests {
|
|||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! assert_smol_debug_snapshot {
|
||||
macro_rules! debug_snapshot {
|
||||
($value:expr, @$snapshot:literal) => {{
|
||||
let value = format!("{:?}", $value);
|
||||
insta::assert_snapshot!(value, stringify!($value), @$snapshot);
|
||||
}};
|
||||
($name:expr, $value:expr) => {{
|
||||
let value = format!("{:?}", $value);
|
||||
insta::assert_snapshot!(Some($name), value, stringify!($value));
|
||||
}};
|
||||
($value:expr) => {{
|
||||
let value = format!("{:?}", $value);
|
||||
insta::assert_snapshot!($crate::_macro_support::AutoName, value, stringify!($value));
|
||||
meili_snap::snapshot!(value, @$snapshot);
|
||||
}};
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue