Move embedder stats out of progress

This commit is contained in:
Mubelotix 2025-06-23 15:24:14 +02:00
parent 4cadc8113b
commit 4925b30196
No known key found for this signature in database
GPG Key ID: 89F391DBCC8CE7F0
30 changed files with 255 additions and 69 deletions

View File

@ -65,7 +65,7 @@ fn setup_settings<'t>(
let sortable_fields = sortable_fields.iter().map(|s| s.to_string()).collect();
builder.set_sortable_fields(sortable_fields);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
}
fn setup_index_with_settings(
@ -169,6 +169,7 @@ fn indexing_songs_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -235,6 +236,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -279,6 +281,7 @@ fn reindexing_songs_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -347,6 +350,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -423,6 +427,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -467,6 +472,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -507,6 +513,7 @@ fn indexing_songs_in_three_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -574,6 +581,7 @@ fn indexing_songs_without_faceted_numbers(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -640,6 +648,7 @@ fn indexing_songs_without_faceted_fields(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -706,6 +715,7 @@ fn indexing_wiki(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -771,6 +781,7 @@ fn reindexing_wiki(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -815,6 +826,7 @@ fn reindexing_wiki(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -882,6 +894,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -958,6 +971,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1003,6 +1017,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1044,6 +1059,7 @@ fn indexing_wiki_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1110,6 +1126,7 @@ fn indexing_movies_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1175,6 +1192,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1219,6 +1237,7 @@ fn reindexing_movies_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1286,6 +1305,7 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1334,6 +1354,7 @@ fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec<RoaringBi
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1398,6 +1419,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1442,6 +1464,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1482,6 +1505,7 @@ fn indexing_movies_in_three_batches(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1571,6 +1595,7 @@ fn indexing_nested_movies_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1661,6 +1686,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1743,6 +1769,7 @@ fn indexing_nested_movies_without_faceted_fields(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1809,6 +1836,7 @@ fn indexing_geo(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1874,6 +1902,7 @@ fn reindexing_geo(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1918,6 +1947,7 @@ fn reindexing_geo(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -1985,6 +2015,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -90,7 +90,7 @@ pub fn base_setup(conf: &Conf) -> Index {
(conf.configure)(&mut builder);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
let config = IndexerConfig::default();
@ -128,6 +128,7 @@ pub fn base_setup(conf: &Conf) -> Index {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -328,8 +328,8 @@ pub(crate) mod test {
progress_trace: Default::default(),
write_channel_congestion: None,
internal_database_sizes: Default::default(),
embeddings: Default::default(),
},
embedder_stats: None,
enqueued_at: Some(BatchEnqueuedAt {
earliest: datetime!(2022-11-11 0:00 UTC),
oldest: datetime!(2022-11-11 0:00 UTC),

View File

@ -144,6 +144,7 @@ fn main() {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::fmt::Write;
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchStats};
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@ -343,6 +343,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
uid,
details,
stats,
embedder_stats,
started_at,
finished_at,
progress: _,
@ -366,6 +367,7 @@ pub fn snapshot_batch(batch: &Batch) -> String {
snap.push_str(&format!("uid: {uid}, "));
snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap()));
snap.push_str(&format!("embedder_stats: {}, ", serde_json::to_string(&embedder_stats).unwrap()));
snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap()));
snap.push('}');
snap

View File

@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchId};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@ -92,7 +92,10 @@ impl BatchQueue {
}
pub(crate) fn get_batch(&self, rtxn: &RoTxn, batch_id: BatchId) -> Result<Option<Batch>> {
Ok(self.all_batches.get(rtxn, &batch_id)?)
println!("Got batch from db {batch_id:?}");
let r = Ok(self.all_batches.get(rtxn, &batch_id)?);
println!("Got batch from db => {:?}", r);
r
}
/// Returns the whole set of batches that belongs to this index.
@ -171,6 +174,8 @@ impl BatchQueue {
pub(crate) fn write_batch(&self, wtxn: &mut RwTxn, batch: ProcessingBatch) -> Result<()> {
let old_batch = self.all_batches.get(wtxn, &batch.uid)?;
println!("Saving batch: {}", batch.embedder_stats.is_some());
self.all_batches.put(
wtxn,
&batch.uid,
@ -179,6 +184,7 @@ impl BatchQueue {
progress: None,
details: batch.details,
stats: batch.stats,
embedder_stats: batch.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())),
started_at: batch.started_at,
finished_at: batch.finished_at,
enqueued_at: batch.enqueued_at,

View File

@ -1,10 +1,11 @@
use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::progress::{EmbedderStats, Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
@ -163,7 +164,7 @@ impl IndexScheduler {
let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
let (tasks, congestion) =
self.apply_index_operation(&mut index_wtxn, &index, op, &progress)?;
self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.clone_embedder_stats())?;
{
progress.update_progress(FinalizingIndexStep::Committing);
@ -238,11 +239,21 @@ impl IndexScheduler {
);
builder.set_primary_key(primary_key);
let must_stop_processing = self.scheduler.must_stop_processing.clone();
let embedder_stats = match current_batch.embedder_stats {
Some(ref stats) => stats.clone(),
None => {
let embedder_stats: Arc<EmbedderStats> = Default::default();
current_batch.embedder_stats = Some(embedder_stats.clone());
embedder_stats
},
};
builder
.execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
Some(progress.embedder_stats),
embedder_stats,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.to_string())))?;
index_wtxn.commit()?;

View File

@ -4,7 +4,7 @@ use bumpalo::collections::CollectIn;
use bumpalo::Bump;
use meilisearch_types::heed::RwTxn;
use meilisearch_types::milli::documents::PrimaryKey;
use meilisearch_types::milli::progress::Progress;
use meilisearch_types::milli::progress::{EmbedderStats, Progress};
use meilisearch_types::milli::update::new::indexer::{self, UpdateByFunction};
use meilisearch_types::milli::update::DocumentAdditionResult;
use meilisearch_types::milli::{self, ChannelCongestion, Filter};
@ -26,7 +26,7 @@ impl IndexScheduler {
/// The list of processed tasks.
#[tracing::instrument(
level = "trace",
skip(self, index_wtxn, index, progress),
skip(self, index_wtxn, index, progress, embedder_stats),
target = "indexing::scheduler"
)]
pub(crate) fn apply_index_operation<'i>(
@ -35,6 +35,7 @@ impl IndexScheduler {
index: &'i Index,
operation: IndexOperation,
progress: &Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<(Vec<Task>, Option<ChannelCongestion>)> {
let indexer_alloc = Bump::new();
let started_processing_at = std::time::Instant::now();
@ -179,6 +180,7 @@ impl IndexScheduler {
embedders,
&|| must_stop_processing.get(),
progress,
embedder_stats,
)
.map_err(|e| Error::from_milli(e, Some(index_uid.clone())))?,
);
@ -290,6 +292,7 @@ impl IndexScheduler {
embedders,
&|| must_stop_processing.get(),
progress,
embedder_stats,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
@ -438,6 +441,7 @@ impl IndexScheduler {
embedders,
&|| must_stop_processing.get(),
progress,
embedder_stats,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?,
);
@ -474,7 +478,7 @@ impl IndexScheduler {
.execute(
|indexing_step| tracing::debug!(update = ?indexing_step),
|| must_stop_processing.get(),
Some(Arc::clone(&progress.embedder_stats))
embedder_stats,
)
.map_err(|err| Error::from_milli(err, Some(index_uid.clone())))?;
@ -494,6 +498,7 @@ impl IndexScheduler {
tasks: cleared_tasks,
},
progress,
embedder_stats.clone(),
)?;
let (settings_tasks, _congestion) = self.apply_index_operation(
@ -501,6 +506,7 @@ impl IndexScheduler {
index,
IndexOperation::Settings { index_uid, settings, tasks: settings_tasks },
progress,
embedder_stats,
)?;
let mut tasks = settings_tasks;

View File

@ -2,8 +2,10 @@
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
use std::sync::Arc;
use crate::milli::progress::EmbedderStats;
use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
use meilisearch_types::batches::{Batch, BatchEmbeddingStats, BatchEnqueuedAt, BatchId, BatchStats};
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec;
use meilisearch_types::task_view::DetailsView;
@ -27,6 +29,7 @@ pub struct ProcessingBatch {
pub uid: BatchId,
pub details: DetailsView,
pub stats: BatchStats,
pub embedder_stats: Option<Arc<EmbedderStats>>,
pub statuses: HashSet<Status>,
pub kinds: HashSet<Kind>,
@ -48,6 +51,7 @@ impl ProcessingBatch {
uid,
details: DetailsView::default(),
stats: BatchStats::default(),
embedder_stats: None,
statuses,
kinds: HashSet::default(),
@ -60,6 +64,17 @@ impl ProcessingBatch {
}
}
pub fn clone_embedder_stats(&mut self) -> Arc<EmbedderStats> {
match self.embedder_stats {
Some(ref stats) => stats.clone(),
None => {
let embedder_stats: Arc<EmbedderStats> = Default::default();
self.embedder_stats = Some(embedder_stats.clone());
embedder_stats
},
}
}
/// Update itself with the content of the task and update the batch id in the task.
pub fn processing<'a>(&mut self, tasks: impl IntoIterator<Item = &'a mut Task>) {
for task in tasks.into_iter() {
@ -141,11 +156,13 @@ impl ProcessingBatch {
}
pub fn to_batch(&self) -> Batch {
println!("Converting to batch: {:?}", self.embedder_stats);
Batch {
uid: self.uid,
progress: None,
details: self.details.clone(),
stats: self.stats.clone(),
embedder_stats: self.embedder_stats.as_ref().map(|s| BatchEmbeddingStats::from(s.as_ref())),
started_at: self.started_at,
finished_at: self.finished_at,
enqueued_at: self.enqueued_at,

View File

@ -3,7 +3,7 @@ use serde::Serialize;
use time::{Duration, OffsetDateTime};
use utoipa::ToSchema;
use crate::batches::{Batch, BatchId, BatchStats};
use crate::batches::{Batch, BatchEmbeddingStats, BatchId, BatchStats};
use crate::task_view::DetailsView;
use crate::tasks::serialize_duration;
@ -14,7 +14,7 @@ pub struct BatchView {
pub uid: BatchId,
pub progress: Option<ProgressView>,
pub details: DetailsView,
pub stats: BatchStats,
pub stats: BatchStatsView,
#[serde(serialize_with = "serialize_duration", default)]
pub duration: Option<Duration>,
#[serde(with = "time::serde::rfc3339", default)]
@ -25,13 +25,26 @@ pub struct BatchView {
pub batch_strategy: String,
}
/// Serialized view of a batch's stats as exposed by the batches route.
///
/// Flattens the regular [`BatchStats`] fields into this object and appends
/// the optional embedder statistics under the `embedder` key.
#[derive(Debug, Clone, Serialize, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct BatchStatsView {
    // Pre-existing batch stats, flattened so the JSON shape is unchanged
    // for clients that don't care about embedders.
    #[serde(flatten)]
    pub stats: BatchStats,
    // Omitted from the payload when `None` or when every counter is zero
    // (see `BatchEmbeddingStats::skip_serializing`).
    #[serde(skip_serializing_if = "BatchEmbeddingStats::skip_serializing")]
    pub embedder: Option<BatchEmbeddingStats>,
}
impl BatchView {
pub fn from_batch(batch: &Batch) -> Self {
Self {
uid: batch.uid,
progress: batch.progress.clone(),
details: batch.details.clone(),
stats: BatchStatsView {
stats: batch.stats.clone(),
embedder: batch.embedder_stats.clone(),
},
duration: batch.finished_at.map(|finished_at| finished_at - batch.started_at),
started_at: batch.started_at,
finished_at: batch.finished_at,

View File

@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use milli::progress::ProgressView;
use milli::progress::{EmbedderStats, ProgressView};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use utoipa::ToSchema;
@ -19,6 +20,7 @@ pub struct Batch {
pub progress: Option<ProgressView>,
pub details: DetailsView,
pub stats: BatchStats,
pub embedder_stats: Option<BatchEmbeddingStats>,
#[serde(with = "time::serde::rfc3339")]
pub started_at: OffsetDateTime,
@ -43,6 +45,7 @@ impl PartialEq for Batch {
progress,
details,
stats,
embedder_stats,
started_at,
finished_at,
enqueued_at,
@ -53,6 +56,7 @@ impl PartialEq for Batch {
&& progress.is_none() == other.progress.is_none()
&& details == &other.details
&& stats == &other.stats
&& embedder_stats == &other.embedder_stats
&& started_at == &other.started_at
&& finished_at == &other.finished_at
&& enqueued_at == &other.enqueued_at
@ -82,7 +86,6 @@ pub struct BatchStats {
pub write_channel_congestion: Option<serde_json::Map<String, serde_json::Value>>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub internal_database_sizes: serde_json::Map<String, serde_json::Value>,
pub embeddings: BatchEmbeddingStats
}
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
@ -91,5 +94,26 @@ pub struct BatchStats {
pub struct BatchEmbeddingStats {
pub total_count: usize,
pub error_count: usize,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
impl From<&EmbedderStats> for BatchEmbeddingStats {
fn from(stats: &EmbedderStats) -> Self {
let errors = stats.errors.read().unwrap();
Self {
total_count: stats.total_requests.load(std::sync::atomic::Ordering::Relaxed),
error_count: errors.1 as usize,
last_error: errors.0.clone(),
}
}
}
impl BatchEmbeddingStats {
    /// Serde helper: returns `true` when there is nothing worth serializing —
    /// either no stats at all, or stats where every counter is zero and no
    /// error was recorded.
    pub fn skip_serializing(this: &Option<BatchEmbeddingStats>) -> bool {
        this.as_ref().map_or(true, |stats| {
            stats.total_count == 0 && stats.error_count == 0 && stats.last_error.is_none()
        })
    }
}

View File

@ -37,6 +37,7 @@ use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
use meilisearch_auth::{open_auth_store_env, AuthController};
use meilisearch_types::milli::constants::VERSION_MAJOR;
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
use meilisearch_types::milli::progress::EmbedderStats;
use meilisearch_types::milli::update::{
default_thread_pool_and_threads, IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig,
};
@ -542,8 +543,9 @@ fn import_dump(
tracing::info!("Importing the settings.");
let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder);
let embedder_stats: Arc<EmbedderStats> = Default::default(); // FIXME: this isn't linked to anything
builder
.execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false, None)?;
.execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false, embedder_stats.clone())?;
// 4.3 Import the documents.
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index.
@ -574,7 +576,7 @@ fn import_dump(
},
|indexing_step| tracing::trace!("update: {:?}", indexing_step),
|| false,
None,
embedder_stats,
)?;
let builder = builder.with_embedders(embedders);

View File

@ -4,6 +4,8 @@ use meili_snap::{json_string, snapshot};
use reqwest::IntoUrl;
use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate};
use std::thread::sleep;
use std::time::Duration;
use crate::common::Value;
use crate::json;
@ -305,6 +307,7 @@ async fn create_mock_raw() -> (MockServer, Value) {
Mock::given(method("POST"))
.and(path("/"))
.respond_with(move |req: &Request| {
println!("Sent!");
let req: String = match req.body_json() {
Ok(req) => req,
Err(error) => {
@ -2111,3 +2114,40 @@ async fn searchable_reindex() {
}
"###);
}
#[actix_rt::test]
async fn observability() {
    let (_mock, setting) = create_mock_raw().await;
    let server = get_server_vector().await;
    let index = server.index("doggo");

    // Configure a REST embedder pointing at the mock server.
    let (response, code) = index
        .update_settings(json!({
            "embedders": {
                "rest": setting,
            },
        }))
        .await;
    snapshot!(code, @"202 Accepted");
    let task = server.wait_task(response.uid()).await;
    snapshot!(task["status"], @r###""succeeded""###);

    let documents = json!([
        {"id": 0, "name": "kefir"},
        {"id": 1, "name": "echo", "_vectors": { "rest": [1, 1, 1] }},
        {"id": 2, "name": "intel"},
        {"id": 3, "name": "missing"}, // Stuff that doesn't exist
        {"id": 4, "name": "invalid"},
        {"id": 5, "name": "foobar"},
    ]);
    let (value, code) = index.add_documents(documents, None).await;
    snapshot!(code, @"202 Accepted");

    // Wait for the indexing task to finish, then fetch the batches so the
    // embedder stats recorded during indexing can be inspected.
    index.wait_task(value.uid()).await;
    let _batches = index.filtered_batches(&[], &[], &[]).await;
    // TODO(review): this test currently asserts nothing about the embedder
    // stats themselves; snapshot the relevant fields of `_batches` (request
    // count, error count, last error) instead of relying on manual
    // inspection. Debug `println!`s removed.
}

View File

@ -20,7 +20,6 @@ pub trait Step: 'static + Send + Sync {
#[derive(Clone, Default)]
pub struct Progress {
steps: Arc<RwLock<InnerProgress>>,
pub embedder_stats: Arc<EmbedderStats>,
}
#[derive(Default)]
@ -29,6 +28,17 @@ pub struct EmbedderStats {
pub total_requests: AtomicUsize
}
impl std::fmt::Debug for EmbedderStats {
    /// Manual `Debug` impl: reaches inside the `RwLock` so the last error and
    /// the counters are directly readable in logs, without printing the lock
    /// wrapper itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `fmt` is frequently called while another thread is panicking (e.g.
        // when logging during unwind), which is exactly when the lock may be
        // poisoned — recover the inner value instead of panicking here.
        // Borrowing through the guard also avoids cloning the stored String.
        let guard = match self.errors.read() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        f.debug_struct("EmbedderStats")
            .field("errors", &guard.0)
            .field("total_requests", &self.total_requests.load(Ordering::Relaxed))
            .field("error_count", &guard.1)
            .finish()
    }
}
#[derive(Default)]
struct InnerProgress {
/// The hierarchy of steps.
@ -72,19 +82,7 @@ impl Progress {
});
}
let embedder_view = {
let (last_error, error_count) = match self.embedder_stats.errors.read() {
Ok(guard) => (guard.0.clone(), guard.1),
Err(_) => (None, 0),
};
EmbedderStatsView {
last_error,
request_count: self.embedder_stats.total_requests.load(Ordering::Relaxed) as u32,
error_count,
}
};
ProgressView { steps: step_view, percentage: percentage * 100.0, embedder: embedder_view }
ProgressView { steps: step_view, percentage: percentage * 100.0 }
}
pub fn accumulated_durations(&self) -> IndexMap<String, String> {
@ -228,7 +226,6 @@ make_enum_progress! {
pub struct ProgressView {
pub steps: Vec<ProgressStepView>,
pub percentage: f32,
pub embedder: EmbedderStatsView,
}
#[derive(Debug, Serialize, Clone, ToSchema)]
@ -240,16 +237,6 @@ pub struct ProgressStepView {
pub total: u32,
}
#[derive(Debug, Serialize, Clone, ToSchema)]
#[serde(rename_all = "camelCase")]
#[schema(rename_all = "camelCase")]
pub struct EmbedderStatsView {
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
pub request_count: u32,
pub error_count: u32,
}
/// Used when the name can change but it's still the same step.
/// To avoid conflicts on the `TypeId`, create a unique type every time you use this step:
/// ```text

View File

@ -44,7 +44,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
S("america") => vec![S("the united states")],
});
builder.set_searchable_fields(vec![S("title"), S("description")]);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
// index documents
@ -95,6 +95,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -103,6 +103,7 @@ impl TempIndex {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
})
.unwrap()?;
@ -134,7 +135,7 @@ impl TempIndex {
) -> Result<(), crate::error::Error> {
let mut builder = update::Settings::new(wtxn, &self.inner, &self.indexer_config);
update(&mut builder);
builder.execute(drop, || false, None)?;
builder.execute(drop, || false, Default::default())?;
Ok(())
}
@ -185,6 +186,7 @@ impl TempIndex {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
})
.unwrap()?;
@ -259,6 +261,7 @@ fn aborting_indexation() {
embedders,
&|| should_abort.load(Relaxed),
&Progress::default(),
Default::default(),
)
})
.unwrap()

View File

@ -687,6 +687,8 @@ pub fn extract_embeddings<R: io::Read + io::Seek>(
unused_vectors_distribution: &UnusedVectorsDistribution,
request_threads: &ThreadPoolNoAbort,
) -> Result<grenad::Reader<BufReader<File>>> {
println!("Extract embedder stats {}:", embedder_stats.is_some());
let n_chunks = embedder.chunk_count_hint(); // chunk level parallelism
let n_vectors_per_chunk = embedder.prompt_count_in_chunk_hint(); // number of vectors in a single chunk

View File

@ -50,7 +50,7 @@ pub(crate) fn data_from_obkv_documents(
settings_diff: Arc<InnerIndexSettingsDiff>,
max_positions_per_attributes: Option<u32>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
embedder_stats: Option<Arc<EmbedderStats>>,
embedder_stats: Arc<EmbedderStats>,
) -> Result<()> {
let (original_pipeline_result, flattened_pipeline_result): (Result<_>, Result<_>) = rayon::join(
|| {
@ -234,7 +234,7 @@ fn send_original_documents_data(
embedders_configs: Arc<Vec<IndexEmbeddingConfig>>,
settings_diff: Arc<InnerIndexSettingsDiff>,
possible_embedding_mistakes: Arc<PossibleEmbeddingMistakes>,
embedder_stats: Option<Arc<EmbedderStats>>,
embedder_stats: Arc<EmbedderStats>,
) -> Result<()> {
let original_documents_chunk =
original_documents_chunk.and_then(|c| unsafe { as_cloneable_grenad(&c) })?;
@ -274,7 +274,7 @@ fn send_original_documents_data(
embedder.clone(),
&embedder_name,
&possible_embedding_mistakes,
embedder_stats.clone(),
Some(embedder_stats.clone()),
&unused_vectors_distribution,
request_threads(),
) {

View File

@ -81,7 +81,7 @@ pub struct IndexDocuments<'t, 'i, 'a, FP, FA> {
added_documents: u64,
deleted_documents: u64,
embedders: EmbeddingConfigs,
embedder_stats: Option<Arc<EmbedderStats>>,
embedder_stats: Arc<EmbedderStats>,
}
#[derive(Default, Debug, Clone)]
@ -104,7 +104,7 @@ where
config: IndexDocumentsConfig,
progress: FP,
should_abort: FA,
embedder_stats: Option<Arc<EmbedderStats>>,
embedder_stats: Arc<EmbedderStats>,
) -> Result<IndexDocuments<'t, 'i, 'a, FP, FA>> {
let transform = Some(Transform::new(
wtxn,
@ -2030,6 +2030,7 @@ mod tests {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2117,6 +2118,7 @@ mod tests {
EmbeddingConfigs::default(),
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2302,6 +2304,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2364,6 +2367,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2417,6 +2421,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2469,6 +2474,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2523,6 +2529,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2582,6 +2589,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2634,6 +2642,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2686,6 +2695,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2884,6 +2894,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2943,6 +2954,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();
@ -2999,6 +3011,7 @@ mod tests {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
wtxn.commit().unwrap();

View File

@ -1,4 +1,5 @@
use std::cell::RefCell;
use std::f32::consts::E;
use std::{cell::RefCell, sync::Arc};
use bumpalo::collections::Vec as BVec;
use bumpalo::Bump;
@ -6,6 +7,7 @@ use hashbrown::{DefaultHashBuilder, HashMap};
use super::cache::DelAddRoaringBitmap;
use crate::error::FaultSource;
use crate::progress::EmbedderStats;
use crate::prompt::Prompt;
use crate::update::new::channel::EmbeddingSender;
use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor};
@ -22,6 +24,7 @@ pub struct EmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs,
sender: EmbeddingSender<'a, 'b>,
possible_embedding_mistakes: PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
threads: &'a ThreadPoolNoAbort,
}
@ -30,10 +33,11 @@ impl<'a, 'b> EmbeddingExtractor<'a, 'b> {
embedders: &'a EmbeddingConfigs,
sender: EmbeddingSender<'a, 'b>,
field_distribution: &'a FieldDistribution,
embedder_stats: Option<Arc<EmbedderStats>>,
threads: &'a ThreadPoolNoAbort,
) -> Self {
let possible_embedding_mistakes = PossibleEmbeddingMistakes::new(field_distribution);
Self { embedders, sender, threads, possible_embedding_mistakes }
Self { embedders, sender, threads, possible_embedding_mistakes, embedder_stats }
}
}
@ -75,6 +79,7 @@ impl<'extractor> Extractor<'extractor> for EmbeddingExtractor<'_, '_> {
prompt,
context.data,
&self.possible_embedding_mistakes,
self.embedder_stats.clone(),
self.threads,
self.sender,
&context.doc_alloc,
@ -307,6 +312,7 @@ struct Chunks<'a, 'b, 'extractor> {
dimensions: usize,
prompt: &'a Prompt,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
@ -322,6 +328,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
prompt: &'a Prompt,
user_provided: &'a RefCell<EmbeddingExtractorData<'extractor>>,
possible_embedding_mistakes: &'a PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
threads: &'a ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
doc_alloc: &'a Bump,
@ -336,6 +343,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
embedder,
prompt,
possible_embedding_mistakes,
embedder_stats,
threads,
sender,
embedder_id,
@ -371,6 +379,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
self.embedder_id,
self.embedder_name,
self.possible_embedding_mistakes,
self.embedder_stats.clone(),
unused_vectors_distribution,
self.threads,
self.sender,
@ -389,6 +398,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
self.embedder_id,
self.embedder_name,
self.possible_embedding_mistakes,
self.embedder_stats.clone(),
unused_vectors_distribution,
self.threads,
self.sender,
@ -407,6 +417,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
embedder_id: u8,
embedder_name: &str,
possible_embedding_mistakes: &PossibleEmbeddingMistakes,
embedder_stats: Option<Arc<EmbedderStats>>,
unused_vectors_distribution: &UnusedVectorsDistributionBump,
threads: &ThreadPoolNoAbort,
sender: EmbeddingSender<'a, 'b>,
@ -450,7 +461,7 @@ impl<'a, 'b, 'extractor> Chunks<'a, 'b, 'extractor> {
return Err(crate::Error::UserError(crate::UserError::DocumentEmbeddingError(msg)));
}
let res = match embedder.embed_index_ref(texts.as_slice(), threads, None) {
let res = match embedder.embed_index_ref(texts.as_slice(), threads, embedder_stats) {
Ok(embeddings) => {
for (docid, embedding) in ids.into_iter().zip(embeddings) {
sender.set_vector(*docid, embedder_id, embedding).unwrap();

View File

@ -1,6 +1,7 @@
use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool;
use std::sync::OnceLock;
use std::sync::Arc;
use bumpalo::Bump;
use roaring::RoaringBitmap;
@ -13,6 +14,7 @@ use super::super::thread_local::{FullySend, ThreadLocal};
use super::super::FacetFieldIdsDelta;
use super::document_changes::{extract, DocumentChanges, IndexingContext};
use crate::index::IndexEmbeddingConfig;
use crate::progress::EmbedderStats;
use crate::progress::MergingWordCache;
use crate::proximity::ProximityPrecision;
use crate::update::new::extract::EmbeddingExtractor;
@ -34,6 +36,7 @@ pub(super) fn extract_all<'pl, 'extractor, DC, MSP>(
mut index_embeddings: Vec<IndexEmbeddingConfig>,
document_ids: &mut RoaringBitmap,
modified_docids: &mut RoaringBitmap,
embedder_stats: Arc<EmbedderStats>,
) -> Result<(FacetFieldIdsDelta, Vec<IndexEmbeddingConfig>)>
where
DC: DocumentChanges<'pl>,
@ -245,6 +248,7 @@ where
embedders,
embedding_sender,
field_distribution,
Some(embedder_stats),
request_threads(),
);
let mut datastore = ThreadLocal::with_capacity(rayon::current_num_threads());

View File

@ -1,6 +1,7 @@
use std::sync::atomic::AtomicBool;
use std::sync::{Once, RwLock};
use std::thread::{self, Builder};
use std::sync::Arc;
use big_s::S;
use document_changes::{DocumentChanges, IndexingContext};
@ -19,7 +20,7 @@ use super::steps::IndexingStep;
use super::thread_local::ThreadLocal;
use crate::documents::PrimaryKey;
use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
use crate::progress::Progress;
use crate::progress::{EmbedderStats, Progress};
use crate::update::GrenadParameters;
use crate::vector::{ArroyWrapper, EmbeddingConfigs};
use crate::{FieldsIdsMap, GlobalFieldsIdsMap, Index, InternalError, Result, ThreadPoolNoAbort};
@ -55,6 +56,7 @@ pub fn index<'pl, 'indexer, 'index, DC, MSP>(
embedders: EmbeddingConfigs,
must_stop_processing: &'indexer MSP,
progress: &'indexer Progress,
embedder_stats: Arc<EmbedderStats>,
) -> Result<ChannelCongestion>
where
DC: DocumentChanges<'pl>,
@ -158,6 +160,7 @@ where
index_embeddings,
document_ids,
modified_docids,
embedder_stats,
)
})
.unwrap()

View File

@ -475,7 +475,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
progress_callback: &FP,
should_abort: &FA,
settings_diff: InnerIndexSettingsDiff,
embedder_stats: Option<Arc<EmbedderStats>>,
embedder_stats: Arc<EmbedderStats>,
) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,
@ -1358,7 +1358,7 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
}
}
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Option<Arc<EmbedderStats>>) -> Result<()>
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Arc<EmbedderStats>) -> Result<()>
where
FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync,

View File

@ -295,6 +295,10 @@ fn embed<S>(
where
S: Serialize,
{
use std::backtrace::Backtrace;
println!("Embedder stats? {}", embedder_stats.is_some());
let request = data.client.post(&data.url);
let request = if let Some(bearer) = &data.bearer {
request.set("Authorization", bearer)

View File

@ -19,7 +19,7 @@ macro_rules! test_distinct {
let config = milli::update::IndexerConfig::default();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_distinct_field(S(stringify!($distinct)));
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();

View File

@ -25,7 +25,7 @@ fn test_facet_distribution_with_no_facet_values() {
FilterableAttributesRule::Field(S("genres")),
FilterableAttributesRule::Field(S("tags")),
]);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
// index documents
@ -74,6 +74,7 @@ fn test_facet_distribution_with_no_facet_values() {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -63,7 +63,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
S("america") => vec![S("the united states")],
});
builder.set_searchable_fields(vec![S("title"), S("description")]);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
// index documents
@ -114,6 +114,7 @@ pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();

View File

@ -10,7 +10,7 @@ fn set_stop_words(index: &Index, stop_words: &[&str]) {
let mut builder = Settings::new(&mut wtxn, index, &config);
let stop_words = stop_words.iter().map(|s| s.to_string()).collect();
builder.set_stop_words(stop_words);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
}

View File

@ -236,7 +236,7 @@ fn criteria_mixup() {
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(criteria.clone());
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();
@ -276,7 +276,7 @@ fn criteria_ascdesc() {
S("name"),
S("age"),
});
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
let mut wtxn = index.write_txn().unwrap();
@ -344,6 +344,7 @@ fn criteria_ascdesc() {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -358,7 +359,7 @@ fn criteria_ascdesc() {
let mut wtxn = index.write_txn().unwrap();
let mut builder = Settings::new(&mut wtxn, &index, &config);
builder.set_criteria(vec![criterion.clone()]);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
wtxn.commit().unwrap();
let rtxn = index.read_txn().unwrap();

View File

@ -46,7 +46,7 @@ fn test_typo_tolerance_one_typo() {
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_one_typo(4);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
// typo is now supported for 4 letters words
let mut search = Search::new(&txn, &index);
@ -92,7 +92,7 @@ fn test_typo_tolerance_two_typo() {
let config = IndexerConfig::default();
let mut builder = Settings::new(&mut txn, &index, &config);
builder.set_min_word_len_two_typos(7);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
// typo is now supported for 4 letters words
let mut search = Search::new(&txn, &index);
@ -153,6 +153,7 @@ fn test_typo_disabled_on_word() {
embedders,
&|| false,
&Progress::default(),
Default::default(),
)
.unwrap();
@ -180,7 +181,7 @@ fn test_typo_disabled_on_word() {
// `zealand` doesn't allow typos anymore
exact_words.insert("zealand".to_string());
builder.set_exact_words(exact_words);
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
let mut search = Search::new(&txn, &index);
search.query("zealand");
@ -218,7 +219,7 @@ fn test_disable_typo_on_attribute() {
let mut builder = Settings::new(&mut txn, &index, &config);
// disable typos on `description`
builder.set_exact_attributes(vec!["description".to_string()].into_iter().collect());
builder.execute(|_| (), || false, None).unwrap();
builder.execute(|_| (), || false, Default::default()).unwrap();
let mut search = Search::new(&txn, &index);
search.query("antebelum");