5420: Add support for the progress API of arroy r=Kerollmops a=irevoire

# Pull Request

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/5419

## What does this PR do?
- Convert the arroy progress to the meilisearch progress
- Use the new arroy closure to support the progress of arroy


Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-03-13 18:03:08 +00:00 committed by GitHub
commit 2a46624e19
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 67 additions and 4 deletions

7
Cargo.lock generated
View File

@ -393,12 +393,13 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "arroy"
version = "0.6.0"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a885313dfac15b64fd61a39d1970a2befa076c69a763434117c5b6163f9fecb"
checksum = "08e6111f351d004bd13e95ab540721272136fd3218b39d3ec95a2ea1c4e6a0a6"
dependencies = [
"bytemuck",
"byteorder",
"enum-iterator",
"heed",
"memmap2",
"nohash",
@ -3017,7 +3018,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
"windows-targets 0.48.1",
"windows-targets 0.52.6",
]
[[package]]

View File

@ -87,7 +87,7 @@ rhai = { git = "https://github.com/rhaiscript/rhai", rev = "ef3df63121d27aacd838
"no_time",
"sync",
] }
arroy = "0.6.0"
arroy = "0.6.1"
rand = "0.8.5"
tracing = "0.1.41"
ureq = { version = "2.12.1", features = ["json"] }

View File

@ -1,3 +1,4 @@
use enum_iterator::Sequence;
use std::any::TypeId;
use std::borrow::Cow;
use std::marker::PhantomData;
@ -76,6 +77,14 @@ impl Progress {
durations.drain(..).map(|(name, duration)| (name, format!("{duration:.2?}"))).collect()
}
// TODO: ideally we should expose the progress in a way that let arroy use it directly
pub(crate) fn update_progress_from_arroy(&self, progress: arroy::WriterProgress) {
self.update_progress(progress.main);
if let Some(sub) = progress.sub {
self.update_progress(sub);
}
}
}
/// Generate the names associated with the durations and push them.
@ -238,3 +247,44 @@ impl<U: Send + Sync + 'static> Step for VariableNameStep<U> {
self.total
}
}
impl Step for arroy::MainStep {
fn name(&self) -> Cow<'static, str> {
match self {
arroy::MainStep::PreProcessingTheItems => "pre processing the items",
arroy::MainStep::WritingTheDescendantsAndMetadata => {
"writing the descendants and metadata"
}
arroy::MainStep::RetrieveTheUpdatedItems => "retrieve the updated items",
arroy::MainStep::RetrievingTheTreeAndItemNodes => "retrieving the tree and item nodes",
arroy::MainStep::UpdatingTheTrees => "updating the trees",
arroy::MainStep::CreateNewTrees => "create new trees",
arroy::MainStep::WritingNodesToDatabase => "writing nodes to database",
arroy::MainStep::DeleteExtraneousTrees => "delete extraneous trees",
arroy::MainStep::WriteTheMetadata => "write the metadata",
}
.into()
}
fn current(&self) -> u32 {
*self as u32
}
fn total(&self) -> u32 {
Self::CARDINALITY as u32
}
}
impl Step for arroy::SubStep {
fn name(&self) -> Cow<'static, str> {
self.unit.into()
}
fn current(&self) -> u32 {
self.current.load(Ordering::Relaxed)
}
fn total(&self) -> u32 {
self.max
}
}

View File

@ -31,6 +31,7 @@ use super::new::StdResult;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::progress::Progress;
use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
@ -522,6 +523,8 @@ where
let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized);
writer.build_and_quantize(
wtxn,
// In the settings we don't have any progress to share
&Progress::default(),
&mut rng,
dimension,
is_quantizing,

View File

@ -201,6 +201,7 @@ where
build_vectors(
index,
wtxn,
indexing_context.progress,
index_embeddings,
arroy_memory,
&mut arroy_writers,

View File

@ -10,6 +10,7 @@ use super::super::channel::*;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::progress::Progress;
use crate::update::settings::InnerIndexSettings;
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs, Embeddings};
use crate::{Error, Index, InternalError, Result};
@ -100,6 +101,7 @@ impl ChannelCongestion {
pub fn build_vectors<MSP>(
index: &Index,
wtxn: &mut RwTxn<'_>,
progress: &Progress,
index_embeddings: Vec<IndexEmbeddingConfig>,
arroy_memory: Option<usize>,
arroy_writers: &mut HashMap<u8, (&str, &Embedder, ArroyWrapper, usize)>,
@ -118,6 +120,7 @@ where
let dimensions = *dimensions;
writer.build_and_quantize(
wtxn,
progress,
&mut rng,
dimensions,
false,

View File

@ -13,6 +13,7 @@ use serde::{Deserialize, Serialize};
use utoipa::ToSchema;
use self::error::{EmbedError, NewEmbedderError};
use crate::progress::Progress;
use crate::prompt::{Prompt, PromptData};
use crate::ThreadPoolNoAbort;
@ -81,9 +82,11 @@ impl ArroyWrapper {
}
}
#[allow(clippy::too_many_arguments)]
pub fn build_and_quantize<R: rand::Rng + rand::SeedableRng>(
&mut self,
wtxn: &mut RwTxn,
progress: &Progress,
rng: &mut R,
dimension: usize,
quantizing: bool,
@ -110,12 +113,14 @@ impl ArroyWrapper {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
} else if writer.need_build(wtxn)? {
writer
.builder(rng)
.available_memory(arroy_memory.unwrap_or(usize::MAX))
.progress(|step| progress.update_progress_from_arroy(step))
.cancel(cancel)
.build(wtxn)?;
} else if writer.is_empty(wtxn)? {