From 786b0fabea2a979442923f32ffbecc8208671cf9 Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 11 Dec 2024 16:18:12 +0100 Subject: [PATCH] implement the progress for almost all the tasks --- crates/index-scheduler/src/batch.rs | 106 +++++++++++++++++++++-- crates/index-scheduler/src/processing.rs | 103 ++++++++++++++++++++++ 2 files changed, 203 insertions(+), 6 deletions(-) diff --git a/crates/index-scheduler/src/batch.rs b/crates/index-scheduler/src/batch.rs index fe055b185..733984043 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -50,7 +50,11 @@ use uuid::Uuid; use crate::autobatcher::{self, BatchKind}; use crate::processing::{ - AtomicBatchStep, AtomicTaskStep, TaskCancelationProgress, TaskDeletionProgress, + AtomicBatchStep, AtomicDocumentStep, AtomicTaskStep, AtomicUpdateFileStep, CreateIndexProgress, + DeleteIndexProgress, DocumentDeletionProgress, DocumentEditionProgress, + DocumentOperationProgress, DumpCreationProgress, InnerSwappingTwoIndexes, SettingsProgress, + SnapshotCreationProgress, SwappingTheIndexes, TaskCancelationProgress, TaskDeletionProgress, + UpdateIndexProgress, VariableNameStep, }; use crate::utils::{self, swap_index_uid_in_task, ProcessingBatch}; use crate::{Error, IndexScheduler, Result, TaskId}; @@ -651,6 +655,8 @@ impl IndexScheduler { Ok(tasks) } Batch::SnapshotCreation(mut tasks) => { + progress.update_progress(SnapshotCreationProgress::StartTheSnapshotCreation); + fs::create_dir_all(&self.snapshots_path)?; let temp_snapshot_dir = tempfile::tempdir()?; @@ -671,6 +677,7 @@ impl IndexScheduler { // two read operations as the task processing is synchronous. 
// 2.1 First copy the LMDB env of the index-scheduler + progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); let dst = temp_snapshot_dir.path().join("tasks"); fs::create_dir_all(&dst)?; self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; @@ -683,6 +690,11 @@ impl IndexScheduler { fs::create_dir_all(&update_files_dir)?; // 2.4 Only copy the update files of the enqueued tasks + progress.update_progress(SnapshotCreationProgress::SnapshotTheUpdateFiles); + let enqueued = self.get_status(&rtxn, Status::Enqueued)?; + let (atomic, update_file_progress) = + AtomicUpdateFileStep::new(enqueued.len() as u32); + progress.update_progress(update_file_progress); for task_id in self.get_status(&rtxn, Status::Enqueued)? { let task = self.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; if let Some(content_uuid) = task.content_uuid() { @@ -690,11 +702,17 @@ impl IndexScheduler { let dst = update_files_dir.join(content_uuid.to_string()); fs::copy(src, dst)?; } + atomic.fetch_add(1, Ordering::Relaxed); } // 3. Snapshot every indexes - for result in self.index_mapper.index_mapping.iter(&rtxn)? { + progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexes); + let index_mapping = self.index_mapper.index_mapping; + let nb_indexes = index_mapping.len(&rtxn)? as u32; + + for (i, result) in index_mapping.iter(&rtxn)?.enumerate() { let (name, uuid) = result?; + progress.update_progress(VariableNameStep::new(name, i as u32, nb_indexes)); let index = self.index_mapper.index(&rtxn, name)?; let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); fs::create_dir_all(&dst)?; @@ -706,6 +724,7 @@ impl IndexScheduler { drop(rtxn); // 4. 
Snapshot the auth LMDB env + progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys); let dst = temp_snapshot_dir.path().join("auth"); fs::create_dir_all(&dst)?; // TODO We can't use the open_auth_store_env function here but we should @@ -718,6 +737,7 @@ impl IndexScheduler { auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; // 5. Copy and tarball the flat snapshot + progress.update_progress(SnapshotCreationProgress::CreateTheTarball); // 5.1 Find the original name of the database // TODO find a better way to get this path let mut base_path = self.env.path().to_owned(); @@ -750,6 +770,7 @@ impl IndexScheduler { Ok(tasks) } Batch::Dump(mut task) => { + progress.update_progress(DumpCreationProgress::StartTheDumpCreation); let started_at = OffsetDateTime::now_utc(); let (keys, instance_uid) = if let KindWithContent::DumpCreation { keys, instance_uid } = &task.kind { @@ -760,6 +781,7 @@ impl IndexScheduler { let dump = dump::DumpWriter::new(*instance_uid)?; // 1. dump the keys + progress.update_progress(DumpCreationProgress::DumpTheApiKeys); let mut dump_keys = dump.create_keys()?; for key in keys { dump_keys.push_key(key)?; @@ -769,7 +791,13 @@ impl IndexScheduler { let rtxn = self.env.read_txn()?; // 2. dump the tasks + progress.update_progress(DumpCreationProgress::DumpTheTasks); let mut dump_tasks = dump.create_tasks_queue()?; + + let (atomic, update_task_progress) = + AtomicTaskStep::new(self.all_tasks.len(&rtxn)? as u32); + progress.update_progress(update_task_progress); + for ret in self.all_tasks.iter(&rtxn)? { if self.must_stop_processing.get() { return Err(Error::AbortedTask); @@ -819,11 +847,22 @@ impl IndexScheduler { dump_content_file.flush()?; } } + atomic.fetch_add(1, Ordering::Relaxed); } dump_tasks.flush()?; // 3. Dump the indexes + progress.update_progress(DumpCreationProgress::DumpTheIndexes); + let nb_indexes = self.index_mapper.index_mapping.len(&rtxn)? 
as u32; + let mut count = 0; self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { + progress.update_progress(VariableNameStep::new( + uid.to_string(), + count, + nb_indexes, + )); + count += 1; + let rtxn = index.read_txn()?; let metadata = IndexMetadata { uid: uid.to_owned(), @@ -843,6 +882,12 @@ impl IndexScheduler { .embedding_configs(&rtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + let nb_documents = index + .number_of_documents(&rtxn) + .map_err(|e| Error::from_milli(e, Some(uid.to_string())))? + as u32; + let (atomic, update_document_progress) = AtomicDocumentStep::new(nb_documents); + progress.update_progress(update_document_progress); let documents = index .all_documents(&rtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; @@ -912,6 +957,7 @@ impl IndexScheduler { } index_dumper.push_document(&document)?; + atomic.fetch_add(1, Ordering::Relaxed); } // 3.2. Dump the settings @@ -926,6 +972,7 @@ impl IndexScheduler { })?; // 4. Dump experimental feature settings + progress.update_progress(DumpCreationProgress::DumpTheExperimentalFeatures); let features = self.features().runtime_features(); dump.create_experimental_features(features)?; @@ -936,6 +983,7 @@ impl IndexScheduler { if self.must_stop_processing.get() { return Err(Error::AbortedTask); } + progress.update_progress(DumpCreationProgress::CompressTheDump); let path = self.dumps_path.join(format!("{}.dump", dump_uid)); let file = File::create(path)?; dump.persist_to(BufWriter::new(file))?; @@ -995,6 +1043,8 @@ impl IndexScheduler { Ok(tasks) } Batch::IndexCreation { index_uid, primary_key, task } => { + progress.update_progress(CreateIndexProgress::CreatingTheIndex); + let wtxn = self.env.write_txn()?; if self.index_mapper.exists(&wtxn, &index_uid)? 
{ return Err(Error::IndexAlreadyExists(index_uid)); @@ -1008,6 +1058,7 @@ impl IndexScheduler { ) } Batch::IndexUpdate { index_uid, primary_key, mut task } => { + progress.update_progress(UpdateIndexProgress::UpdatingTheIndex); let rtxn = self.env.read_txn()?; let index = self.index_mapper.index(&rtxn, &index_uid)?; @@ -1060,6 +1111,7 @@ impl IndexScheduler { Ok(vec![task]) } Batch::IndexDeletion { index_uid, index_has_been_created, mut tasks } => { + progress.update_progress(DeleteIndexProgress::DeletingTheIndex); let wtxn = self.env.write_txn()?; // it's possible that the index doesn't exist @@ -1093,6 +1145,8 @@ impl IndexScheduler { Ok(tasks) } Batch::IndexSwap { mut task } => { + progress.update_progress(SwappingTheIndexes::EnsuringCorrectnessOfTheSwap); + let mut wtxn = self.env.write_txn()?; let swaps = if let KindWithContent::IndexSwap { swaps } = &task.kind { swaps @@ -1119,8 +1173,20 @@ impl IndexScheduler { )); } } - for swap in swaps { - self.apply_index_swap(&mut wtxn, task.uid, &swap.indexes.0, &swap.indexes.1)?; + progress.update_progress(SwappingTheIndexes::SwappingTheIndexes); + for (step, swap) in swaps.iter().enumerate() { + progress.update_progress(VariableNameStep::new( + format!("swapping index {} and {}", swap.indexes.0, swap.indexes.1), + step as u32, + swaps.len() as u32, + )); + self.apply_index_swap( + &mut wtxn, + &progress, + task.uid, + &swap.indexes.0, + &swap.indexes.1, + )?; } wtxn.commit()?; task.status = Status::Succeeded; @@ -1130,7 +1196,15 @@ impl IndexScheduler { } /// Swap the index `lhs` with the index `rhs`. - fn apply_index_swap(&self, wtxn: &mut RwTxn, task_id: u32, lhs: &str, rhs: &str) -> Result<()> { + fn apply_index_swap( + &self, + wtxn: &mut RwTxn, + progress: &Progress, + task_id: u32, + lhs: &str, + rhs: &str, + ) -> Result<()> { + progress.update_progress(InnerSwappingTwoIndexes::RetrieveTheTasks); // 1. 
Verify that both lhs and rhs are existing indexes let index_lhs_exists = self.index_mapper.index_exists(wtxn, lhs)?; if !index_lhs_exists { @@ -1148,14 +1222,21 @@ impl IndexScheduler { index_rhs_task_ids.remove_range(task_id..); // 3. before_name -> new_name in the task's KindWithContent - for task_id in &index_lhs_task_ids | &index_rhs_task_ids { + progress.update_progress(InnerSwappingTwoIndexes::UpdateTheTasks); + let tasks_to_update = &index_lhs_task_ids | &index_rhs_task_ids; + let (atomic, task_progress) = AtomicTaskStep::new(tasks_to_update.len() as u32); + progress.update_progress(task_progress); + + for task_id in tasks_to_update { let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; swap_index_uid_in_task(&mut task, (lhs, rhs)); self.all_tasks.put(wtxn, &task_id, &task)?; + atomic.fetch_add(1, Ordering::Relaxed); } // 4. remove the task from indexuid = before_name // 5. add the task to indexuid = after_name + progress.update_progress(InnerSwappingTwoIndexes::UpdateTheIndexesMetadata); self.update_index(wtxn, lhs, |lhs_tasks| { *lhs_tasks -= &index_lhs_task_ids; *lhs_tasks |= &index_rhs_task_ids; @@ -1222,6 +1303,7 @@ impl IndexScheduler { operations, mut tasks, } => { + progress.update_progress(DocumentOperationProgress::RetrievingConfig); // TODO: at some point, for better efficiency we might want to reuse the bumpalo for successive batches. // this is made difficult by the fact we're doing private clones of the index scheduler and sending it // to a fresh thread. 
@@ -1277,6 +1359,7 @@ impl IndexScheduler { } }; + progress.update_progress(DocumentOperationProgress::ComputingTheChanges); let (document_changes, operation_stats, primary_key) = indexer .into_changes( &indexer_alloc, @@ -1321,6 +1404,7 @@ impl IndexScheduler { } } + progress.update_progress(DocumentOperationProgress::Indexing); if tasks.iter().any(|res| res.error.is_none()) { indexer::index( index_wtxn, @@ -1350,6 +1434,8 @@ impl IndexScheduler { Ok(tasks) } IndexOperation::DocumentEdition { index_uid, mut task } => { + progress.update_progress(DocumentEditionProgress::RetrievingConfig); + let (filter, code) = if let KindWithContent::DocumentEdition { filter_expr, context: _, @@ -1423,6 +1509,7 @@ impl IndexScheduler { }; let candidates_count = candidates.len(); + progress.update_progress(DocumentEditionProgress::ComputingTheChanges); let indexer = UpdateByFunction::new(candidates, context.clone(), code.clone()); let document_changes = pool .install(|| { @@ -1436,6 +1523,7 @@ impl IndexScheduler { .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; let embedders = self.embedders(index_uid.clone(), embedders)?; + progress.update_progress(DocumentEditionProgress::Indexing); indexer::index( index_wtxn, index, @@ -1488,6 +1576,8 @@ impl IndexScheduler { Ok(vec![task]) } IndexOperation::DocumentDeletion { mut tasks, index_uid } => { + progress.update_progress(DocumentDeletionProgress::RetrievingConfig); + let mut to_delete = RoaringBitmap::new(); let external_documents_ids = index.external_documents_ids(); @@ -1578,6 +1668,7 @@ impl IndexScheduler { } }; + progress.update_progress(DocumentDeletionProgress::DeleteDocuments); let mut indexer = indexer::DocumentDeletion::new(); let candidates_count = to_delete.len(); indexer.delete_documents_by_docids(to_delete); @@ -1587,6 +1678,7 @@ impl IndexScheduler { .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; let embedders = self.embedders(index_uid.clone(), embedders)?; + 
progress.update_progress(DocumentDeletionProgress::Indexing); indexer::index( index_wtxn, index, @@ -1615,6 +1707,7 @@ impl IndexScheduler { Ok(tasks) } IndexOperation::Settings { index_uid, settings, mut tasks } => { + progress.update_progress(SettingsProgress::RetrievingAndMergingTheSettings); let indexer_config = self.index_mapper.indexer_config(); let mut builder = milli::update::Settings::new(index_wtxn, index, indexer_config); @@ -1628,6 +1721,7 @@ impl IndexScheduler { task.status = Status::Succeeded; } + progress.update_progress(SettingsProgress::ApplyTheSettings); builder .execute( |indexing_step| tracing::debug!(update = ?indexing_step), diff --git a/crates/index-scheduler/src/processing.rs b/crates/index-scheduler/src/processing.rs index f28fa0219..479b6274f 100644 --- a/crates/index-scheduler/src/processing.rs +++ b/crates/index-scheduler/src/processing.rs @@ -119,8 +119,111 @@ make_enum_progress! { - DeletingBatches } +make_enum_progress! { + enum SnapshotCreationProgress: + - StartTheSnapshotCreation + - SnapshotTheIndexScheduler + - SnapshotTheUpdateFiles + - SnapshotTheIndexes + - SnapshotTheApiKeys + - CreateTheTarball +} + +make_enum_progress! { + enum DumpCreationProgress: + - StartTheDumpCreation + - DumpTheApiKeys + - DumpTheTasks + - DumpTheIndexes + - DumpTheExperimentalFeatures + - CompressTheDump +} + +make_enum_progress! { + enum CreateIndexProgress: + - CreatingTheIndex +} + +make_enum_progress! { + enum UpdateIndexProgress: + - UpdatingTheIndex +} + +make_enum_progress! { + enum DeleteIndexProgress: + - DeletingTheIndex +} + +make_enum_progress! { + enum SwappingTheIndexes: + - EnsuringCorrectnessOfTheSwap + - SwappingTheIndexes +} + +make_enum_progress! { + enum InnerSwappingTwoIndexes: + - RetrieveTheTasks + - UpdateTheTasks + - UpdateTheIndexesMetadata +} + +make_enum_progress! { + enum DocumentOperationProgress: + - RetrievingConfig + - ComputingTheChanges + - Indexing +} + +make_enum_progress! 
{ + enum DocumentEditionProgress: + - RetrievingConfig + - ComputingTheChanges + - Indexing +} + +make_enum_progress! { + enum DocumentDeletionProgress: + - RetrievingConfig + - DeleteDocuments + - Indexing +} + +make_enum_progress! { + enum SettingsProgress: + - RetrievingAndMergingTheSettings + - ApplyTheSettings +} + make_atomic_progress!(Task alias AtomicTaskStep => "task" ); +make_atomic_progress!(Document alias AtomicDocumentStep => "document" ); make_atomic_progress!(Batch alias AtomicBatchStep => "batch" ); +make_atomic_progress!(UpdateFile alias AtomicUpdateFileStep => "update file" ); + +pub struct VariableNameStep { + name: String, + current: u32, + total: u32, +} + +impl VariableNameStep { + pub fn new(name: impl Into<String>, current: u32, total: u32) -> Self { + Self { name: name.into(), current, total } + } +} + +impl Step for VariableNameStep { + fn name(&self) -> Cow<'static, str> { + self.name.clone().into() + } + + fn current(&self) -> u32 { + self.current + } + + fn total(&self) -> u32 { + self.total + } +} #[cfg(test)] mod test {