Add a DetailsView type and improve index scheduler snapshots

The `DetailsView` type is necessary because serde incorrectly
deserialises the `Details` type, which causes the database to fail
to decode tasks correctly.
Loïc Lecrenier 2022-10-10 12:57:17 +02:00 committed by Clément Renault
parent 05753c663f
commit fd73e65165
6 changed files with 348 additions and 50 deletions
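
The failure described in the message is characteristic of `#[serde(untagged)]` enums: serde tries each variant in order and ignores unknown fields by default, so a serialised variant whose fields overlap with an earlier variant can be decoded as the wrong one. A minimal sketch of the problem and of a `DetailsView`-style workaround; the variant and field names here are illustrative, not the exact ones from this commit:

```rust
use serde::{Deserialize, Serialize};

// With `untagged`, serde tries the variants top to bottom and ignores
// unknown fields, so a serialised `DocumentDeletion` round-trips back
// as a `ClearAll`: its `deleted_documents` field alone is enough for
// the first variant to match.
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum Details {
    ClearAll { deleted_documents: Option<u64> },
    DocumentDeletion { deleted_documents: Option<u64>, received_document_ids: u64 },
}

// A single view struct with all-optional, uniquely named fields has no
// variant ambiguity: whatever was stored comes back field by field.
#[derive(Serialize, Deserialize, Default)]
struct DetailsView {
    #[serde(skip_serializing_if = "Option::is_none")]
    deleted_documents: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    received_document_ids: Option<u64>,
}
```
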

@@ -2,6 +2,8 @@ mod autobatcher;
mod batch;
pub mod error;
mod index_mapper;
#[cfg(test)]
mod snapshot;
pub mod task;
mod utils;
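
The new test-only `snapshot` module is where the `snapshot_index` helper used later in this diff lives: it walks the scheduler's databases under a read transaction and renders one titled section per database, separated by a dashed rule. A self-contained sketch of just the formatting (names hypothetical; the real helper reads from the scheduler's heed databases):

```rust
/// Renders `(title, body)` pairs in the layout asserted by the inline
/// snapshots below: a `### Title:` header, the body, then a dashed rule.
fn snapshot_sections(sections: &[(&str, String)]) -> String {
    let mut snap = String::new();
    for (title, body) in sections {
        snap.push_str(&format!("### {title}:\n{body}\n"));
        snap.push_str(&"-".repeat(70));
        snap.push('\n');
    }
    snap
}
```
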
@@ -323,7 +325,6 @@ impl IndexScheduler {
status: Status::Enqueued,
kind: task,
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
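
`BEU32` stores the task uid as a big-endian `u32` key. Big-endian byte order matches LMDB's lexicographic key comparison, so iterating `all_tasks` yields tasks in ascending uid order; a quick check of why the byte order matters:

```rust
fn main() {
    // Big-endian bytes compare the same way the numbers do...
    assert!(255u32.to_be_bytes() < 256u32.to_be_bytes());
    // ...while little-endian bytes do not: [255, 0, 0, 0] > [0, 1, 0, 0].
    assert!(255u32.to_le_bytes() > 256u32.to_le_bytes());
}
```
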
@@ -403,22 +404,26 @@ impl IndexScheduler {
// 2. Process the tasks
let res = self.process_batch(batch);
dbg!();
let mut wtxn = self.env.write_txn()?;
dbg!();
let finished_at = OffsetDateTime::now_utc();
match res {
Ok(tasks) => {
dbg!();
for mut task in tasks {
task.started_at = Some(started_at);
task.finished_at = Some(finished_at);
// TODO the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?;
task.remove_data()?;
dbg!();
}
dbg!();
}
// In case of a failure, we must go back and patch all the tasks with the error.
Err(err) => {
dbg!();
let error: ResponseError = err.into();
for id in ids {
let mut task = self.get_task(&wtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
@@ -432,10 +437,11 @@ impl IndexScheduler {
}
}
}
dbg!();
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new());
dbg!();
wtxn.commit()?;
dbg!();
log::info!("A batch of tasks was successfully completed.");
#[cfg(test)]
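
The `Err(err)` arm above applies one `ResponseError` to every task of the failed batch before the write transaction commits, so the whole batch fails atomically. A simplified, self-contained sketch of that pattern (the types are stand-ins for the crate's own; no transaction involved):

```rust
#[derive(Debug, Clone)]
struct ResponseError {
    message: String,
}

#[derive(Debug, PartialEq)]
enum Status {
    Enqueued,
    Failed,
}

#[derive(Debug)]
struct Task {
    uid: u32,
    status: Status,
    error: Option<ResponseError>,
}

/// Mirror of the failure arm: every task of the batch gets the same
/// error and moves to `Failed`; nothing is left half-processed.
fn fail_batch(tasks: &mut [Task], err: ResponseError) {
    for task in tasks.iter_mut() {
        task.status = Status::Failed;
        task.error = Some(err.clone());
    }
}
```
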
@@ -449,15 +455,13 @@ impl IndexScheduler {
#[cfg(test)]
mod tests {
use std::os::unix::process;
use big_s::S;
use insta::*;
use milli::update::IndexDocumentsMethod::ReplaceDocuments;
use tempfile::TempDir;
use uuid::Uuid;
use crate::assert_smol_debug_snapshot;
use crate::{assert_smol_debug_snapshot, snapshot::snapshot_index};
use super::*;
@@ -703,7 +707,7 @@ mod tests {
index_uid: S("catto"),
primary_key: None,
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
content_file: Uuid::from_u128(0),
documents_count: 12,
allow_index_creation: true,
},
@@ -711,7 +715,7 @@ mod tests {
index_uid: S("doggo"),
primary_key: Some(S("bone")),
method: ReplaceDocuments,
content_file: Uuid::new_v4(),
content_file: Uuid::from_u128(1),
documents_count: 5000,
allow_index_creation: true,
},
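
Replacing `Uuid::new_v4()` with `Uuid::from_u128(n)` is what makes the inline snapshots reproducible: a v4 UUID is random on every run, while `from_u128` always yields the same value:

```rust
use uuid::Uuid;

fn main() {
    // Deterministic across runs: the u128 becomes the 16 UUID bytes, big-endian.
    assert_eq!(Uuid::from_u128(0).to_string(), "00000000-0000-0000-0000-000000000000");
    assert_eq!(Uuid::from_u128(1).to_string(), "00000000-0000-0000-0000-000000000001");
}
```
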
@@ -727,6 +731,31 @@ mod tests {
}
rtxn.commit().unwrap();
assert_snapshot!(snapshot_index(&index_scheduler), @r###"
### Processing Tasks:
[]
----------------------------------------------------------------------
### All Tasks:
0 {uid: 0, status: enqueued, details: { primary_key: Some("mouse") }, kind: IndexCreation { index_uid: "catto", primary_key: Some("mouse") }}
1 {uid: 1, status: enqueued, details: { received_documents: 12, indexed_documents: 0 }, kind: DocumentImport { index_uid: "catto", primary_key: None, method: ReplaceDocuments, content_file: e881d224-ed39-4322-87ae-eae5a749b835, documents_count: 12, allow_index_creation: true }}
2 {uid: 2, status: enqueued, details: { received_documents: 5000, indexed_documents: 0 }, kind: DocumentImport { index_uid: "doggo", primary_key: Some("bone"), method: ReplaceDocuments, content_file: f21ce9f3-58f4-4bab-813b-ecb0b202d20f, documents_count: 5000, allow_index_creation: true }}
----------------------------------------------------------------------
### Status:
enqueued [0,1,2,]
----------------------------------------------------------------------
### Kind:
{"documentImport":{"method":"ReplaceDocuments","allow_index_creation":true}} [1,2,]
"indexCreation" [0,]
----------------------------------------------------------------------
### Index Tasks:
catto [0,1,]
doggo [2,]
----------------------------------------------------------------------
### Index Mapper:
[]
----------------------------------------------------------------------
"###);
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2)]");
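
Both macros here use insta's inline snapshot form: the `@"..."` / `@r###"..."###` argument is the expected value stored in the source itself, and `cargo insta review` rewrites it in place when the output legitimately changes. A minimal standalone example of the same mechanism:

```rust
#[test]
fn inline_snapshot() {
    // The expected output lives inline after `@`; insta diffs against it
    // and can update the literal itself via `cargo insta review`.
    insta::assert_snapshot!(format!("{:?}", vec![1, 2, 3]), @"[1, 2, 3]");
}
```
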
index_scheduler.register(KindWithContent::DeleteTasks {
@@ -740,7 +769,6 @@ mod tests {
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
@@ -754,8 +782,8 @@ mod tests {
handle.wait_till(Breakpoint::BatchCreated);
// the last task, with uid = 3, should be marked as processing
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[3]>");
let rtxn = index_scheduler.env.read_txn().unwrap();
let task = index_scheduler
.all_tasks
@@ -763,22 +791,27 @@ mod tests {
.unwrap()
.unwrap();
rtxn.commit().unwrap();
println!("TASK IN DB: {task:?}");
// handle.wait_till(Breakpoint::AfterProcessing);
handle.wait_till(Breakpoint::AfterProcessing);
// let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
// assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
dbg!();
// let rtxn = index_scheduler.env.read_txn().unwrap();
// let mut all_tasks = Vec::new();
// for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
// all_tasks.push(ret.unwrap().0);
// }
// rtxn.commit().unwrap();
let processing_tasks = &index_scheduler.processing_tasks.read().unwrap().1;
assert_smol_debug_snapshot!(processing_tasks, @"RoaringBitmap<[]>");
// assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
dbg!();
let rtxn = index_scheduler.env.read_txn().unwrap();
let mut all_tasks = Vec::new();
for ret in index_scheduler.all_tasks.iter(&rtxn).unwrap() {
all_tasks.push(ret.unwrap().0);
}
rtxn.commit().unwrap();
dbg!();
assert_smol_debug_snapshot!(all_tasks, @"[U32(0), U32(1), U32(2), U32(3)]");
handle.dont_block();
// index_scheduler.register(KindWithContent::DocumentClear { index_uid: 0 });
// index_scheduler.register(KindWithContent::CancelTask { tasks: vec![0] });
// index_scheduler.register(KindWithContent::DeleteTasks { tasks: vec![0] });
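
`handle.wait_till(...)` and `handle.dont_block()` belong to the scheduler's test harness: the run loop reports each `Breakpoint` over a zero-capacity channel, so it blocks at every breakpoint until the test thread receives it, keeping the two threads in lock-step. A sketch of the receiving side under that assumption (the real handle's internals may differ):

```rust
use std::sync::mpsc::Receiver;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Breakpoint {
    Start,
    BatchCreated,
    AfterProcessing,
}

struct TestHandle {
    breakpoints: Receiver<Breakpoint>,
}

impl TestHandle {
    /// Drain breakpoints until `bp` is reached. The scheduler thread
    /// blocks again on its next breakpoint `send`, so assertions made
    /// between two `wait_till` calls observe a quiescent scheduler.
    fn wait_till(&self, bp: Breakpoint) {
        while self.breakpoints.recv().unwrap() != bp {}
    }
}
```
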