Add embedder stats in batches

This commit is contained in:
Mubelotix 2025-06-20 12:42:22 +02:00
parent fc6cc80705
commit 4cadc8113b
No known key found for this signature in database
GPG key ID: 89F391DBCC8CE7F0
26 changed files with 188 additions and 73 deletions

View file

@ -17,6 +17,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME;
use crate::error::FaultSource;
use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
use crate::index::IndexEmbeddingConfig;
use crate::progress::EmbedderStats;
use crate::prompt::Prompt;
use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
use crate::update::settings::InnerIndexSettingsDiff;
@ -682,6 +683,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
embedder: Arc<Embedder>,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
unused_vectors_distribution: &UnusedVectorsDistribution,
request_threads: &ThreadPoolNoAbort,
) -> Result<grenad::Reader<BufReader<File>>> {
@ -724,6 +726,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
embedder_name,
possible_embedding_mistakes,
embedder_stats.clone(),
unused_vectors_distribution,
request_threads,
)?;
@ -746,6 +749,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
std::mem::take(&mut chunks),
embedder_name,
possible_embedding_mistakes,
embedder_stats.clone(),
unused_vectors_distribution,
request_threads,
)?;
@ -764,6 +768,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
vec![std::mem::take(&mut current_chunk)],
embedder_name,
possible_embedding_mistakes,
embedder_stats,
unused_vectors_distribution,
request_threads,
)?;
@ -783,10 +788,11 @@ fn embed_chunks(
text_chunks: Vec<Vec<String>>,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
unused_vectors_distribution: &UnusedVectorsDistribution,
request_threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>> {
match embedder.embed_index(text_chunks, request_threads) {
match embedder.embed_index(text_chunks, request_threads, embedder_stats) {
Ok(chunks) => Ok(chunks),
Err(error) => {
if let FaultSource::Bug = error.fault {

View file

@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
use super::{helpers, TypedChunk};
use crate::index::IndexEmbeddingConfig;
use crate::progress::EmbedderStats;
use crate::update::settings::InnerIndexSettingsDiff;
use crate::vector::error::PossibleEmbeddingMistakes;
use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
@ -49,6 +50,7 @@ pub(crate) fn data_from_obkv_documents(
settings_diff: Arc<InnerIndexSettingsDiff>,
max_positions_per_attributes: Option<u32>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<()> {
let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
|| {
@ -62,6 +64,7 @@ pub(crate) fn data_from_obkv_documents(
embedders_configs.clone(),
settings_diff.clone(),
possible_embedding_mistakes.clone(),
embedder_stats.clone(),
)
})
.collect::<Result<()>>()
@ -231,6 +234,7 @@ fn send_original_documents_data(
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
settings_diff: Arc<InnerIndexSettingsDiff>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<()> {
let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@ -270,6 +274,7 @@ fn send_original_documents_data(
embedder.clone(),
&embedder_name,
&possible_embedding_mistakes,
embedder_stats.clone(),
&unused_vectors_distribution,
request_threads(),
) {

View file

@ -32,7 +32,7 @@ use crate::database_stats::DatabaseStats;
use crate::documents::{obkv_to_object, DocumentsBatchReader};
use crate::error::{Error, InternalError};
use crate::index::{PrefixSearch, PrefixSettings};
use crate::progress::Progress;
use crate::progress::{EmbedderStats, Progress};
pub use crate::update::index_documents::helpers::CursorClonableMmap;
use crate::update::{
IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
@ -81,6 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
added_documents: u64,
deleted_documents: u64,
embedders: EmbeddingConfigs,
embedder_stats: Option<Arc<EmbedderStats>>,
}
#[derive(Default, Debug, Clone)]
@ -103,6 +104,7 @@ where
config: IndexDocumentsConfig,
progress: FP,
should_abort: FA,
embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<IndexDocuments<'t, 'i, 'a, FP, FA>> {
let transform = Some(Transform::new(
wtxn,
@ -123,6 +125,7 @@ where
added_documents: 0,
deleted_documents: 0,
embedders: Default::default(),
embedder_stats,
})
}
@ -292,6 +295,7 @@ where
// Run extraction pipeline in parallel.
let mut modified_docids = RoaringBitmap::new();
let embedder_stats = self.embedder_stats.clone();
pool.install(|| {
let settings_diff_cloned = settings_diff.clone();
rayon::spawn(move || {
@ -326,7 +330,8 @@ where
embedders_configs.clone(),
settings_diff_cloned,
max_positions_per_attributes,
Arc::new(possible_embedding_mistakes)
Arc::new(possible_embedding_mistakes),
embedder_stats.clone()
)
});

View file

@ -450,7 +450,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)));
}
let res = match embedder.embed_index_ref(texts.as_slice(), threads) {
let res = match embedder.embed_index_ref(texts.as_slice(), threads, None) {
Ok(embeddings) => {
for (docid, embedding) in ids.into_iter().zip(embeddings) {
sender.set_vector(*docid, embedder_id, embedding).unwrap();

View file

@ -27,6 +27,7 @@ use crate::index::{
DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS,
};
use crate::order_by_map::OrderByMap;
use crate::progress::EmbedderStats;
use crate::prompt::{default_max_bytes, default_template_text, PromptData};
use crate::proximity::ProximityPrecision;
use crate::update::index_documents::IndexDocumentsMethod;
@ -466,7 +467,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
#[tracing::instrument(
level = "trace"
skip(self, progress_callback, should_abort, settings_diff),
skip(self, progress_callback, should_abort, settings_diff, embedder_stats),
target = "indexing::documents"
)]
fn reindex<FP, FA>(
@ -474,6 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
progress_callback: &FP,
should_abort: &FA,
settings_diff: InnerIndexSettingsDiff,
embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,
@ -505,6 +507,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
IndexDocumentsConfig::default(),
&progress_callback,
&should_abort,
embedder_stats,
)?;
indexing_builder.execute_raw(output)?;
@ -1355,7 +1358,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
}
}
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA) -> Result<()>
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Option<Arc<EmbedderStats>>) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,
@ -1413,7 +1416,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
);
if inner_settings_diff.any_reindexing_needed() {
self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
self.reindex(&progress_callback, &should_abort, inner_settings_diff, embedder_stats)?;
}
Ok(())