Merge pull request #5707 from Mubelotix/last_embedder_message

Add last embedder error in batches
Louis Dureuil 2025-06-26 15:21:17 +00:00 committed by GitHub
commit 3aa6c3c750
35 changed files with 492 additions and 93 deletions

.gitignore vendored (3 changes)
View File

@@ -18,5 +18,8 @@
 ## ... unreviewed
 *.snap.new
 
+# Database snapshot
+crates/meilisearch/db.snapshot
+
 # Fuzzcheck data for the facet indexing fuzz test
 crates/milli/fuzz/update::facet::incremental::fuzz::fuzz/

View File

@@ -65,7 +65,7 @@ fn setup_settings<'t>(
     let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect();
     builder.set_sortable_fields(sortable_fields);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
 }
 
 fn setup_index_with_settings(
@@ -169,6 +169,7 @@ fn indexing_songs_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -235,6 +236,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -279,6 +281,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -347,6 +350,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -423,6 +427,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -467,6 +472,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -507,6 +513,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -574,6 +581,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -640,6 +648,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -706,6 +715,7 @@ fn indexing_wiki(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -771,6 +781,7 @@ fn reindexing_wiki(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -815,6 +826,7 @@ fn reindexing_wiki(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -882,6 +894,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -958,6 +971,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1003,6 +1017,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1044,6 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1110,6 +1126,7 @@ fn indexing_movies_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1175,6 +1192,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1219,6 +1237,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1286,6 +1305,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1334,6 +1354,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec<RoaringBitmap>) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1398,6 +1419,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1442,6 +1464,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1482,6 +1505,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1571,6 +1595,7 @@ fn indexing_nested_movies_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1661,6 +1686,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1743,6 +1769,7 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1809,6 +1836,7 @@ fn indexing_geo(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1874,6 +1902,7 @@ fn reindexing_geo(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1918,6 +1947,7 @@ fn reindexing_geo(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
@@ -1985,6 +2015,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();

View File

@@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index {
     (conf.configure)(&mut builder);
 
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     let config = IndexerConfig::default();
@@ -128,6 +128,7 @@ pub fn base_setup(conf: &Conf) -> Index {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();

View File

@@ -329,6 +329,7 @@ pub(crate) mod test {
                 write_channel_congestion: None,
                 internal_database_sizes: Default::default(),
             },
+            embedder_stats: Default::default(),
             enqueued_at: Some(BatchEnqueuedAt {
                 earliest: datetime!(2022-11-11 0:00 UTC),
                 oldest: datetime!(2022-11-11 0:00 UTC),

View File

@@ -144,6 +144,7 @@ fn main() {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();

View File

@@ -343,6 +343,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
         uid,
         details,
         stats,
+        embedder_stats,
         started_at,
         finished_at,
         progress: _,
@@ -366,6 +367,12 @@ pub fn snapshot_batch(batch: &Batch) -> String {
     snap.push_str(&format!("uid: {uid}, "));
     snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
     snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap()));
+    if !embedder_stats.skip_serializing() {
+        snap.push_str(&format!(
+            "embedder stats: {}, ",
+            serde_json::to_string(&embedder_stats).unwrap()
+        ));
+    }
     snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap()));
     snap.push('}');
     snap

View File

@@ -179,6 +179,7 @@ impl BatchQueue {
                 progress: None,
                 details: batch.details,
                 stats: batch.stats,
+                embedder_stats: batch.embedder_stats.as_ref().into(),
                 started_at: batch.started_at,
                 finished_at: batch.finished_at,
                 enqueued_at: batch.enqueued_at,

View File

@@ -162,8 +162,13 @@ impl IndexScheduler {
                     .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
 
                 let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
-                let (tasks, congestion) =
-                    self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;
+                let (tasks, congestion) = self.apply_index_operation(
+                    &mut index_wtxn,
+                    &index,
+                    op,
+                    &progress,
+                    current_batch.embedder_stats.clone(),
+                )?;
 
                 {
                     progress.update_progress(FinalizingIndexStep::Committing);
@@ -238,10 +243,12 @@ impl IndexScheduler {
                 );
                 builder.set_primary_key(primary_key);
                 let must_stop_processing = self.scheduler.must_stop_processing.clone();
+
                 builder
                     .execute(
                         |indexing_step| tracing::debug!(update = ?indexing_step),
                         || must_stop_processing.get(),
+                        current_batch.embedder_stats.clone(),
                     )
                     .map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
                 index_wtxn.commit()?;

View File

@@ -1,8 +1,10 @@
+use std::sync::Arc;
+
 use bumpalo::collections::CollectIn;
 use bumpalo::Bump;
 use meilisearch_types::heed::RwTxn;
 use meilisearch_types::milli::documents::PrimaryKey;
-use meilisearch_types::milli::progress::Progress;
+use meilisearch_types::milli::progress::{EmbedderStats, Progress};
 use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
 use meilisearch_types::milli::update::DocumentAdditionResult;
 use meilisearch_types::milli::{self, ChannelCongestion, Filter};
@@ -24,7 +26,7 @@ impl IndexScheduler {
     /// The list of processed tasks.
     #[tracing::instrument(
         level = "trace",
-        skip(self, index_wtxn, index, progress),
+        skip(self, index_wtxn, index, progress, embedder_stats),
         target = "indexing::scheduler"
     )]
     pub(crate) fn apply_index_operation<'i>(
@@ -33,6 +35,7 @@ impl IndexScheduler {
         index: &'i Index,
         operation: IndexOperation,
         progress: &Progress,
+        embedder_stats: Arc<EmbedderStats>,
     ) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
         let indexer_alloc = Bump::new();
         let started_processing_at = std::time::Instant::now();
@@ -177,6 +180,7 @@ impl IndexScheduler {
                             embedders,
                             &|| must_stop_processing.get(),
                             progress,
+                            &embedder_stats,
                         )
                         .map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
                     );
@@ -288,6 +292,7 @@ impl IndexScheduler {
                             embedders,
                             &|| must_stop_processing.get(),
                             progress,
+                            &embedder_stats,
                         )
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
                     );
@@ -436,6 +441,7 @@ impl IndexScheduler {
                             embedders,
                             &|| must_stop_processing.get(),
                             progress,
+                            &embedder_stats,
                         )
                         .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
                     );
@@ -472,6 +478,7 @@
                     .execute(
                         |indexing_step| tracing::debug!(update = ?indexing_step),
                         || must_stop_processing.get(),
+                        embedder_stats,
                     )
                     .map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@@ -491,6 +498,7 @@
                         tasks: cleared_tasks,
                     },
                     progress,
+                    embedder_stats.clone(),
                 )?;
 
                 let (settings_tasks, _congestion) = self.apply_index_operation(
@@ -498,6 +506,7 @@
                     index,
                     IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
                     progress,
+                    embedder_stats,
                 )?;
 
                 let mut tasks = settings_tasks;
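
Note (illustrative, not part of this diff): in the clear-and-settings branch above, `apply_index_operation` is called twice with handles to the same `Arc<EmbedderStats>` (one clone, then the original by move), so both sub-operations accumulate into a single per-batch counter set. A minimal sketch of that sharing, using only the `EmbedderStats` type introduced in this PR:

    use std::sync::Arc;
    use meilisearch_types::milli::progress::EmbedderStats;

    fn main() {
        // One stats allocation per batch; clones are cheap handle copies.
        let batch_stats: Arc<EmbedderStats> = Default::default();
        let for_clear = batch_stats.clone(); // handed to the document-clear step
        let for_settings = batch_stats; // handed to the settings step

        // Both handles point at the same counters, so embedder activity from
        // either step shows up in the same batch view.
        assert!(Arc::ptr_eq(&for_clear, &for_settings));
    }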

View File

@@ -1,7 +1,9 @@
 //! Utility functions on the DBs. Mainly getter and setters.
 
+use crate::milli::progress::EmbedderStats;
 use std::collections::{BTreeSet, HashSet};
 use std::ops::Bound;
+use std::sync::Arc;
 
 use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
 use meilisearch_types::heed::{Database, RoTxn, RwTxn};
@@ -27,6 +29,7 @@ pub struct ProcessingBatch {
     pub uid: BatchId,
     pub details: DetailsView,
     pub stats: BatchStats,
+    pub embedder_stats: Arc<EmbedderStats>,
 
     pub statuses: HashSet<Status>,
     pub kinds: HashSet<Kind>,
@@ -48,6 +51,7 @@ impl ProcessingBatch {
             uid,
             details: DetailsView::default(),
             stats: BatchStats::default(),
+            embedder_stats: Default::default(),
 
             statuses,
             kinds: HashSet::default(),
@@ -146,6 +150,7 @@ impl ProcessingBatch {
             progress: None,
             details: self.details.clone(),
             stats: self.stats.clone(),
+            embedder_stats: self.embedder_stats.as_ref().into(),
             started_at: self.started_at,
             finished_at: self.finished_at,
             enqueued_at: self.enqueued_at,
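
Note (illustrative, not part of this diff): `ProcessingBatch` keeps the live `Arc<EmbedderStats>`, and `embedder_stats: self.embedder_stats.as_ref().into()` freezes it into the serializable view through the `From<&EmbedderStats>` impl added in this PR. A sketch of that round trip, assuming the embedders update the counters the way the `From` impl reads them (`total_count` bumped per request, `errors` holding the last message and the failure count):

    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use meilisearch_types::batches::EmbedderStatsView;
    use meilisearch_types::milli::progress::EmbedderStats;

    fn main() {
        let live: Arc<EmbedderStats> = Default::default();

        // Simulate two embedding requests, one of which failed.
        live.total_count.fetch_add(2, Ordering::Relaxed);
        {
            let mut errors = live.errors.write().unwrap();
            errors.0 = Some("HTTP 500 from embedding server".to_string());
            errors.1 += 1;
        }

        // What the `as_ref().into()` conversion above produces.
        let view: EmbedderStatsView = live.as_ref().into();
        assert_eq!((view.total, view.failed), (2, 1));
        assert!(view.last_error.is_some());
    }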

View File

@@ -3,7 +3,7 @@ use serde::Serialize;
 use time::{Duration, OffsetDateTime};
 use utoipa::ToSchema;
 
-use crate::batches::{Batch, BatchId, BatchStats};
+use crate::batches::{Batch, BatchId, BatchStats, EmbedderStatsView};
 use crate::task_view::DetailsView;
 use crate::tasks::serialize_duration;
@@ -14,7 +14,7 @@ pub struct BatchView {
     pub uid: BatchId,
     pub progress: Option<ProgressView>,
     pub details: DetailsView,
-    pub stats: BatchStats,
+    pub stats: BatchStatsView,
     #[serde(serialize_with = "serialize_duration", default)]
     pub duration: Option<Duration>,
     #[serde(with = "time::serde::rfc3339", default)]
@@ -25,13 +25,26 @@ pub struct BatchView {
     pub batch_strategy: String,
 }
 
+#[derive(Debug, Clone, Serialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+#[schema(rename_all = "camelCase")]
+pub struct BatchStatsView {
+    #[serde(flatten)]
+    pub stats: BatchStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder_requests: EmbedderStatsView,
+}
+
 impl BatchView {
     pub fn from_batch(batch: &Batch) -> Self {
         Self {
             uid: batch.uid,
             progress: batch.progress.clone(),
             details: batch.details.clone(),
-            stats: batch.stats.clone(),
+            stats: BatchStatsView {
+                stats: batch.stats.clone(),
+                embedder_requests: batch.embedder_stats.clone(),
+            },
             duration: batch.finished_at.map(|finished_at| finished_at - batch.started_at),
             started_at: batch.started_at,
             finished_at: batch.finished_at,
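
Note (illustrative, not part of this diff): because `BatchStatsView` flattens `BatchStats` and skips `embedder_requests` when it is empty, batches that never touched an embedder serialize exactly as before, while embedder activity appears as one extra camelCase key inside `stats`. A sketch of a unit test one could place next to `BatchView` (it assumes `BatchStats` implements `Default`, which this diff does not show):

    use crate::batches::{BatchStats, EmbedderStatsView};

    #[test]
    fn embedder_requests_only_appears_when_non_empty() {
        // Empty embedder stats: the key is skipped, so the JSON matches
        // what the old `stats: BatchStats` field produced.
        let quiet = BatchStatsView {
            stats: BatchStats::default(), // assumption: BatchStats: Default
            embedder_requests: EmbedderStatsView::default(),
        };
        assert!(!serde_json::to_string(&quiet).unwrap().contains("embedderRequests"));

        // With activity, the flattened object gains the extra key.
        let busy = BatchStatsView {
            stats: BatchStats::default(),
            embedder_requests: EmbedderStatsView {
                total: 7,
                failed: 2,
                last_error: Some("HTTP 500".to_string()),
            },
        };
        assert!(serde_json::to_string(&busy).unwrap().contains("\"embedderRequests\""));
    }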

View File

@@ -1,6 +1,6 @@
 use std::collections::BTreeMap;
 
-use milli::progress::ProgressView;
+use milli::progress::{EmbedderStats, ProgressView};
 use serde::{Deserialize, Serialize};
 use time::OffsetDateTime;
 use utoipa::ToSchema;
@@ -19,6 +19,8 @@ pub struct Batch {
     pub progress: Option<ProgressView>,
     pub details: DetailsView,
     pub stats: BatchStats,
+    #[serde(skip_serializing_if = "EmbedderStatsView::skip_serializing", default)]
+    pub embedder_stats: EmbedderStatsView,
 
     #[serde(with = "time::serde::rfc3339")]
     pub started_at: OffsetDateTime,
@@ -43,6 +45,7 @@ impl PartialEq for Batch {
             progress,
             details,
             stats,
+            embedder_stats,
             started_at,
             finished_at,
             enqueued_at,
@@ -53,6 +56,7 @@ impl PartialEq for Batch {
             && progress.is_none() == other.progress.is_none()
             && details == &other.details
             && stats == &other.stats
+            && embedder_stats == &other.embedder_stats
             && started_at == &other.started_at
             && finished_at == &other.finished_at
             && enqueued_at == &other.enqueued_at
@@ -83,3 +87,30 @@ pub struct BatchStats {
     #[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
     pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
 }
+
+#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+#[schema(rename_all = "camelCase")]
+pub struct EmbedderStatsView {
+    pub total: usize,
+    pub failed: usize,
+    #[serde(skip_serializing_if = "Option::is_none", default)]
+    pub last_error: Option<String>,
+}
+
+impl From<&EmbedderStats> for EmbedderStatsView {
+    fn from(stats: &EmbedderStats) -> Self {
+        let errors = stats.errors.read().unwrap_or_else(|p| p.into_inner());
+        Self {
+            total: stats.total_count.load(std::sync::atomic::Ordering::Relaxed),
+            failed: errors.1 as usize,
+            last_error: errors.0.clone(),
+        }
+    }
+}
+
+impl EmbedderStatsView {
+    pub fn skip_serializing(&self) -> bool {
+        self.total == 0 && self.failed == 0 && self.last_error.is_none()
+    }
+}
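
Note (illustrative, not part of this diff): `skip_serializing` treats a view as empty only when all three fields are unset, so a batch whose every embedder request failed still surfaces its stats. A quick check, using only the type defined above:

    #[test]
    fn embedder_stats_view_is_skipped_only_when_truly_empty() {
        // Freshly defaulted view: nothing to report, so it is omitted.
        assert!(EmbedderStatsView::default().skip_serializing());

        // All requests failed but there were requests: still serialized.
        let failures_only = EmbedderStatsView { total: 5, failed: 5, last_error: None };
        assert!(!failures_only.skip_serializing());
    }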

View File

@@ -37,6 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
 use meilisearch_auth::{open_auth_store_env, AuthController};
 use meilisearch_types::milli::constants::VERSION_MAJOR;
 use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
+use meilisearch_types::milli::progress::EmbedderStats;
 use meilisearch_types::milli::update::{
     default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
 };
@@ -542,8 +543,12 @@ fn import_dump(
     tracing::info!("Importing the settings.");
     let settings = index_reader.settings()?;
     apply_settings_to_builder(&settings, &mut builder);
-    builder
-        .execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false)?;
+    let embedder_stats: Arc<EmbedderStats> = Default::default();
+    builder.execute(
+        |indexing_step| tracing::debug!("update: {:?}", indexing_step),
+        || false,
+        embedder_stats.clone(),
+    )?;
 
     // 4.3 Import the documents.
     // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
@@ -574,6 +579,7 @@ fn import_dump(
         },
         |indexing_step| tracing::trace!("update: {:?}", indexing_step),
         || false,
+        &embedder_stats,
     )?;
 
     let builder = builder.with_embedders(embedders);

View File

@@ -1,7 +1,10 @@
 use std::collections::BTreeMap;
+use std::sync::atomic::AtomicUsize;
 
 use meili_snap::{json_string, snapshot};
 use reqwest::IntoUrl;
+use std::time::Duration;
+use tokio::sync::mpsc;
 use wiremock::matchers::{method, path};
 use wiremock::{Mock, MockServer, Request, ResponseTemplate};
@@ -334,6 +337,41 @@ async fn create_mock_raw() -> (MockServer, Value) {
     (mock_server, embedder_settings)
 }
 
+async fn create_faulty_mock_raw(sender: mpsc::Sender<()>) -> (MockServer, Value) {
+    let mock_server = MockServer::start().await;
+    let count = AtomicUsize::new(0);
+
+    Mock::given(method("POST"))
+        .and(path("/"))
+        .respond_with(move |_req: &Request| {
+            let count = count.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+            if count >= 5 {
+                let _ = sender.try_send(());
+                ResponseTemplate::new(500)
+                    .set_delay(Duration::from_secs(u64::MAX)) // Make the response hang forever
+                    .set_body_string("Service Unavailable")
+            } else {
+                ResponseTemplate::new(500).set_body_string("Service Unavailable")
+            }
+        })
+        .mount(&mock_server)
+        .await;
+
+    let url = mock_server.uri();
+
+    let embedder_settings = json!({
+        "source": "rest",
+        "url": url,
+        "dimensions": 3,
+        "request": "{{text}}",
+        "response": "{{embedding}}",
+        "documentTemplate": "{{doc.name}}"
+    });
+
+    (mock_server, embedder_settings)
+}
+
 pub async fn post<T: IntoUrl>(url: T, text: &str) -> reqwest::Result<reqwest::Response> {
     reqwest::Client::builder().build()?.post(url).json(&json!(text)).send().await
 }
@@ -2111,3 +2149,70 @@ async fn searchable_reindex() {
     }
     "###);
 }
+
+#[actix_rt::test]
+async fn last_error_stats() {
+    let (sender, mut receiver) = mpsc::channel(10);
+    let (_mock, setting) = create_faulty_mock_raw(sender).await;
+    let server = get_server_vector().await;
+    let index = server.index("doggo");
+
+    let (response, code) = index
+        .update_settings(json!({
+            "embedders": {
+                "rest": setting,
+            },
+        }))
+        .await;
+    snapshot!(code, @"202 Accepted");
+    let task = server.wait_task(response.uid()).await;
+    snapshot!(task["status"], @r###""succeeded""###);
+
+    let documents = json!([
+        {"id": 0, "name": "will_return_500"},
+        {"id": 1, "name": "will_error"},
+        {"id": 2, "name": "must_error"},
+    ]);
+    let (_value, code) = index.add_documents(documents, None).await;
+    snapshot!(code, @"202 Accepted");
+
+    // The task will eventually fail, so let's not wait for it.
+    // Let's just wait for the server's signal
+    receiver.recv().await;
+
+    let (response, _code) = index.filtered_batches(&[], &[], &[]).await;
+    snapshot!(json_string!(response["results"][0], {
+        ".progress" => "[ignored]",
+        ".stats.embedderRequests.total" => "[ignored]",
+        ".startedAt" => "[ignored]"
+    }), @r#"
+    {
+      "uid": 1,
+      "progress": "[ignored]",
+      "details": {
+        "receivedDocuments": 3,
+        "indexedDocuments": null
+      },
+      "stats": {
+        "totalNbTasks": 1,
+        "status": {
+          "processing": 1
+        },
+        "types": {
+          "documentAdditionOrUpdate": 1
+        },
+        "indexUids": {
+          "doggo": 1
+        },
+        "embedderRequests": {
+          "total": "[ignored]",
+          "failed": 5,
+          "lastError": "runtime error: received internal error HTTP 500 from embedding server\n - server replied with `Service Unavailable`"
+        }
+      },
+      "duration": null,
+      "startedAt": "[ignored]",
+      "finishedAt": null,
+      "batchStrategy": "batched all enqueued tasks"
+    }
+    "#);
+}

View File

@@ -1,7 +1,7 @@
 use std::any::TypeId;
 use std::borrow::Cow;
 use std::marker::PhantomData;
-use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 use std::time::{Duration, Instant};
@@ -22,6 +22,25 @@ pub struct Progress {
     steps: Arc<RwLock<InnerProgress>>,
 }
 
+#[derive(Default)]
+pub struct EmbedderStats {
+    pub errors: Arc<RwLock<(Option<String>, u32)>>,
+    pub total_count: AtomicUsize,
+}
+
+impl std::fmt::Debug for EmbedderStats {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let guard = self.errors.read().unwrap_or_else(|p| p.into_inner());
+        let (error, count) = (guard.0.clone(), guard.1);
+        std::mem::drop(guard);
+        f.debug_struct("EmbedderStats")
+            .field("last_error", &error)
+            .field("total_count", &self.total_count.load(Ordering::Relaxed))
+            .field("error_count", &count)
+            .finish()
+    }
+}
+
 #[derive(Default)]
 struct InnerProgress {
     /// The hierarchy of steps.
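
Note (illustrative, not part of this diff): the layout splits the hot path from the error path: `total_count` is a lock-free atomic bumped on every request, while the last error message and the failure count share one `RwLock` so a reader never sees a message without its matching count. A hypothetical recording helper (the real call sites live in the embedders, which this page does not show) might look like:

    impl EmbedderStats {
        // Hypothetical sketch: record the outcome of one embedding request.
        pub fn record_request(&self, outcome: Result<(), String>) {
            // Cheap atomic bump on every request, success or failure.
            self.total_count.fetch_add(1, Ordering::Relaxed);
            if let Err(message) = outcome {
                // Recover from a poisoned lock, as the Debug impl above does.
                let mut errors = self.errors.write().unwrap_or_else(|p| p.into_inner());
                errors.0 = Some(message);
                errors.1 += 1;
            }
        }
    }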

View File

@@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
 
     // index documents
@@ -95,6 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();

View File

@@ -103,6 +103,7 @@ impl TempIndex {
                 embedders,
                 &|| false,
                 &Progress::default(),
+                &Default::default(),
             )
         })
         .unwrap()?;
@@ -134,7 +135,7 @@ impl TempIndex {
     ) -> Result<(), crate::error::Error> {
         let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
         update(&mut builder);
-        builder.execute(drop, || false)?;
+        builder.execute(drop, || false, Default::default())?;
         Ok(())
     }
@@ -185,6 +186,7 @@ impl TempIndex {
                 embedders,
                 &|| false,
                 &Progress::default(),
+                &Default::default(),
             )
         })
         .unwrap()?;
@@ -259,6 +261,7 @@ fn aborting_indexation() {
                 embedders,
                 &|| should_abort.load(Relaxed),
                 &Progress::default(),
+                &Default::default(),
             )
         })
         .unwrap()

View File

@@ -17,6 +17,7 @@ use crate::constants::RESERVED_VECTORS_FIELD_NAME;
 use crate::error::FaultSource;
 use crate::fields_ids_map::metadata::FieldIdMapWithMetadata;
 use crate::index::IndexEmbeddingConfig;
+use crate::progress::EmbedderStats;
 use crate::prompt::Prompt;
 use crate::update::del_add::{DelAdd, KvReaderDelAdd, KvWriterDelAdd};
 use crate::update::settings::InnerIndexSettingsDiff;
@@ -674,6 +675,7 @@ fn compare_vectors(a: &[f32], b: &[f32]) -> Ordering {
     a.iter().copied().map(OrderedFloat).cmp(b.iter().copied().map(OrderedFloat))
 }
 
+#[allow(clippy::too_many_arguments)]
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::extract")]
 pub fn extract_embeddings<R: io::Read + io::Seek>(
     // docid, prompt
@@ -682,6 +684,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
     embedder: Arc<Embedder>,
     embedder_name: &str,
     possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    embedder_stats: &EmbedderStats,
     unused_vectors_distribution: &UnusedVectorsDistribution,
     request_threads: &ThreadPoolNoAbort,
 ) -> Result<grenad::Reader<BufReader<File>>> {
@@ -724,6 +727,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
                 std::mem::replace(&mut chunks, Vec::with_capacity(n_chunks)),
                 embedder_name,
                 possible_embedding_mistakes,
+                embedder_stats,
                 unused_vectors_distribution,
                 request_threads,
             )?;
@@ -746,6 +750,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
             std::mem::take(&mut chunks),
             embedder_name,
             possible_embedding_mistakes,
+            embedder_stats,
             unused_vectors_distribution,
             request_threads,
         )?;
@@ -764,6 +769,7 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
             vec![std::mem::take(&mut current_chunk)],
             embedder_name,
             possible_embedding_mistakes,
+            embedder_stats,
             unused_vectors_distribution,
             request_threads,
         )?;
@@ -783,10 +789,11 @@ fn embed_chunks(
     text_chunks: Vec<Vec<String>>,
     embedder_name: &str,
     possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+    embedder_stats: &EmbedderStats,
     unused_vectors_distribution: &UnusedVectorsDistribution,
     request_threads: &ThreadPoolNoAbort,
 ) -> Result<Vec<Vec<Embedding>>> {
-    match embedder.embed_index(text_chunks, request_threads) {
+    match embedder.embed_index(text_chunks, request_threads, embedder_stats) {
         Ok(chunks) => Ok(chunks),
         Err(error) => {
             if let FaultSource::Bug = error.fault {

View File

@@ -31,6 +31,7 @@ use self::extract_word_position_docids::extract_word_position_docids;
 use super::helpers::{as_cloneable_grenad, CursorClonableMmap, GrenadParameters};
 use super::{helpers, TypedChunk};
 use crate::index::IndexEmbeddingConfig;
+use crate::progress::EmbedderStats;
 use crate::update::settings::InnerIndexSettingsDiff;
 use crate::vector::error::PossibleEmbeddingMistakes;
 use crate::{FieldId, Result, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
@@ -49,6 +50,7 @@ pub(crate) fn data_from_obkv_documents(
     settings_diff: Arc<InnerIndexSettingsDiff>,
     max_positions_per_attributes: Option<u32>,
     possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
+    embedder_stats: &Arc<EmbedderStats>,
 ) -> Result<()> {
     let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
         || {
@@ -62,6 +64,7 @@ pub(crate) fn data_from_obkv_documents(
                     embedders_configs.clone(),
                     settings_diff.clone(),
                     possible_embedding_mistakes.clone(),
+                    embedder_stats.clone(),
                 )
             })
             .collect::<Result<()>>()
@@ -231,6 +234,7 @@ fn send_original_documents_data(
     embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
     settings_diff: Arc<InnerIndexSettingsDiff>,
     possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
+    embedder_stats: Arc<EmbedderStats>,
 ) -> Result<()> {
     let original_documents_chunk =
         original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@@ -270,6 +274,7 @@ fn send_original_documents_data(
                     embedder.clone(),
                     &embedder_name,
                     &possible_embedding_mistakes,
+                    &embedder_stats,
                     &unused_vectors_distribution,
                     request_threads(),
                 ) {

View File

@@ -32,7 +32,7 @@ use crate::database_stats::DatabaseStats;
 use crate::documents::{obkv_to_object, DocumentsBatchReader};
 use crate::error::{Error, InternalError};
 use crate::index::{PrefixSearch, PrefixSettings};
-use crate::progress::Progress;
+use crate::progress::{EmbedderStats, Progress};
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
 use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
@@ -81,6 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
     added_documents: u64,
     deleted_documents: u64,
     embedders: EmbeddingConfigs,
+    embedder_stats: &'t Arc<EmbedderStats>,
 }
 
 #[derive(Default, Debug, Clone)]
@@ -103,6 +104,7 @@ where
         config: IndexDocumentsConfig,
         progress: FP,
         should_abort: FA,
+        embedder_stats: &'t Arc<EmbedderStats>,
     ) -> Result<IndexDocuments<'t, 'i, 'a, FP, FA>> {
         let transform = Some(Transform::new(
             wtxn,
@@ -123,6 +125,7 @@ where
             added_documents: 0,
             deleted_documents: 0,
             embedders: Default::default(),
+            embedder_stats,
         })
     }
@@ -292,6 +295,7 @@ where
         // Run extraction pipeline in parallel.
        let mut modified_docids = RoaringBitmap::new();
+        let embedder_stats = self.embedder_stats.clone();
        pool.install(|| {
            let settings_diff_cloned = settings_diff.clone();
            rayon::spawn(move || {
@@ -326,7 +330,8 @@ where
                        embedders_configs.clone(),
                        settings_diff_cloned,
                        max_positions_per_attributes,
-                        Arc::new(possible_embedding_mistakes)
+                        Arc::new(possible_embedding_mistakes),
+                        &embedder_stats
                    )
                });
@@ -2025,6 +2030,7 @@ mod tests {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2112,6 +2118,7 @@ mod tests {
             EmbeddingConfigs::default(),
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2297,6 +2304,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2359,6 +2367,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2412,6 +2421,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2464,6 +2474,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2518,6 +2529,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2577,6 +2589,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2629,6 +2642,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2681,6 +2695,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2879,6 +2894,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2938,6 +2954,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();
@@ -2994,6 +3011,7 @@ mod tests {
             embedders,
             &|| false,
             &Progress::default(),
+            &Default::default(),
         )
         .unwrap();
         wtxn.commit().unwrap();

View File

@@ -6,6 +6,7 @@ use hashbrown::{DefaultHashBuilder, HashMap};
 use super::cache::DelAddRoaringBitmap;
 use crate::error::FaultSource;
+use crate::progress::EmbedderStats;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
 use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
@@ -22,6 +23,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
     embedders: &'a EmbeddingConfigs,
     sender: EmbeddingSender<'a, 'b>,
     possible_embedding_mistakes: PossibleEmbeddingMistakes,
+    embedder_stats: &'a EmbedderStats,
     threads: &'a ThreadPoolNoAbort,
 }
 
@@ -30,10 +32,11 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
         embedders: &'a EmbeddingConfigs,
         sender: EmbeddingSender<'a, 'b>,
         field_distribution: &'a FieldDistribution,
+        embedder_stats: &'a EmbedderStats,
         threads: &'a ThreadPoolNoAbort,
     ) -> Self {
         let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
-        Self { embedders, sender, threads, possible_embedding_mistakes }
+        Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats }
     }
 }
 
@@ -75,6 +78,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
                 prompt,
                 context.data,
                 &self.possible_embedding_mistakes,
+                self.embedder_stats,
                 self.threads,
                 self.sender,
                 &context.doc_alloc,
@@ -307,6 +311,7 @@ struct Chunks<'a, 'b, 'extractor> {
     dimensions: usize,
     prompt: &'a Prompt,
     possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
+    embedder_stats: &'a EmbedderStats,
     user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
     threads: &'a ThreadPoolNoAbort,
     sender: EmbeddingSender<'a, 'b>,
@@ -322,6 +327,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
         prompt: &'a Prompt,
         user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
         possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
+        embedder_stats: &'a EmbedderStats,
         threads: &'a ThreadPoolNoAbort,
         sender: EmbeddingSender<'a, 'b>,
         doc_alloc: &'a Bump,
@@ -336,6 +342,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
             embedder,
             prompt,
             possible_embedding_mistakes,
+            embedder_stats,
             threads,
             sender,
             embedder_id,
@@ -371,6 +378,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
                 self.embedder_id,
                 self.embedder_name,
                 self.possible_embedding_mistakes,
+                self.embedder_stats,
                 unused_vectors_distribution,
                 self.threads,
                 self.sender,
@@ -389,6 +397,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
                 self.embedder_id,
                 self.embedder_name,
                 self.possible_embedding_mistakes,
+                self.embedder_stats,
                 unused_vectors_distribution,
                 self.threads,
                 self.sender,
@@ -407,6 +416,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
         embedder_id: u8,
         embedder_name: &str,
         possible_embedding_mistakes: &PossibleEmbeddingMistakes,
+        embedder_stats: &EmbedderStats,
         unused_vectors_distribution: &UnusedVectorsDistributionBump,
         threads: &ThreadPoolNoAbort,
         sender: EmbeddingSender<'a, 'b>,
@@ -450,7 +460,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
             return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)));
         }
 
-        let res = match embedder.embed_index_ref(texts.as_slice(), threads) {
+        let res = match embedder.embed_index_ref(texts.as_slice(), threads, embedder_stats) {
            Ok(embeddings) => {
                for (docid, embedding) in ids.into_iter().zip(embeddings) {
                    sender.set_vector(*docid, embedder_id, embedding).unwrap();

View File

@@ -13,6 +13,7 @@ use super::super::thread_local::{FullySend, ThreadLocal};
 use super::super::FacetFieldIdsDelta;
 use super::document_changes::{extract, DocumentChanges, IndexingContext};
 use crate::index::IndexEmbeddingConfig;
+use crate::progress::EmbedderStats;
 use crate::progress::MergingWordCache;
 use crate::proximity::ProximityPrecision;
 use crate::update::new::extract::EmbeddingExtractor;
@@ -34,6 +35,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
     mut index_embeddings: Vec<IndexEmbeddingConfig>,
     document_ids: &mut RoaringBitmap,
     modified_docids: &mut RoaringBitmap,
+    embedder_stats: &EmbedderStats,
 ) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)>
 where
     DC: DocumentChanges<'pl>,
@@ -245,6 +247,7 @@ where
                 embedders,
                 embedding_sender,
                 field_distribution,
+                embedder_stats,
                 request_threads(),
             );
             let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());

View File

@@ -19,7 +19,7 @@ use super::steps::IndexingStep;
 use super::thread_local::ThreadLocal;
 use crate::documents::PrimaryKey;
 use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
-use crate::progress::Progress;
+use crate::progress::{EmbedderStats, Progress};
 use crate::update::GrenadParameters;
 use crate::vector::{ArroyWrapper, EmbeddingConfigs};
 use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
@@ -55,6 +55,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>(
     embedders: EmbeddingConfigs,
     must_stop_processing: &'indexer MSP,
     progress: &'indexer Progress,
+    embedder_stats: &'indexer EmbedderStats,
 ) -> Result<ChannelCongestion>
 where
     DC: DocumentChanges<'pl>,
@@ -158,6 +159,7 @@ where
                 index_embeddings,
                 document_ids,
                 modified_docids,
+                embedder_stats,
             )
         })
         .unwrap()

View File

@@ -27,6 +27,7 @@ use crate::index::{
     DEFAULT_MIN_WORD_LEN_ONE_TYPO, DEFAULT_MIN_WORD_LEN_TWO_TYPOS,
 };
 use crate::order_by_map::OrderByMap;
+use crate::progress::EmbedderStats;
 use crate::prompt::{default_max_bytes, default_template_text, PromptData};
 use crate::proximity::ProximityPrecision;
 use crate::update::index_documents::IndexDocumentsMethod;
@@ -466,7 +467,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
     #[tracing::instrument(
         level = "trace",
-        skip(self, progress_callback, should_abort, settings_diff),
+        skip(self, progress_callback, should_abort, settings_diff, embedder_stats),
         target = "indexing::documents"
     )]
     fn reindex<FP, FA>(
@@ -474,6 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         progress_callback: &FP,
         should_abort: &FA,
         settings_diff: InnerIndexSettingsDiff,
+        embedder_stats: &Arc<EmbedderStats>,
     ) -> Result<()>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
@@ -505,6 +507,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
             IndexDocumentsConfig::default(),
             &progress_callback,
             &should_abort,
+            embedder_stats,
         )?;
 
         indexing_builder.execute_raw(output)?;
@@ -1355,7 +1358,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         }
     }
 
-    pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA) -> Result<()>
+    pub fn execute<FP, FA>(
+        mut self,
+        progress_callback: FP,
+        should_abort: FA,
+        embedder_stats: Arc<EmbedderStats>,
+    ) -> Result<()>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
         FA: Fn() -> bool + Sync,
@@ -1413,7 +1421,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
         );
 
         if inner_settings_diff.any_reindexing_needed() {
-            self.reindex(&progress_callback, &should_abort, inner_settings_diff)?;
+            self.reindex(&progress_callback, &should_abort, inner_settings_diff, &embedder_stats)?;
         }
 
         Ok(())

View File

@@ -7,6 +7,7 @@ use super::{
     hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache,
     NewEmbedderError,
 };
+use crate::progress::EmbedderStats;
 use crate::ThreadPoolNoAbort;
 #[derive(Debug)]
@@ -81,6 +82,7 @@ impl Embedder {
                 "This is a sample text. It is meant to compare similarity.".into(),
             ],
             None,
+            None,
         )
         .map_err(|error| NewEmbedderError::composite_test_embedding_failed(error, "search"))?;
@@ -92,6 +94,7 @@ impl Embedder {
                 "This is a sample text. It is meant to compare similarity.".into(),
             ],
             None,
+            None,
         )
         .map_err(|error| {
             NewEmbedderError::composite_test_embedding_failed(error, "indexing")
@@ -150,13 +153,14 @@ impl SubEmbedder {
         &self,
         texts: Vec<String>,
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> std::result::Result<Vec<Embedding>, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed(texts),
-            SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline),
-            SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline),
+            SubEmbedder::OpenAi(embedder) => embedder.embed(&texts, deadline, embedder_stats),
+            SubEmbedder::Ollama(embedder) => embedder.embed(&texts, deadline, embedder_stats),
             SubEmbedder::UserProvided(embedder) => embedder.embed(&texts),
-            SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline),
+            SubEmbedder::Rest(embedder) => embedder.embed(texts, deadline, embedder_stats),
         }
     }
@@ -164,18 +168,21 @@ impl SubEmbedder {
         &self,
         text: &str,
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> std::result::Result<Embedding, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text),
-            SubEmbedder::OpenAi(embedder) => {
-                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
-            SubEmbedder::Ollama(embedder) => {
-                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
+            SubEmbedder::OpenAi(embedder) => embedder
+                .embed(&[text], deadline, embedder_stats)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            SubEmbedder::Ollama(embedder) => embedder
+                .embed(&[text], deadline, embedder_stats)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
             SubEmbedder::UserProvided(embedder) => embedder.embed_one(text),
             SubEmbedder::Rest(embedder) => embedder
-                .embed_ref(&[text], deadline)?
+                .embed_ref(&[text], deadline, embedder_stats)?
                 .pop()
                 .ok_or_else(EmbedError::missing_embedding),
         }
@@ -188,13 +195,20 @@ impl SubEmbedder {
         &self,
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
-            SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads),
-            SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads),
+            SubEmbedder::OpenAi(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
+            SubEmbedder::Ollama(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
             SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
-            SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads),
+            SubEmbedder::Rest(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
         }
     }
@@ -203,13 +217,18 @@ impl SubEmbedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> std::result::Result<Vec<Embedding>, EmbedError> {
         match self {
             SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
-            SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads),
-            SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads),
+            SubEmbedder::OpenAi(embedder) => {
+                embedder.embed_index_ref(texts, threads, embedder_stats)
+            }
+            SubEmbedder::Ollama(embedder) => {
+                embedder.embed_index_ref(texts, threads, embedder_stats)
+            }
             SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
-            SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads),
+            SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
         }
     }
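The `EmbedderStats` type itself lives in `crate::progress` and is not part of this diff. Judging only from how it is used below (an atomic `total_count`, and an `errors` lock holding the last error string plus a failure count), a minimal sketch consistent with that usage would be the following; exact field types in the real definition may differ:

    use std::sync::atomic::AtomicUsize;
    use std::sync::RwLock;

    // Sketch only: the real definition is in the crate's progress module.
    #[derive(Debug, Default)]
    pub struct EmbedderStats {
        // Total number of embedding requests sent, including retries.
        pub total_count: AtomicUsize,
        // Last error message, and how many requests have failed so far.
        pub errors: RwLock<(Option<String>, u32)>,
    }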

View File

@@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize};
 use utoipa::ToSchema;
 use self::error::{EmbedError, NewEmbedderError};
-use crate::progress::Progress;
+use crate::progress::{EmbedderStats, Progress};
 use crate::prompt::{Prompt, PromptData};
 use crate::ThreadPoolNoAbort;
@@ -719,18 +719,20 @@ impl Embedder {
         }
         let embedding = match self {
             Embedder::HuggingFace(embedder) => embedder.embed_one(text),
-            Embedder::OpenAi(embedder) => {
-                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
-            Embedder::Ollama(embedder) => {
-                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
-            }
-            Embedder::UserProvided(embedder) => embedder.embed_one(text),
-            Embedder::Rest(embedder) => embedder
-                .embed_ref(&[text], deadline)?
+            Embedder::OpenAi(embedder) => embedder
+                .embed(&[text], deadline, None)?
                 .pop()
                 .ok_or_else(EmbedError::missing_embedding),
-            Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline),
+            Embedder::Ollama(embedder) => embedder
+                .embed(&[text], deadline, None)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            Embedder::UserProvided(embedder) => embedder.embed_one(text),
+            Embedder::Rest(embedder) => embedder
+                .embed_ref(&[text], deadline, None)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline, None),
         }?;
         if let Some(cache) = self.cache() {
@@ -747,14 +749,21 @@ impl Embedder {
         &self,
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
         match self {
             Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
-            Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads),
-            Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads),
+            Embedder::OpenAi(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
+            Embedder::Ollama(embedder) => {
+                embedder.embed_index(text_chunks, threads, embedder_stats)
+            }
             Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
-            Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads),
-            Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads),
+            Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
+            Embedder::Composite(embedder) => {
+                embedder.index.embed_index(text_chunks, threads, embedder_stats)
+            }
         }
     }
@@ -763,14 +772,17 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> std::result::Result<Vec<Embedding>, EmbedError> {
         match self {
             Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
-            Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads),
-            Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads),
+            Embedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
+            Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
             Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
-            Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads),
-            Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads),
+            Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
+            Embedder::Composite(embedder) => {
+                embedder.index.embed_index_ref(texts, threads, embedder_stats)
+            }
         }
     }
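Note the asymmetry these hunks introduce: the search-time `embed_one` path always passes `None` (search failures are reported directly to the caller), while the indexing entry points take a mandatory `&EmbedderStats`. A hedged sketch of how an indexing caller could read the stats afterwards, with field names as inferred in the sketch above and `embedder`, `chunks`, `threads`, and `stats` assumed to be in scope:

    let _embeddings = embedder.embed_index(chunks, threads, &stats)?;
    let total = stats.total_count.load(std::sync::atomic::Ordering::Relaxed);
    let errors = stats.errors.read().unwrap_or_else(|p| p.into_inner());
    if let Some(last) = &errors.0 {
        eprintln!("{} of {total} embedding requests failed; last error: {last}", errors.1);
    }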

View File

@@ -7,6 +7,7 @@ use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErro
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
 use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
+use crate::progress::EmbedderStats;
 use crate::vector::Embedding;
 use crate::ThreadPoolNoAbort;
@@ -104,8 +105,9 @@ impl Embedder {
         &self,
         texts: &[S],
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> Result<Vec<Embedding>, EmbedError> {
-        match self.rest_embedder.embed_ref(texts, deadline) {
+        match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
             Ok(embeddings) => Ok(embeddings),
             Err(EmbedError { kind: EmbedErrorKind::RestOtherStatusCode(404, error), fault: _ }) => {
                 Err(EmbedError::ollama_model_not_found(error))
@@ -118,15 +120,22 @@ impl Embedder {
         &self,
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -139,13 +148,14 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Vec<f32>>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
-                .map(move |chunk| self.embed(chunk, None))
+                .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
                 .collect();
             let embeddings = embeddings?;
@@ -155,7 +165,7 @@ impl Embedder {
                 .install(move || {
                     let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                         .par_chunks(self.prompt_count_in_chunk_hint())
-                        .map(move |chunk| self.embed(chunk, None))
+                        .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
                         .collect();
                     let embeddings = embeddings?;
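The saturation guard seen here recurs unchanged in the OpenAI and REST embedders below: when the pool already runs `REQUEST_PARALLELISM` operations, chunks are embedded sequentially on the current thread instead of queueing more rayon jobs (which would hold more LMDB read transactions). Stripped of embedder detail, the pattern looks like this self-contained sketch; `REQUEST_PARALLELISM` here is a stand-in constant, and `c.len()` stands in for the embedding call:

    use rayon::prelude::*;

    const REQUEST_PARALLELISM: usize = 10; // stand-in for the crate constant

    fn map_chunks(active_operations: usize, chunks: Vec<Vec<String>>) -> Vec<usize> {
        if active_operations >= REQUEST_PARALLELISM {
            // Pool saturated: stay on the current thread.
            chunks.into_iter().map(|c| c.len()).collect()
        } else {
            // Pool has headroom: fan out across rayon workers.
            chunks.into_par_iter().map(|c| c.len()).collect()
        }
    }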

View File

@@ -9,6 +9,7 @@ use super::error::{EmbedError, NewEmbedderError};
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
 use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
+use crate::progress::EmbedderStats;
 use crate::vector::error::EmbedErrorKind;
 use crate::vector::Embedding;
 use crate::ThreadPoolNoAbort;
@@ -215,8 +216,9 @@ impl Embedder {
         &self,
         texts: &[S],
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> Result<Vec<Embedding>, EmbedError> {
-        match self.rest_embedder.embed_ref(texts, deadline) {
+        match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
             Ok(embeddings) => Ok(embeddings),
             Err(EmbedError { kind: EmbedErrorKind::RestBadRequest(error, _), fault: _ }) => {
                 tracing::warn!(error=?error, "OpenAI: received `BAD_REQUEST`. Input was maybe too long, retrying on tokenized version. For best performance, limit the size of your document template.");
@@ -238,7 +240,11 @@ impl Embedder {
             let encoded = self.tokenizer.encode_ordinary(text);
             let len = encoded.len();
             if len < max_token_count {
-                all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline)?);
+                all_embeddings.append(&mut self.rest_embedder.embed_ref(
+                    &[text],
+                    deadline,
+                    None,
+                )?);
                 continue;
             }
@@ -255,15 +261,22 @@ impl Embedder {
         &self,
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -276,13 +289,14 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Vec<f32>>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
-                .map(move |chunk| self.embed(chunk, None))
+                .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
                 .collect();
             let embeddings = embeddings?;
             Ok(embeddings.into_iter().flatten().collect())
@@ -291,7 +305,7 @@ impl Embedder {
                 .install(move || {
                     let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                         .par_chunks(self.prompt_count_in_chunk_hint())
-                        .map(move |chunk| self.embed(chunk, None))
+                        .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
                         .collect();
                     let embeddings = embeddings?;

View File

@@ -13,6 +13,7 @@ use super::{
     DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM,
 };
 use crate::error::FaultSource;
+use crate::progress::EmbedderStats;
 use crate::ThreadPoolNoAbort;
 // retrying in case of failure
@@ -168,19 +169,28 @@ impl Embedder {
         &self,
         texts: Vec<String>,
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> Result<Vec<Embedding>, EmbedError> {
-        embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline)
+        embed(
+            &self.data,
+            texts.as_slice(),
+            texts.len(),
+            Some(self.dimensions),
+            deadline,
+            embedder_stats,
+        )
     }
     pub fn embed_ref<S>(
         &self,
         texts: &[S],
         deadline: Option<Instant>,
+        embedder_stats: Option<&EmbedderStats>,
     ) -> Result<Vec<Embedding>, EmbedError>
     where
         S: AsRef<str> + Serialize,
     {
-        embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline)
+        embed(&self.data, texts, texts.len(), Some(self.dimensions), deadline, embedder_stats)
     }
     pub fn embed_tokens(
@@ -188,7 +198,7 @@ impl Embedder {
         tokens: &[u32],
         deadline: Option<Instant>,
     ) -> Result<Embedding, EmbedError> {
-        let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline)?;
+        let mut embeddings = embed(&self.data, tokens, 1, Some(self.dimensions), deadline, None)?;
         // unwrap: guaranteed that embeddings.len() == 1, otherwise the previous line terminated in error
         Ok(embeddings.pop().unwrap())
     }
@@ -197,15 +207,22 @@ impl Embedder {
         &self,
         text_chunks: Vec<Vec<String>>,
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
-            text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
+            text_chunks
+                .into_iter()
+                .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
+                .collect()
         } else {
             threads
                 .install(move || {
-                    text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
+                    text_chunks
+                        .into_par_iter()
+                        .map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
+                        .collect()
                 })
                 .map_err(|error| EmbedError {
                     kind: EmbedErrorKind::PanicInThreadPool(error),
@@ -218,13 +235,14 @@ impl Embedder {
         &self,
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
+        embedder_stats: &EmbedderStats,
     ) -> Result<Vec<Embedding>, EmbedError> {
         // This condition helps reduce the number of active rayon jobs
         // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
-                .map(move |chunk| self.embed_ref(chunk, None))
+                .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats)))
                 .collect();
             let embeddings = embeddings?;
@@ -234,7 +252,7 @@ impl Embedder {
                 .install(move || {
                     let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
                         .par_chunks(self.prompt_count_in_chunk_hint())
-                        .map(move |chunk| self.embed_ref(chunk, None))
+                        .map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats)))
                         .collect();
                     let embeddings = embeddings?;
@@ -272,7 +290,7 @@ impl Embedder {
     }
     fn infer_dimensions(data: &EmbedderData) -> Result<usize, NewEmbedderError> {
-        let v = embed(data, ["test"].as_slice(), 1, None, None)
+        let v = embed(data, ["test"].as_slice(), 1, None, None, None)
             .map_err(NewEmbedderError::could_not_determine_dimension)?;
         // unwrap: guaranteed that v.len() == 1, otherwise the previous line terminated in error
         Ok(v.first().unwrap().len())
@@ -284,6 +302,7 @@ fn embed<S>(
     expected_count: usize,
     expected_dimension: Option<usize>,
     deadline: Option<Instant>,
+    embedder_stats: Option<&EmbedderStats>,
 ) -> Result<Vec<Embedding>, EmbedError>
 where
     S: Serialize,
@@ -302,6 +321,9 @@ where
     let body = data.request.inject_texts(inputs);
     for attempt in 0..10 {
+        if let Some(embedder_stats) = &embedder_stats {
+            embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        }
         let response = request.clone().send_json(&body);
         let result = check_response(response, data.configuration_source).and_then(|response| {
             response_to_embedding(response, data, expected_count, expected_dimension)
@@ -311,6 +333,13 @@ where
             Ok(response) => return Ok(response),
             Err(retry) => {
                 tracing::warn!("Failed: {}", retry.error);
+                if let Some(embedder_stats) = &embedder_stats {
+                    let stringified_error = retry.error.to_string();
+                    let mut errors =
+                        embedder_stats.errors.write().unwrap_or_else(|p| p.into_inner());
+                    errors.0 = Some(stringified_error);
+                    errors.1 += 1;
+                }
                 if let Some(deadline) = deadline {
                     let now = std::time::Instant::now();
                     if now > deadline {
@@ -336,12 +365,26 @@ where
         std::thread::sleep(retry_duration);
     }
+    if let Some(embedder_stats) = &embedder_stats {
+        embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+    }
     let response = request.send_json(&body);
-    let result = check_response(response, data.configuration_source);
-    result.map_err(Retry::into_error).and_then(|response| {
-        response_to_embedding(response, data, expected_count, expected_dimension)
-            .map_err(Retry::into_error)
-    })
+    let result = check_response(response, data.configuration_source).and_then(|response| {
+        response_to_embedding(response, data, expected_count, expected_dimension)
+    });
+    match result {
+        Ok(response) => Ok(response),
+        Err(retry) => {
+            if let Some(embedder_stats) = &embedder_stats {
+                let stringified_error = retry.error.to_string();
+                let mut errors = embedder_stats.errors.write().unwrap_or_else(|p| p.into_inner());
+                errors.0 = Some(stringified_error);
+                errors.1 += 1;
+            };
+            Err(retry.into_error())
+        }
+    }
 }
 fn check_response(
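With this change, `total_count` is bumped once per HTTP attempt (the ten in-loop attempts plus the final one), and every failed attempt overwrites `errors.0` with the latest message while incrementing `errors.1`. A small illustrative check, assuming the `EmbedderStats` shape sketched earlier:

    #[test]
    fn stats_record_last_error_and_count() {
        let stats = EmbedderStats::default();
        // Mimic what `embed` does on two failed attempts.
        for msg in ["timeout", "500 Internal Server Error"] {
            stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            let mut errors = stats.errors.write().unwrap_or_else(|p| p.into_inner());
            errors.0 = Some(msg.to_string());
            errors.1 += 1;
        }
        let errors = stats.errors.read().unwrap_or_else(|p| p.into_inner());
        assert_eq!(errors.0.as_deref(), Some("500 Internal Server Error"));
        assert_eq!(errors.1, 2);
        assert_eq!(stats.total_count.load(std::sync::atomic::Ordering::Relaxed), 2);
    }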

View File

@@ -19,7 +19,7 @@ macro_rules! test_distinct {
     let config = milli::update::IndexerConfig::default();
     let mut builder = Settings::new(&mut wtxn, &index, &config);
     builder.set_distinct_field(S(stringify!($distinct)));
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     let rtxn = index.read_txn().unwrap();
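In this and the remaining test updates, the new third argument to `execute` is just `Default::default()`. Assuming the parameter is the `Arc<EmbedderStats>` introduced above, that expands to a fresh stats handle the tests never read:

    // Equivalent, spelled out (parameter type as assumed from the settings change):
    let stats: std::sync::Arc<EmbedderStats> = Default::default();
    builder.execute(|_| (), || false, stats).unwrap();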

View File

@@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() {
         FilterableAttributesRule::Field(S("genres")),
         FilterableAttributesRule::Field(S("tags")),
     ]);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     // index documents
@@ -74,6 +74,7 @@ fn test_facet_distribution_with_no_facet_values() {
         embedders,
         &|| false,
         &Progress::default(),
+        &Default::default(),
     )
     .unwrap();

View File

@@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         S("america") => vec![S("the united states")],
     });
     builder.set_searchable_fields(vec![S("title"), S("description")]);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     // index documents
@@ -114,6 +114,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
         embedders,
         &|| false,
         &Progress::default(),
+        &Default::default(),
     )
     .unwrap();

View File

@@ -10,7 +10,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) {
     let mut builder = Settings::new(&mut wtxn, index, &config);
     let stop_words = stop_words.iter().map(|s| s.to_string()).collect();
     builder.set_stop_words(stop_words);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
 }

View File

@@ -236,7 +236,7 @@ fn criteria_mixup() {
     let mut wtxn = index.write_txn().unwrap();
     let mut builder = Settings::new(&mut wtxn, &index, &config);
     builder.set_criteria(criteria.clone());
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     let rtxn = index.read_txn().unwrap();
@@ -276,7 +276,7 @@ fn criteria_ascdesc() {
         S("name"),
         S("age"),
     });
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     let mut wtxn = index.write_txn().unwrap();
@@ -344,6 +344,7 @@ fn criteria_ascdesc() {
         embedders,
         &|| false,
         &Progress::default(),
+        &Default::default(),
     )
     .unwrap();
@@ -358,7 +359,7 @@ fn criteria_ascdesc() {
     let mut wtxn = index.write_txn().unwrap();
     let mut builder = Settings::new(&mut wtxn, &index, &config);
     builder.set_criteria(vec![criterion.clone()]);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     wtxn.commit().unwrap();
     let rtxn = index.read_txn().unwrap();

View File

@@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() {
     let config = IndexerConfig::default();
     let mut builder = Settings::new(&mut txn, &index, &config);
     builder.set_min_word_len_one_typo(4);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     // typo is now supported for 4 letters words
     let mut search = Search::new(&txn, &index);
@@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() {
     let config = IndexerConfig::default();
     let mut builder = Settings::new(&mut txn, &index, &config);
     builder.set_min_word_len_two_typos(7);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     // typo is now supported for 4 letters words
     let mut search = Search::new(&txn, &index);
@@ -153,6 +153,7 @@ fn test_typo_disabled_on_word() {
         embedders,
         &|| false,
         &Progress::default(),
+        &Default::default(),
     )
     .unwrap();
@@ -180,7 +181,7 @@ fn test_typo_disabled_on_word() {
     // `zealand` doesn't allow typos anymore
     exact_words.insert("zealand".to_string());
     builder.set_exact_words(exact_words);
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     let mut search = Search::new(&txn, &index);
     search.query("zealand");
@@ -218,7 +219,7 @@ fn test_disable_typo_on_attribute() {
     let mut builder = Settings::new(&mut txn, &index, &config);
     // disable typos on `description`
     builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect());
-    builder.execute(|_| (), || false).unwrap();
+    builder.execute(|_| (), || false, Default::default()).unwrap();
     let mut search = Search::new(&txn, &index);
     search.query("antebelum");