Add a DetailsView type and improve index scheduler snapshots

The DetailsView type is necessary because serde incorrectly
deserialises the `Details` type, so the database fails to correctly
decode Tasks
This commit is contained in:
Loïc Lecrenier 2022-10-10 12:57:17 +02:00 committed by Clément Renault
parent dc81992eb2
commit 4c55c30027
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
6 changed files with 348 additions and 50 deletions

1
Cargo.lock generated
View File

@ -1877,6 +1877,7 @@ dependencies = [
"nelson",
"roaring 0.9.0",
"serde",
"serde_json",
"synchronoise",
"tempfile",
"thiserror",

View File

@ -17,6 +17,7 @@ meilisearch-types = { path = "../meilisearch-types" }
document-formats = { path = "../document-formats" }
roaring = "0.9.0"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = { version = "1.0.85", features = ["preserve_order"] }
tempfile = "3.3.0"
thiserror = "1.0.30"
time = { version = "0.3.7", features = ["serde-well-known", "formatting", "parsing", "macros"] }

View File

@ -380,7 +380,7 @@ impl IndexScheduler {
let task = self
.get_task(rtxn, task_id)?
.ok_or(Error::CorruptedTaskQueue)?;
println!("DeletionTask: {task:?}");
return Ok(Some(Batch::DeleteTasks(task)));
}
@ -442,7 +442,6 @@ impl IndexScheduler {
match batch {
Batch::Cancel(_) => todo!(),
Batch::DeleteTasks(mut task) => {
println!("delete task: {task:?}");
// 1. Retrieve the tasks that matched the quety at enqueue-time
let matched_tasks =
if let KindWithContent::DeleteTasks { tasks, query: _ } = &task.kind {
@ -450,10 +449,10 @@ impl IndexScheduler {
} else {
unreachable!()
};
println!("matched tasks: {matched_tasks:?}");
let mut wtxn = self.env.write_txn()?;
let nbr_deleted_tasks = self.delete_matched_tasks(&mut wtxn, matched_tasks)?;
println!("nbr_deleted_tasks: {nbr_deleted_tasks}");
task.status = Status::Succeeded;
match &mut task.details {
Some(Details::DeleteTasks {
@ -523,7 +522,6 @@ impl IndexScheduler {
primary_key,
mut task,
} => {
println!("IndexUpdate task: {task:?}");
let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?;

View File

@ -2,6 +2,8 @@ mod autobatcher;
mod batch;
pub mod error;
mod index_mapper;
#[cfg(test)]
mod snapshot;
pub mod task;
mod utils;
@ -323,7 +325,6 @@ impl IndexScheduler {
status: Status::Enqueued,
kind: task,
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
@ -403,22 +404,26 @@ impl IndexScheduler {
// 2. Process the tasks
let res = self.process_batch(batch);
dbg!();
let mut wtxn = self.env.write_txn()?;
dbg!();
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
dbg!();
for mut task in tasks {
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
// TODO the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?;
task.remove_data()?;
dbg!();
}
dbg!();
}
// In case of a failure we must get back and patch all the tasks with the error.
Err(err) => {
dbg!();
let error: ResponseError = err.into();
for id in ids {
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
@ -432,10 +437,11 @@ impl IndexScheduler {
}
}
}
dbg!();
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new());
dbg!();
wtxn.commit()?;
dbg!();
log::info!("A batch of tasks was successfully completed.");
#[cfg(test)]
@ -449,15 +455,13 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::os::unix::process;
use big_s::S;
use insta::*;
use milli::update::IndexDocumentsMethod::ReplaceDocuments;
use tempfile::TempDir;
use uuid::Uuid;
use crate::assert_smol_debug_snapshot;
use crate::{assert_smol_debug_snapshot, snapshot::snapshot_index};
use super::*;
@ -703,7 +707,7 @@ mod tests {
index_uid: S("catto"),
primary_key: None,
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
content_file: Uuid::from_u128(0),
documents_count: 12,
allow_index_creation: true,
},
@ -711,7 +715,7 @@ mod tests {
index_uid: S("doggo"),
primary_key: Some(S("bone")),
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
content_file: Uuid::from_u128(1),
documents_count: 5000,
allow_index_creation: true,
},
@ -727,6 +731,31 @@ mod tests {
}
rtxn.commit().unwrap();
assert_snapshot!(snapshot_index(&index_scheduler), @r###"
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 12, indexed_documents: 0 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: e881d224-ed39-4322-87ae-eae5a749b835, documents_count: 12, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 5000, indexed_documents: 0 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: f21ce9f3-58f4-4bab-813b-ecb0b202d20f, documents_count: 5000, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]
----------------------------------------------------------------------
### Kind:
{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [1,2,]
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,1,]
doggo [2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
"###);
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]");
index_scheduler.register(KindWithContent::DeleteTasks {
@ -740,7 +769,6 @@ mod tests {
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
@ -754,8 +782,8 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated);
// the last task, with uid = 3, should be marked as processing
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
@ -763,22 +791,27 @@ mod tests {
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
// handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
dbg!();
// let rtxn = index_scheduler.env.read_txn().unwrap();
// let mut all_tasks = Vec::new();
// for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
// all_tasks.push(ret.unwrap().0);
// }
// rtxn.commit().unwrap();
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
// assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
dbg!();
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
dbg!();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
handle.dont_block();
// index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 });
// index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] });
// index_scheduler.register(KindWithContendt::DeleteTasks { tasks: vec![0] });

View File

@ -0,0 +1,182 @@
use milli::{
heed::{
types::{OwnedType, SerdeBincode, SerdeJson, Str},
Database, RoTxn,
},
RoaringBitmapCodec, BEU32,
};
use roaring::RoaringBitmap;
use crate::{
index_mapper::IndexMapper,
task::{Details, Task},
IndexScheduler, Kind, Status,
};
pub fn snapshot_index(scheduler: &IndexScheduler) -> String {
let IndexScheduler {
processing_tasks,
file_store: _,
env,
all_tasks,
status,
kind,
index_tasks,
index_mapper,
wake_up: _,
test_breakpoint_sdr: _,
} = scheduler;
let rtxn = env.read_txn().unwrap();
let mut snap = String::new();
let (_time, processing_tasks) = processing_tasks.read().unwrap().clone();
snap.push_str("### Processing Tasks:\n");
snap.push_str(&snapshot_bitmap(&processing_tasks));
snap.push_str("\n----------------------------------------------------------------------\n");
snap.push_str("### All Tasks:\n");
snap.push_str(&snapshot_all_tasks(&rtxn, *all_tasks));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Status:\n");
snap.push_str(&snapshot_status(&rtxn, *status));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Kind:\n");
snap.push_str(&snapshot_kind(&rtxn, *kind));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Index Tasks:\n");
snap.push_str(&snapshot_index_tasks(&rtxn, *index_tasks));
snap.push_str("----------------------------------------------------------------------\n");
snap.push_str("### Index Mapper:\n");
snap.push_str(&snapshot_index_mapper(&rtxn, index_mapper));
snap.push_str("\n----------------------------------------------------------------------\n");
rtxn.commit().unwrap();
snap
}
fn snapshot_bitmap(r: &RoaringBitmap) -> String {
let mut snap = String::new();
snap.push('[');
for x in r {
snap.push_str(&format!("{x},"));
}
snap.push(']');
snap
}
fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String {
let mut snap = String::new();
let mut iter = db.iter(rtxn).unwrap();
while let Some(next) = iter.next() {
let (task_id, task) = next.unwrap();
snap.push_str(&format!("{task_id} {}\n", snapshot_task(&task)));
}
snap
}
fn snapshot_task(task: &Task) -> String {
let mut snap = String::new();
let Task {
uid,
enqueued_at: _,
started_at: _,
finished_at: _,
error,
details,
status,
kind,
} = task;
snap.push('{');
snap.push_str(&format!("uid: {uid}, "));
snap.push_str(&format!("status: {status}, "));
if let Some(error) = error {
snap.push_str(&format!("error: {error:?}, "));
}
if let Some(details) = details {
snap.push_str(&format!("details: {}, ", &snaphsot_details(details)));
}
snap.push_str(&format!("kind: {kind:?}"));
snap.push('}');
snap
}
fn snaphsot_details(d: &Details) -> String {
match d {
Details::DocumentAddition {
received_documents,
indexed_documents,
} => {
format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents} }}")
}
Details::Settings { settings } => {
format!("{{ settings: {settings:?} }}")
}
Details::IndexInfo { primary_key } => {
format!("{{ primary_key: {primary_key:?} }}")
}
Details::DocumentDeletion {
received_document_ids,
deleted_documents,
} => format!("{{ received_document_ids: {received_document_ids}, deleted_documents: {deleted_documents:?} }}"),
Details::ClearAll { deleted_documents } => {
format!("{{ deleted_documents: {deleted_documents:?} }}")
},
Details::DeleteTasks {
matched_tasks,
deleted_tasks,
original_query,
} => {
format!("{{ matched_tasks: {matched_tasks:?}, deleted_tasks: {deleted_tasks:?}, original_query: {original_query:?} }}")
},
Details::Dump { dump_uid } => {
format!("{{ dump_uid: {dump_uid:?} }}")
},
}
}
fn snapshot_status(rtxn: &RoTxn, db: Database<SerdeBincode<Status>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let mut iter = db.iter(rtxn).unwrap();
while let Some(next) = iter.next() {
let (status, task_ids) = next.unwrap();
snap.push_str(&format!("{status} {}\n", snapshot_bitmap(&task_ids)));
}
snap
}
fn snapshot_kind(rtxn: &RoTxn, db: Database<SerdeBincode<Kind>, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let mut iter = db.iter(rtxn).unwrap();
while let Some(next) = iter.next() {
let (kind, task_ids) = next.unwrap();
let kind = serde_json::to_string(&kind).unwrap();
snap.push_str(&format!("{kind} {}\n", snapshot_bitmap(&task_ids)));
}
snap
}
fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let mut iter = db.iter(rtxn).unwrap();
while let Some(next) = iter.next() {
let (index, task_ids) = next.unwrap();
snap.push_str(&format!("{index} {}\n", snapshot_bitmap(&task_ids)));
}
snap
}
fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String {
let names = mapper
.indexes(rtxn)
.unwrap()
.into_iter()
.map(|(n, _)| n)
.collect::<Vec<_>>();
format!("{names:?}")
}

View File

@ -4,13 +4,17 @@ use meilisearch_types::error::ResponseError;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize, Serializer};
use std::{fmt::Write, path::PathBuf, str::FromStr};
use std::{
fmt::{Display, Write},
path::PathBuf,
str::FromStr,
};
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use crate::{Error, Query, TaskId};
use crate::{Error, TaskId};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[derive(Debug, Clone, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskView {
pub uid: TaskId,
@ -21,9 +25,9 @@ pub struct TaskView {
#[serde(rename = "type")]
pub kind: Kind,
#[serde(skip_serializing_if = "Option::is_none", default)]
pub details: Option<Details>,
#[serde(skip_serializing_if = "Option::is_none", default)]
#[serde(skip_serializing_if = "Option::is_none")]
pub details: Option<DetailsView>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<ResponseError>,
#[serde(
@ -92,7 +96,7 @@ impl Task {
.and_then(|vec| vec.first().map(|i| i.to_string())),
status: self.status,
kind: self.kind.as_kind(),
details: self.details.clone(),
details: self.details.as_ref().map(Details::as_details_view),
error: self.error.clone(),
duration: self
.started_at
@ -113,7 +117,16 @@ pub enum Status {
Succeeded,
Failed,
}
impl Display for Status {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Status::Enqueued => write!(f, "enqueued"),
Status::Processing => write!(f, "processing"),
Status::Succeeded => write!(f, "succeeded"),
Status::Failed => write!(f, "failed"),
}
}
}
impl FromStr for Status {
type Err = Error;
@ -374,37 +387,107 @@ impl FromStr for Kind {
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(untagged)]
#[allow(clippy::large_enum_variant)]
pub enum Details {
#[serde(rename_all = "camelCase")]
DocumentAddition {
received_documents: u64,
indexed_documents: u64,
},
#[serde(rename_all = "camelCase")]
Settings {
#[serde(flatten)]
settings: Settings<Unchecked>,
},
#[serde(rename_all = "camelCase")]
IndexInfo { primary_key: Option<String> },
#[serde(rename_all = "camelCase")]
IndexInfo {
primary_key: Option<String>,
},
DocumentDeletion {
received_document_ids: usize,
// TODO why is this optional?
deleted_documents: Option<u64>,
},
#[serde(rename_all = "camelCase")]
ClearAll { deleted_documents: Option<u64> },
#[serde(rename_all = "camelCase")]
ClearAll {
deleted_documents: Option<u64>,
},
DeleteTasks {
matched_tasks: usize,
deleted_tasks: Option<usize>,
original_query: String,
},
#[serde(rename_all = "camelCase")]
Dump { dump_uid: String },
Dump {
dump_uid: String,
},
}
#[derive(Default, Debug, PartialEq, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")]
received_documents: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
indexed_documents: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
primary_key: Option<Option<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
received_document_ids: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
deleted_documents: Option<Option<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
matched_tasks: Option<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
deleted_tasks: Option<Option<usize>>,
#[serde(skip_serializing_if = "Option::is_none")]
original_query: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
dump_uid: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(flatten)]
settings: Option<Settings<Unchecked>>,
}
impl Details {
fn as_details_view(&self) -> DetailsView {
match self.clone() {
Details::DocumentAddition {
received_documents,
indexed_documents,
} => DetailsView {
received_documents: Some(received_documents),
indexed_documents: Some(indexed_documents),
..DetailsView::default()
},
Details::Settings { settings } => DetailsView {
settings: Some(settings),
..DetailsView::default()
},
Details::IndexInfo { primary_key } => DetailsView {
primary_key: Some(primary_key),
..DetailsView::default()
},
Details::DocumentDeletion {
received_document_ids,
deleted_documents,
} => DetailsView {
received_document_ids: Some(received_document_ids),
deleted_documents: Some(deleted_documents),
..DetailsView::default()
},
Details::ClearAll { deleted_documents } => DetailsView {
deleted_documents: Some(deleted_documents),
..DetailsView::default()
},
Details::DeleteTasks {
matched_tasks,
deleted_tasks,
original_query,
} => DetailsView {
matched_tasks: Some(matched_tasks),
deleted_tasks: Some(deleted_tasks),
original_query: Some(original_query),
..DetailsView::default()
},
Details::Dump { dump_uid } => DetailsView {
dump_uid: Some(dump_uid),
..DetailsView::default()
},
}
}
}
/// Serialize a `time::Duration` as a best effort ISO 8601 while waiting for
@ -476,6 +559,6 @@ mod tests {
let serialised = SerdeJson::<Details>::bytes_encode(&details).unwrap();
let deserialised = SerdeJson::<Details>::bytes_decode(&serialised).unwrap();
assert_smol_debug_snapshot!(details, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###);
assert_smol_debug_snapshot!(deserialised, @"Settings { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, typo_tolerance: NotSet, faceting: NotSet, pagination: NotSet, _kind: PhantomData } }");
assert_smol_debug_snapshot!(deserialised, @r###"DeleteTasks { matched_tasks: 1, deleted_tasks: None, original_query: "hello" }"###);
}
}