Re-integrate embedder stats

This commit is contained in:
ManyTheFish 2025-06-30 09:46:19 +02:00
parent d35b2d8d33
commit 6db5939f84
17 changed files with 45 additions and 23 deletions

View File

@ -65,7 +65,7 @@ fn setup_settings<'t>(
let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect(); let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect();
builder.set_sortable_fields(sortable_fields); builder.set_sortable_fields(sortable_fields);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
} }
fn setup_index_with_settings( fn setup_index_with_settings(

View File

@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index {
(conf.configure)(&mut builder); (conf.configure)(&mut builder);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let config = IndexerConfig::default(); let config = IndexerConfig::default();

View File

@ -245,7 +245,11 @@ impl IndexScheduler {
let must_stop_processing = self.scheduler.must_stop_processing.clone(); let must_stop_processing = self.scheduler.must_stop_processing.clone();
builder builder
.execute(&|| must_stop_processing.get(), &progress) .execute(
&|| must_stop_processing.get(),
&progress,
current_batch.embedder_stats.clone(),
)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?; .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
index_wtxn.commit()?; index_wtxn.commit()?;
} }

View File

@ -475,7 +475,7 @@ impl IndexScheduler {
progress.update_progress(SettingsProgress::ApplyTheSettings); progress.update_progress(SettingsProgress::ApplyTheSettings);
let congestion = builder let congestion = builder
.execute(&|| must_stop_processing.get(), progress) .execute(&|| must_stop_processing.get(), progress, embedder_stats)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?; .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
Ok((tasks, congestion)) Ok((tasks, congestion))

View File

@ -37,7 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::constants::VERSION_MAJOR;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::progress::Progress; use meilisearch_types::milli::progress::{EmbedderStats, Progress};
use meilisearch_types::milli::update::{ use meilisearch_types::milli::update::{
default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
}; };
@ -544,7 +544,8 @@ fn import_dump(
tracing::info!("Importing the settings."); tracing::info!("Importing the settings.");
let settings = index_reader.settings()?; let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder); apply_settings_to_builder(&settings, &mut builder);
builder.execute(&|| false, &progress)?; let embedder_stats: Arc<EmbedderStats> = Default::default();
builder.execute(&|| false, &progress, embedder_stats.clone())?;
// 4.3 Import the documents. // 4.3 Import the documents.
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index. // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.

View File

@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
S("america") => vec![S("the united states")], S("america") => vec![S("the united states")],
}); });
builder.set_searchable_fields(vec![S("title"), S("description")]); builder.set_searchable_fields(vec![S("title"), S("description")]);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// index documents // index documents

View File

@ -135,7 +135,7 @@ impl TempIndex {
) -> Result<(), crate::error::Error> { ) -> Result<(), crate::error::Error> {
let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config); let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
update(&mut builder); update(&mut builder);
builder.execute(&|| false, &Progress::default())?; builder.execute(&|| false, &Progress::default(), Default::default())?;
Ok(()) Ok(())
} }

View File

@ -303,6 +303,7 @@ pub struct SettingsChangeEmbeddingExtractor<'a, 'b> {
old_embedders: &'a EmbeddingConfigs, old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>, embedder_actions: &'a BTreeMap<String, EmbedderAction>,
embedder_category_id: &'a std::collections::HashMap<String, u8>, embedder_category_id: &'a std::collections::HashMap<String, u8>,
embedder_stats: &'a EmbedderStats,
sender: EmbeddingSender<'a, 'b>, sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes, possible_embedding_mistakes: PossibleEmbeddingMistakes,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
@ -314,6 +315,7 @@ impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
old_embedders: &'a EmbeddingConfigs, old_embedders: &'a EmbeddingConfigs,
embedder_actions: &'a BTreeMap<String, EmbedderAction>, embedder_actions: &'a BTreeMap<String, EmbedderAction>,
embedder_category_id: &'a std::collections::HashMap<String, u8>, embedder_category_id: &'a std::collections::HashMap<String, u8>,
embedder_stats: &'a EmbedderStats,
sender: EmbeddingSender<'a, 'b>, sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution, field_distribution: &'a FieldDistribution,
threads: &'a ThreadPoolNoAbort, threads: &'a ThreadPoolNoAbort,
@ -324,6 +326,7 @@ impl<'a, 'b> SettingsChangeEmbeddingExtractor<'a, 'b> {
old_embedders, old_embedders,
embedder_actions, embedder_actions,
embedder_category_id, embedder_category_id,
embedder_stats,
sender, sender,
threads, threads,
possible_embedding_mistakes, possible_embedding_mistakes,
@ -371,6 +374,7 @@ impl<'extractor> SettingsChangeExtractor<'extractor> for SettingsChangeEmbedding
prompt, prompt,
context.data, context.data,
&self.possible_embedding_mistakes, &self.possible_embedding_mistakes,
self.embedder_stats,
self.threads, self.threads,
self.sender, self.sender,
&context.doc_alloc, &context.doc_alloc,

View File

@ -333,6 +333,7 @@ pub(super) fn extract_all_settings_changes<MSP, SD>(
field_distribution: &mut BTreeMap<String, u64>, field_distribution: &mut BTreeMap<String, u64>,
mut index_embeddings: Vec<IndexEmbeddingConfig>, mut index_embeddings: Vec<IndexEmbeddingConfig>,
modified_docids: &mut RoaringBitmap, modified_docids: &mut RoaringBitmap,
embedder_stats: &EmbedderStats,
) -> Result<Vec<IndexEmbeddingConfig>> ) -> Result<Vec<IndexEmbeddingConfig>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -371,6 +372,7 @@ where
settings_delta.old_embedders(), settings_delta.old_embedders(),
settings_delta.embedder_actions(), settings_delta.embedder_actions(),
settings_delta.new_embedder_category_id(), settings_delta.new_embedder_category_id(),
embedder_stats,
embedding_sender, embedding_sender,
field_distribution, field_distribution,
request_threads(), request_threads(),

View File

@ -1,6 +1,6 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::{Once, RwLock}; use std::sync::{Arc, Once, RwLock};
use std::thread::{self, Builder}; use std::thread::{self, Builder};
use big_s::S; use big_s::S;
@ -20,8 +20,8 @@ use super::steps::IndexingStep;
use super::thread_local::ThreadLocal; use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey; use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder}; use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::update::settings::SettingsDelta;
use crate::progress::{EmbedderStats, Progress}; use crate::progress::{EmbedderStats, Progress};
use crate::update::settings::SettingsDelta;
use crate::update::GrenadParameters; use crate::update::GrenadParameters;
use crate::vector::settings::{EmbedderAction, WriteBackToDocuments}; use crate::vector::settings::{EmbedderAction, WriteBackToDocuments};
use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs}; use crate::vector::{ArroyWrapper, Embedder, EmbeddingConfigs};
@ -213,6 +213,7 @@ pub fn reindex<'indexer, 'index, MSP, SD>(
settings_delta: &'indexer SD, settings_delta: &'indexer SD,
must_stop_processing: &'indexer MSP, must_stop_processing: &'indexer MSP,
progress: &'indexer Progress, progress: &'indexer Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<ChannelCongestion> ) -> Result<ChannelCongestion>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -274,6 +275,7 @@ where
field_distribution, field_distribution,
index_embeddings, index_embeddings,
modified_docids, modified_docids,
&embedder_stats,
) )
}) })
.unwrap() .unwrap()

View File

@ -27,8 +27,8 @@ use crate::index::{
DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS, DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS,
}; };
use crate::order_by_map::OrderByMap; use crate::order_by_map::OrderByMap;
use crate::progress::Progress;
use crate::progress::EmbedderStats; use crate::progress::EmbedderStats;
use crate::progress::Progress;
use crate::prompt::{default_max_bytes, default_template_text, PromptData}; use crate::prompt::{default_max_bytes, default_template_text, PromptData};
use crate::proximity::ProximityPrecision; use crate::proximity::ProximityPrecision;
use crate::update::index_documents::IndexDocumentsMethod; use crate::update::index_documents::IndexDocumentsMethod;
@ -1362,7 +1362,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
} }
} }
pub fn legacy_execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA) -> Result<()> pub fn legacy_execute<FP, FA>(
mut self,
progress_callback: FP,
should_abort: FA,
embedder_stats: Arc<EmbedderStats>,
) -> Result<()>
where where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,
@ -1430,6 +1435,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
mut self, mut self,
must_stop_processing: &'indexer MSP, must_stop_processing: &'indexer MSP,
progress: &'indexer Progress, progress: &'indexer Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<Option<ChannelCongestion>> ) -> Result<Option<ChannelCongestion>>
where where
MSP: Fn() -> bool + Sync, MSP: Fn() -> bool + Sync,
@ -1440,6 +1446,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
.legacy_execute( .legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
must_stop_processing, must_stop_processing,
embedder_stats,
) )
.map(|_| None); .map(|_| None);
} }
@ -1510,6 +1517,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
&inner_settings_diff, &inner_settings_diff,
must_stop_processing, must_stop_processing,
progress, progress,
embedder_stats,
) )
.map(Some) .map(Some)
} else { } else {
@ -1519,6 +1527,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
self.legacy_execute( self.legacy_execute(
|indexing_step| tracing::debug!(update = ?indexing_step), |indexing_step| tracing::debug!(update = ?indexing_step),
must_stop_processing, must_stop_processing,
embedder_stats,
) )
.map(|_| None) .map(|_| None)
} }

View File

@ -20,7 +20,7 @@ macro_rules! test_distinct {
let config = milli::update::IndexerConfig::default(); let config = milli::update::IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config); let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_distinct_field(S(stringify!($distinct))); builder.set_distinct_field(S(stringify!($distinct)));
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();

View File

@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() {
FilterableAttributesRule::Field(S("genres")), FilterableAttributesRule::Field(S("genres")),
FilterableAttributesRule::Field(S("tags")), FilterableAttributesRule::Field(S("tags")),
]); ]);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// index documents // index documents

View File

@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
S("america") => vec![S("the united states")], S("america") => vec![S("the united states")],
}); });
builder.set_searchable_fields(vec![S("title"), S("description")]); builder.set_searchable_fields(vec![S("title"), S("description")]);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
// index documents // index documents

View File

@ -11,7 +11,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) {
let mut builder = Settings::new(&mut wtxn, index, &config); let mut builder = Settings::new(&mut wtxn, index, &config);
let stop_words = stop_words.iter().map(|s| s.to_string()).collect(); let stop_words = stop_words.iter().map(|s| s.to_string()).collect();
builder.set_stop_words(stop_words); builder.set_stop_words(stop_words);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
} }

View File

@ -236,7 +236,7 @@ fn criteria_mixup() {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config); let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(criteria.clone()); builder.set_criteria(criteria.clone());
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();
@ -276,7 +276,7 @@ fn criteria_ascdesc() {
S("name"), S("name"),
S("age"), S("age"),
}); });
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
@ -359,7 +359,7 @@ fn criteria_ascdesc() {
let mut wtxn = index.write_txn().unwrap(); let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config); let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(vec![criterion.clone()]); builder.set_criteria(vec![criterion.clone()]);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
wtxn.commit().unwrap(); wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap(); let rtxn = index.read_txn().unwrap();

View File

@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() {
let config = IndexerConfig::default(); let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config); let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_one_typo(4); builder.set_min_word_len_one_typo(4);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
// typo is now supported for 4 letters words // typo is now supported for 4 letters words
let mut search = Search::new(&txn, &index); let mut search = Search::new(&txn, &index);
@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() {
let config = IndexerConfig::default(); let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config); let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_two_typos(7); builder.set_min_word_len_two_typos(7);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
// typo is now supported for 4 letters words // typo is now supported for 4 letters words
let mut search = Search::new(&txn, &index); let mut search = Search::new(&txn, &index);
@ -181,7 +181,7 @@ fn test_typo_disabled_on_word() {
// `zealand` doesn't allow typos anymore // `zealand` doesn't allow typos anymore
exact_words.insert("zealand".to_string()); exact_words.insert("zealand".to_string());
builder.set_exact_words(exact_words); builder.set_exact_words(exact_words);
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
let mut search = Search::new(&txn, &index); let mut search = Search::new(&txn, &index);
search.query("zealand"); search.query("zealand");
@ -219,7 +219,7 @@ fn test_disable_typo_on_attribute() {
let mut builder = Settings::new(&mut txn, &index, &config); let mut builder = Settings::new(&mut txn, &index, &config);
// disable typos on `description` // disable typos on `description`
builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect()); builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect());
builder.execute(&|| false, &Progress::default()).unwrap(); builder.execute(&|| false, &Progress::default(), Default::default()).unwrap();
let mut search = Search::new(&txn, &index); let mut search = Search::new(&txn, &index);
search.query("antebelum"); search.query("antebelum");