From 009c36a4d01b5862edfb44b54671fb7c7228d771 Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 13 Mar 2025 17:36:49 +0100 Subject: [PATCH] Add support for the progress API of arroy --- Cargo.lock | 7 +-- crates/milli/Cargo.toml | 2 +- crates/milli/src/progress.rs | 50 +++++++++++++++++++ .../milli/src/update/index_documents/mod.rs | 3 ++ crates/milli/src/update/new/indexer/mod.rs | 1 + crates/milli/src/update/new/indexer/write.rs | 3 ++ crates/milli/src/vector/mod.rs | 5 ++ 7 files changed, 67 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ee161b8d2..563863b74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -393,12 +393,13 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arroy" -version = "0.6.0" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a885313dfac15b64fd61a39d1970a2befa076c69a763434117c5b6163f9fecb" +checksum = "08e6111f351d004bd13e95ab540721272136fd3218b39d3ec95a2ea1c4e6a0a6" dependencies = [ "bytemuck", "byteorder", + "enum-iterator", "heed", "memmap2", "nohash", @@ -3017,7 +3018,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34" dependencies = [ "cfg-if", - "windows-targets 0.48.1", + "windows-targets 0.52.6", ] [[package]] diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 2d7a3ca0c..e3b9b077a 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -87,7 +87,7 @@ rhai = { git = "https://github.com/rhaiscript/rhai", rev = "ef3df63121d27aacd838 "no_time", "sync", ] } -arroy = "0.6.0" +arroy = "0.6.1" rand = "0.8.5" tracing = "0.1.41" ureq = { version = "2.12.1", features = ["json"] } diff --git a/crates/milli/src/progress.rs b/crates/milli/src/progress.rs index f8cd4b4cc..7eb0cbd6b 100644 --- a/crates/milli/src/progress.rs +++ b/crates/milli/src/progress.rs @@ -1,3 +1,4 @@ +use enum_iterator::Sequence; use std::any::TypeId; use std::borrow::Cow; use std::marker::PhantomData; @@ -76,6 +77,14 @@ impl Progress { durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect() } + + // TODO: ideally we should expose the progress in a way that let arroy use it directly + pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) { + self.update_progress(progress.main); + if let Some(sub) = progress.sub { + self.update_progress(sub); + } + } } /// Generate the names associated with the durations and push them. @@ -238,3 +247,44 @@ impl Step for VariableNameStep { self.total } } + +impl Step for arroy::MainStep { + fn name(&self) -> Cow<'static, str> { + match self { + arroy::MainStep::PreProcessingTheItems => "pre processing the items", + arroy::MainStep::WritingTheDescendantsAndMetadata => { + "writing the descendants and metadata" + } + arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items", + arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes", + arroy::MainStep::UpdatingTheTrees => "updating the trees", + arroy::MainStep::CreateNewTrees => "create new trees", + arroy::MainStep::WritingNodesToDatabase => "writing nodes to database", + arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees", + arroy::MainStep::WriteTheMetadata => "write the metadata", + } + .into() + } + + fn current(&self) -> u32 { + *self as u32 + } + + fn total(&self) -> u32 { + Self::CARDINALITY as u32 + } +} + +impl Step for arroy::SubStep { + fn name(&self) -> Cow<'static, str> { + self.unit.into() + } + + fn current(&self) -> u32 { + self.current.load(Ordering::Relaxed) + } + + fn total(&self) -> u32 { + self.max + } +} diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index dbbf58e4a..95342054d 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -31,6 +31,7 @@ use super::new::StdResult; use crate::documents::{obkv_to_object, DocumentsBatchReader}; use crate::error::{Error, InternalError}; use crate::index::{PrefixSearch, PrefixSettings}; +use crate::progress::Progress; use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder; pub use crate::update::index_documents::helpers::CursorClonableMmap; use crate::update::{ @@ -522,6 +523,8 @@ where let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); writer.build_and_quantize( wtxn, + // In the settings we don't have any progress to share + &Progress::default(), &mut rng, dimension, is_quantizing, diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index d002317ca..4f2dd19c9 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -201,6 +201,7 @@ where build_vectors( index, wtxn, + indexing_context.progress, index_embeddings, arroy_memory, &mut arroy_writers, diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index d3fa5e182..8618b4b21 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -10,6 +10,7 @@ use super::super::channel::*; use crate::documents::PrimaryKey; use crate::fields_ids_map::metadata::FieldIdMapWithMetadata; use crate::index::IndexEmbeddingConfig; +use crate::progress::Progress; use crate::update::settings::InnerIndexSettings; use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings}; use crate::{Error, Index, InternalError, Result}; @@ -100,6 +101,7 @@ impl ChannelCongestion { pub fn build_vectors( index: &Index, wtxn: &mut RwTxn<'_>, + progress: &Progress, index_embeddings: Vec, arroy_memory: Option, arroy_writers: &mut HashMap, @@ -118,6 +120,7 @@ where let dimensions = *dimensions; writer.build_and_quantize( wtxn, + progress, &mut rng, dimensions, false, diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 3f85f636c..88e871568 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize}; use utoipa::ToSchema; use self::error::{EmbedError, NewEmbedderError}; +use crate::progress::Progress; use crate::prompt::{Prompt, PromptData}; use crate::ThreadPoolNoAbort; @@ -81,9 +82,11 @@ impl ArroyWrapper { } } + #[allow(clippy::too_many_arguments)] pub fn build_and_quantize( &mut self, wtxn: &mut RwTxn, + progress: &Progress, rng: &mut R, dimension: usize, quantizing: bool, @@ -110,12 +113,14 @@ impl ArroyWrapper { writer .builder(rng) .available_memory(arroy_memory.unwrap_or(usize::MAX)) + .progress(|step| progress.update_progress_from_arroy(step)) .cancel(cancel) .build(wtxn)?; } else if writer.need_build(wtxn)? { writer .builder(rng) .available_memory(arroy_memory.unwrap_or(usize::MAX)) + .progress(|step| progress.update_progress_from_arroy(step)) .cancel(cancel) .build(wtxn)?; } else if writer.is_empty(wtxn)? {