implement the progress for almost all the tasks

Tamo 2024-12-11 16:18:12 +01:00
parent 26733c705d
commit 786b0fabea
2 changed files with 203 additions and 6 deletions

View File

@@ -50,7 +50,11 @@ use uuid::Uuid;
 use crate::autobatcher::{self, BatchKind};
 use crate::processing::{
-    AtomicBatchStep, AtomicTaskStep, TaskCancelationProgress, TaskDeletionProgress,
+    AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, AtomicUpdateFileStep, CreateIndexProgress,
+    DeleteIndexProgress, DocumentDeletionProgress, DocumentEditionProgress,
+    DocumentOperationProgress, DumpCreationProgress, InnerSwappingTwoIndexes, SettingsProgress,
+    SnapshotCreationProgress, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress,
+    UpdateIndexProgress, VariableNameStep,
 };
 use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch};
 use crate::{Error, IndexScheduler, Result, TaskId};
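The pattern throughout this diff is uniform: a `Progress` handle is threaded into batch processing, and each phase registers a typed step as it starts. The snippet below is a minimal sketch of what such a `Progress`/`Step` pair could look like; the real types are more elaborate (the actual implementation keeps a stack of nested steps), so every shape here is an assumption for illustration.

```rust
// Minimal sketch of a Progress/Step API like the one this diff calls into.
// Assumption: the real `Progress` keeps a stack of nested steps; this toy
// version only remembers the most recent one.
use std::borrow::Cow;
use std::sync::{Arc, RwLock};

pub trait Step: Send + Sync + 'static {
    fn name(&self) -> Cow<'static, str>;
    fn current(&self) -> u32;
    fn total(&self) -> u32;
}

#[derive(Clone, Default)]
pub struct Progress {
    last: Arc<RwLock<Option<Box<dyn Step>>>>,
}

impl Progress {
    pub fn update_progress(&self, step: impl Step) {
        // Replace whatever step was reported last; readers can then render
        // "name: current/total" from the stored trait object.
        *self.last.write().unwrap() = Some(Box::new(step));
    }
}
```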
@@ -651,6 +655,8 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             Batch::SnapshotCreation(mut tasks) => {
+                progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation);
                 fs::create_dir_all(&self.snapshots_path)?;
                 let temp_snapshot_dir = tempfile::tempdir()?;
@ -671,6 +677,7 @@ impl IndexScheduler {
// two read operations as the task processing is synchronous. // two read operations as the task processing is synchronous.
// 2.1 First copy the LMDB env of the index-scheduler // 2.1 First copy the LMDB env of the index-scheduler
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks"); let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
@@ -683,6 +690,11 @@ impl IndexScheduler {
                 fs::create_dir_all(&update_files_dir)?;

                 // 2.4 Only copy the update files of the enqueued tasks
+                progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles);
+                let enqueued = self.get_status(&rtxn, Status::Enqueued)?;
+                let (atomic, update_file_progress) =
+                    AtomicUpdateFileStep::new(enqueued.len() as u32);
+                progress.update_progress(update_file_progress);
                 for task_id in self.get_status(&rtxn, Status::Enqueued)? {
                     let task = self.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
                     if let Some(content_uuid) = task.content_uuid() {
@@ -690,11 +702,17 @@ impl IndexScheduler {
                         let dst = update_files_dir.join(content_uuid.to_string());
                         fs::copy(src, dst)?;
                     }
+                    atomic.fetch_add(1, Ordering::Relaxed);
                 }

                 // 3. Snapshot every indexes
-                for result in self.index_mapper.index_mapping.iter(&rtxn)? {
+                progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes);
+                let index_mapping = self.index_mapper.index_mapping;
+                let nb_indexes = index_mapping.len(&rtxn)? as u32;
+                for (i, result) in index_mapping.iter(&rtxn)?.enumerate() {
                     let (name, uuid) = result?;
+                    progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes));
                     let index = self.index_mapper.index(&rtxn, name)?;
                     let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
                     fs::create_dir_all(&dst)?;
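Two reporting idioms recur from here on: enum steps mark which phase is running, while atomic steps count items inside a phase. `AtomicUpdateFileStep::new(n)` hands back both the shared counter and the step, so the hot loop only has to `fetch_add`. Below is a sketch of the counting idiom, reusing the `Progress` and `AtomicUpdateFileStep` names from this diff; the `copy_update_file` helper is hypothetical.

```rust
use std::path::Path;
use std::sync::atomic::Ordering;

// Counting idiom: size the step up front, register it once, then bump the
// shared counter as each item completes.
fn snapshot_update_files(progress: &Progress, files: &[&Path]) -> std::io::Result<()> {
    let (atomic, update_file_progress) = AtomicUpdateFileStep::new(files.len() as u32);
    progress.update_progress(update_file_progress);
    for file in files {
        copy_update_file(file)?; // hypothetical helper
        atomic.fetch_add(1, Ordering::Relaxed);
    }
    Ok(())
}

// Stand-in for the real fs::copy call in the loop above.
fn copy_update_file(_file: &Path) -> std::io::Result<()> {
    Ok(())
}
```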
@ -706,6 +724,7 @@ impl IndexScheduler {
drop(rtxn); drop(rtxn);
// 4. Snapshot the auth LMDB env // 4. Snapshot the auth LMDB env
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
let dst = temp_snapshot_dir.path().join("auth"); let dst = temp_snapshot_dir.path().join("auth");
fs::create_dir_all(&dst)?; fs::create_dir_all(&dst)?;
// TODO We can't use the open_auth_store_env function here but we should // TODO We can't use the open_auth_store_env function here but we should
@@ -718,6 +737,7 @@ impl IndexScheduler {
                 auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;

                 // 5. Copy and tarball the flat snapshot
+                progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
                 // 5.1 Find the original name of the database
                 // TODO find a better way to get this path
                 let mut base_path = self.env.path().to_owned();
@@ -750,6 +770,7 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             Batch::Dump(mut task) => {
+                progress.update_progress(DumpCreationProgress::StartTheDumpCreation);
                 let started_at = OffsetDateTime::now_utc();
                 let (keys, instance_uid) =
                     if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind {
@@ -760,6 +781,7 @@ impl IndexScheduler {
                 let dump = dump::DumpWriter::new(*instance_uid)?;

                 // 1. dump the keys
+                progress.update_progress(DumpCreationProgress::DumpTheApiKeys);
                 let mut dump_keys = dump.create_keys()?;
                 for key in keys {
                     dump_keys.push_key(key)?;
@@ -769,7 +791,13 @@ impl IndexScheduler {
                 let rtxn = self.env.read_txn()?;

                 // 2. dump the tasks
+                progress.update_progress(DumpCreationProgress::DumpTheTasks);
                 let mut dump_tasks = dump.create_tasks_queue()?;
+
+                let (atomic, update_task_progress) =
+                    AtomicTaskStep::new(self.all_tasks.len(&rtxn)? as u32);
+                progress.update_progress(update_task_progress);
+
                 for ret in self.all_tasks.iter(&rtxn)? {
                     if self.must_stop_processing.get() {
                         return Err(Error::AbortedTask);
@@ -819,11 +847,22 @@ impl IndexScheduler {
                             dump_content_file.flush()?;
                         }
                     }
+                    atomic.fetch_add(1, Ordering::Relaxed);
                 }
                 dump_tasks.flush()?;

                 // 3. Dump the indexes
+                progress.update_progress(DumpCreationProgress::DumpTheIndexes);
+                let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? as u32;
+                let mut count = 0;
                 self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> {
+                    progress.update_progress(VariableNameStep::new(
+                        uid.to_string(),
+                        count,
+                        nb_indexes,
+                    ));
+                    count += 1;
+
                     let rtxn = index.read_txn()?;
                     let metadata = IndexMetadata {
                         uid: uid.to_owned(),
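`try_for_each_index` drives a closure rather than handing out an iterator, so `.enumerate()` is not available; the diff instead captures a `count` outside the closure and bumps it by hand. The same pattern in isolation, as a sketch using the `Progress` and `VariableNameStep` names from this diff:

```rust
// Captured-counter pattern for closure-driven iteration, mirroring the
// try_for_each_index call above where `.enumerate()` cannot be used.
fn dump_each_index(progress: &Progress, index_uids: &[String]) {
    let nb_indexes = index_uids.len() as u32;
    let mut count = 0;
    index_uids.iter().for_each(|uid| {
        progress.update_progress(VariableNameStep::new(uid.clone(), count, nb_indexes));
        count += 1;
        // ... dump the documents and settings of `uid` ...
    });
}
```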
@@ -843,6 +882,12 @@ impl IndexScheduler {
                         .embedding_configs(&rtxn)
                         .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;

+                    let nb_documents = index
+                        .number_of_documents(&rtxn)
+                        .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?
+                        as u32;
+                    let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents);
+                    progress.update_progress(update_document_progress);
                     let documents = index
                         .all_documents(&rtxn)
                         .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?;
@@ -912,6 +957,7 @@ impl IndexScheduler {
                         }

                         index_dumper.push_document(&document)?;
+                        atomic.fetch_add(1, Ordering::Relaxed);
                     }

                     // 3.2. Dump the settings
@ -926,6 +972,7 @@ impl IndexScheduler {
})?; })?;
// 4. Dump experimental feature settings // 4. Dump experimental feature settings
progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures);
let features = self.features().runtime_features(); let features = self.features().runtime_features();
dump.create_experimental_features(features)?; dump.create_experimental_features(features)?;
@@ -936,6 +983,7 @@ impl IndexScheduler {
                 if self.must_stop_processing.get() {
                     return Err(Error::AbortedTask);
                 }
+                progress.update_progress(DumpCreationProgress::CompressTheDump);
                 let path = self.dumps_path.join(format!("{}.dump", dump_uid));
                 let file = File::create(path)?;
                 dump.persist_to(BufWriter::new(file))?;
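Note the `must_stop_processing.get()` check right before compressing: dump creation aborts cooperatively between phases rather than being killed mid-write. A sketch of the flag's likely shape, assuming an atomic bool shared with the task-cancelation path; the method names mirror the calls visible in this diff, the rest is assumed:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Cloneable handle over a shared flag; one clone is flipped by the
// cancelation path, the others poll it between units of work.
#[derive(Clone, Default)]
pub struct MustStopProcessing(Arc<AtomicBool>);

impl MustStopProcessing {
    pub fn get(&self) -> bool {
        self.0.load(Ordering::Relaxed)
    }

    pub fn must_stop(&self) {
        self.0.store(true, Ordering::Relaxed);
    }
}
```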
@@ -995,6 +1043,8 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             Batch::IndexCreation { index_uid, primary_key, task } => {
+                progress.update_progress(CreateIndexProgress::CreatingTheIndex);
                 let wtxn = self.env.write_txn()?;
                 if self.index_mapper.exists(&wtxn, &index_uid)? {
                     return Err(Error::IndexAlreadyExists(index_uid));
@ -1008,6 +1058,7 @@ impl IndexScheduler {
) )
} }
Batch::IndexUpdate { index_uid, primary_key, mut task } => { Batch::IndexUpdate { index_uid, primary_key, mut task } => {
progress.update_progress(UpdateIndexProgress::UpdatingTheIndex);
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;
let index = self.index_mapper.index(&rtxn, &index_uid)?; let index = self.index_mapper.index(&rtxn, &index_uid)?;
@@ -1060,6 +1111,7 @@ impl IndexScheduler {
                 Ok(vec![task])
             }
             Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => {
+                progress.update_progress(DeleteIndexProgress::DeletingTheIndex);
                 let wtxn = self.env.write_txn()?;

                 // it's possible that the index doesn't exist
@@ -1093,6 +1145,8 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             Batch::IndexSwap { mut task } => {
+                progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap);
                 let mut wtxn = self.env.write_txn()?;
                 let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind {
                     swaps
@ -1119,8 +1173,20 @@ impl IndexScheduler {
)); ));
} }
} }
for swap in swaps { progress.update_progress(SwappingTheIndexes::SwappingTheIndexes);
self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?; for (step, swap) in swaps.iter().enumerate() {
progress.update_progress(VariableNameStep::new(
format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1),
step as u32,
swaps.len() as u32,
));
self.apply_index_swap(
&mut wtxn,
&progress,
task.uid,
&swap.indexes.0,
&swap.indexes.1,
)?;
} }
wtxn.commit()?; wtxn.commit()?;
task.status = Status::Succeeded; task.status = Status::Succeeded;
@ -1130,7 +1196,15 @@ impl IndexScheduler {
} }
/// Swap the index `lhs` with the index `rhs`. /// Swap the index `lhs` with the index `rhs`.
fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> { fn apply_index_swap(
&self,
wtxn: &mut RwTxn,
progress: &Progress,
task_id: u32,
lhs: &str,
rhs: &str,
) -> Result<()> {
progress.update_progress(InnerSwappingTwoIndexes::RetrieveTheTasks);
// 1. Verify that both lhs and rhs are existing indexes // 1. Verify that both lhs and rhs are existing indexes
let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?; let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?;
if !index_lhs_exists { if !index_lhs_exists {
@ -1148,14 +1222,21 @@ impl IndexScheduler {
index_rhs_task_ids.remove_range(task_id..); index_rhs_task_ids.remove_range(task_id..);
// 3. before_name -> new_name in the task's KindWithContent // 3. before_name -> new_name in the task's KindWithContent
for task_id in &index_lhs_task_ids | &index_rhs_task_ids { progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks);
let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids;
let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32);
progress.update_progress(task_progress);
for task_id in tasks_to_update {
let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs)); swap_index_uid_in_task(&mut task, (lhs, rhs));
self.all_tasks.put(wtxn, &task_id, &task)?; self.all_tasks.put(wtxn, &task_id, &task)?;
atomic.fetch_add(1, Ordering::Relaxed);
} }
// 4. remove the task from indexuid = before_name // 4. remove the task from indexuid = before_name
// 5. add the task to indexuid = after_name // 5. add the task to indexuid = after_name
progress.update_progress(InnerSwappingTwoIndexes::UpdateTheIndexesMetadata);
self.update_index(wtxn, lhs, |lhs_tasks| { self.update_index(wtxn, lhs, |lhs_tasks| {
*lhs_tasks -= &index_lhs_task_ids; *lhs_tasks -= &index_lhs_task_ids;
*lhs_tasks |= &index_rhs_task_ids; *lhs_tasks |= &index_rhs_task_ids;
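The union `&index_lhs_task_ids | &index_rhs_task_ids` is what lets the step be sized exactly before the loop: `|` on borrowed `RoaringBitmap`s builds a new bitmap without consuming either side, and `len()` gives an exact count up front. A self-contained illustration with the roaring crate:

```rust
use roaring::RoaringBitmap;

fn main() {
    let lhs: RoaringBitmap = (0..5).collect();
    let rhs: RoaringBitmap = (3..10).collect();

    // Union by reference: neither bitmap is consumed.
    let tasks_to_update = &lhs | &rhs;

    // len() returns a u64; the diff casts it to u32 to size AtomicTaskStep.
    assert_eq!(tasks_to_update.len(), 10);
}
```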
@ -1222,6 +1303,7 @@ impl IndexScheduler {
operations, operations,
mut tasks, mut tasks,
} => { } => {
progress.update_progress(DocumentOperationProgress::RetrievingConfig);
// TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches.
// this is made difficult by the fact we're doing private clones of the index scheduler and sending it // this is made difficult by the fact we're doing private clones of the index scheduler and sending it
// to a fresh thread. // to a fresh thread.
@ -1277,6 +1359,7 @@ impl IndexScheduler {
} }
}; };
progress.update_progress(DocumentOperationProgress::ComputingTheChanges);
let (document_changes, operation_stats, primary_key) = indexer let (document_changes, operation_stats, primary_key) = indexer
.into_changes( .into_changes(
&indexer_alloc, &indexer_alloc,
@@ -1321,6 +1404,7 @@ impl IndexScheduler {
                     }
                 }

+                progress.update_progress(DocumentOperationProgress::Indexing);
                 if tasks.iter().any(|res| res.error.is_none()) {
                     indexer::index(
                         index_wtxn,
@@ -1350,6 +1434,8 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             IndexOperation::DocumentEdition { index_uid, mut task } => {
+                progress.update_progress(DocumentEditionProgress::RetrievingConfig);
                 let (filter, code) = if let KindWithContent::DocumentEdition {
                     filter_expr,
                     context: _,
@@ -1423,6 +1509,7 @@ impl IndexScheduler {
                 };

                 let candidates_count = candidates.len();
+                progress.update_progress(DocumentEditionProgress::ComputingTheChanges);
                 let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone());
                 let document_changes = pool
                     .install(|| {
@@ -1436,6 +1523,7 @@ impl IndexScheduler {
                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
                 let embedders = self.embedders(index_uid.clone(), embedders)?;

+                progress.update_progress(DocumentEditionProgress::Indexing);
                 indexer::index(
                     index_wtxn,
                     index,
@@ -1488,6 +1576,8 @@ impl IndexScheduler {
                 Ok(vec![task])
             }
             IndexOperation::DocumentDeletion { mut tasks, index_uid } => {
+                progress.update_progress(DocumentDeletionProgress::RetrievingConfig);
                 let mut to_delete = RoaringBitmap::new();
                 let external_documents_ids = index.external_documents_ids();
@ -1578,6 +1668,7 @@ impl IndexScheduler {
} }
}; };
progress.update_progress(DocumentDeletionProgress::DeleteDocuments);
let mut indexer = indexer::DocumentDeletion::new(); let mut indexer = indexer::DocumentDeletion::new();
let candidates_count = to_delete.len(); let candidates_count = to_delete.len();
indexer.delete_documents_by_docids(to_delete); indexer.delete_documents_by_docids(to_delete);
@@ -1587,6 +1678,7 @@ impl IndexScheduler {
                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
                 let embedders = self.embedders(index_uid.clone(), embedders)?;

+                progress.update_progress(DocumentDeletionProgress::Indexing);
                 indexer::index(
                     index_wtxn,
                     index,
@@ -1615,6 +1707,7 @@ impl IndexScheduler {
                 Ok(tasks)
             }
             IndexOperation::Settings { index_uid, settings, mut tasks } => {
+                progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings);
                 let indexer_config = self.index_mapper.indexer_config();
                 let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config);
@@ -1628,6 +1721,7 @@ impl IndexScheduler {
                     task.status = Status::Succeeded;
                 }

+                progress.update_progress(SettingsProgress::ApplyTheSettings);
                 builder
                     .execute(
                         |indexing_step| tracing::debug!(update = ?indexing_step),

View File

@@ -119,8 +119,111 @@ make_enum_progress! {
         - DeletingBatches
 }

+make_enum_progress! {
+    enum SnapshotCreationProgress:
+        - StartTheSnapshotCreation
+        - SnapshotTheIndexScheduler
+        - SnapshotTheUpdateFiles
+        - SnapshotTheIndexes
+        - SnapshotTheApiKeys
+        - CreateTheTarball
+}
+
+make_enum_progress! {
+    enum DumpCreationProgress:
+        - StartTheDumpCreation
+        - DumpTheApiKeys
+        - DumpTheTasks
+        - DumpTheIndexes
+        - DumpTheExperimentalFeatures
+        - CompressTheDump
+}
+
+make_enum_progress! {
+    enum CreateIndexProgress:
+        - CreatingTheIndex
+}
+
+make_enum_progress! {
+    enum UpdateIndexProgress:
+        - UpdatingTheIndex
+}
+
+make_enum_progress! {
+    enum DeleteIndexProgress:
+        - DeletingTheIndex
+}
+
+make_enum_progress! {
+    enum SwappingTheIndexes:
+        - EnsuringCorrectnessOfTheSwap
+        - SwappingTheIndexes
+}
+
+make_enum_progress! {
+    enum InnerSwappingTwoIndexes:
+        - RetrieveTheTasks
+        - UpdateTheTasks
+        - UpdateTheIndexesMetadata
+}
+
+make_enum_progress! {
+    enum DocumentOperationProgress:
+        - RetrievingConfig
+        - ComputingTheChanges
+        - Indexing
+}
+
+make_enum_progress! {
+    enum DocumentEditionProgress:
+        - RetrievingConfig
+        - ComputingTheChanges
+        - Indexing
+}
+
+make_enum_progress! {
+    enum DocumentDeletionProgress:
+        - RetrievingConfig
+        - DeleteDocuments
+        - Indexing
+}
+
+make_enum_progress! {
+    enum SettingsProgress:
+        - RetrievingAndMergingTheSettings
+        - ApplyTheSettings
+}
 make_atomic_progress!(Task alias AtomicTaskStep => "task" );
+make_atomic_progress!(Document alias AtomicDocumentStep => "document" );
 make_atomic_progress!(Batch alias AtomicBatchStep => "batch" );
+make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" );
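The `make_atomic_progress!` lines plausibly generate a step that owns a shared `AtomicU32`, which is why `new(total)` returns a `(counter, step)` pair at the call sites above. A hand-written sketch for the `Document` case; the macro's real output is an assumption:

```rust
use std::borrow::Cow;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

// A step whose `current` value is read from a counter shared with the loop
// that does the work.
pub struct AtomicDocumentStep {
    current: Arc<AtomicU32>,
    total: u32,
}

impl AtomicDocumentStep {
    /// Returns the shared counter alongside the step itself, so the caller
    /// can register the step and keep bumping the counter independently.
    pub fn new(total: u32) -> (Arc<AtomicU32>, Self) {
        let current = Arc::new(AtomicU32::new(0));
        (current.clone(), Self { current, total })
    }
}

impl Step for AtomicDocumentStep {
    fn name(&self) -> Cow<'static, str> {
        "document".into()
    }

    fn current(&self) -> u32 {
        self.current.load(Ordering::Relaxed)
    }

    fn total(&self) -> u32 {
        self.total
    }
}
```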
+
+pub struct VariableNameStep {
+    name: String,
+    current: u32,
+    total: u32,
+}
+
+impl VariableNameStep {
+    pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self {
+        Self { name: name.into(), current, total }
+    }
+}
+
+impl Step for VariableNameStep {
+    fn name(&self) -> Cow<'static, str> {
+        self.name.clone().into()
+    }
+
+    fn current(&self) -> u32 {
+        self.current
+    }
+
+    fn total(&self) -> u32 {
+        self.total
+    }
+}
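`VariableNameStep` covers the case the two macros cannot: steps whose names are only known at runtime, such as index names during a snapshot or the pair of indexes in a swap. Typical use, as in the loops in the first file (a sketch reusing this diff's `Progress` type):

```rust
// One named sub-step per item; `i` and the slice length give the
// current/total values the progress view displays.
fn snapshot_each_index(progress: &Progress, index_names: &[&str]) {
    let total = index_names.len() as u32;
    for (i, name) in index_names.iter().enumerate() {
        progress.update_progress(VariableNameStep::new(*name, i as u32, total));
        // ... snapshot the index called `name` ...
    }
}
```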
 #[cfg(test)]
 mod test {