This commit is contained in:
Mubelotix 2025-06-24 12:20:22 +02:00
parent 4a179fb3c0
commit d7721fe607
No known key found for this signature in database
GPG key ID: 89F391DBCC8CE7F0
18 changed files with 124 additions and 63 deletions

View file

@ -1,7 +1,7 @@
use std::collections::BTreeSet; use std::collections::BTreeSet;
use std::fmt::Write; use std::fmt::Write;
use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchStats}; use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchStats};
use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn}; use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
@ -367,7 +367,10 @@ pub fn snapshot_batch(batch: &Batch) -> String {
snap.push_str(&format!("uid: {uid}, ")); snap.push_str(&format!("uid: {uid}, "));
snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap())); snap.push_str(&format!("details: {}, ", serde_json::to_string(details).unwrap()));
snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap())); snap.push_str(&format!("stats: {}, ", serde_json::to_string(&stats).unwrap()));
snap.push_str(&format!("embedder_stats: {}, ", serde_json::to_string(&embedder_stats).unwrap())); snap.push_str(&format!(
"embedder_stats: {}, ",
serde_json::to_string(&embedder_stats).unwrap()
));
snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap())); snap.push_str(&format!("stop reason: {}, ", serde_json::to_string(&stop_reason).unwrap()));
snap.push('}'); snap.push('}');
snap snap

View file

@ -1,7 +1,7 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchId}; use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str}; use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};

View file

@ -1,11 +1,10 @@
use std::collections::{BTreeSet, HashMap, HashSet}; use std::collections::{BTreeSet, HashMap, HashSet};
use std::panic::{catch_unwind, AssertUnwindSafe}; use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::Arc;
use meilisearch_types::batches::{BatchEnqueuedAt, BatchId}; use meilisearch_types::batches::{BatchEnqueuedAt, BatchId};
use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::progress::{EmbedderStats, Progress, VariableNameStep}; use meilisearch_types::milli::progress::{Progress, VariableNameStep};
use meilisearch_types::milli::{self, ChannelCongestion}; use meilisearch_types::milli::{self, ChannelCongestion};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task}; use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
@ -163,8 +162,13 @@ impl IndexScheduler {
.set_currently_updating_index(Some((index_uid.clone(), index.clone()))); .set_currently_updating_index(Some((index_uid.clone(), index.clone())));
let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?; let pre_commit_dabases_sizes = index.database_sizes(&index_wtxn)?;
let (tasks, congestion) = let (tasks, congestion) = self.apply_index_operation(
self.apply_index_operation(&mut index_wtxn, &index, op, &progress, current_batch.embedder_stats.clone())?; &mut index_wtxn,
&index,
op,
&progress,
current_batch.embedder_stats.clone(),
)?;
{ {
progress.update_progress(FinalizingIndexStep::Committing); progress.update_progress(FinalizingIndexStep::Committing);

View file

@ -1,11 +1,11 @@
//! Utility functions on the DBs. Mainly getter and setters. //! Utility functions on the DBs. Mainly getter and setters.
use crate::milli::progress::EmbedderStats;
use std::collections::{BTreeSet, HashSet}; use std::collections::{BTreeSet, HashSet};
use std::ops::Bound; use std::ops::Bound;
use std::sync::Arc; use std::sync::Arc;
use crate::milli::progress::EmbedderStats;
use meilisearch_types::batches::{Batch, EmbedderStatsView, BatchEnqueuedAt, BatchId, BatchStats}; use meilisearch_types::batches::{Batch, BatchEnqueuedAt, BatchId, BatchStats};
use meilisearch_types::heed::{Database, RoTxn, RwTxn}; use meilisearch_types::heed::{Database, RoTxn, RwTxn};
use meilisearch_types::milli::CboRoaringBitmapCodec; use meilisearch_types::milli::CboRoaringBitmapCodec;
use meilisearch_types::task_view::DetailsView; use meilisearch_types::task_view::DetailsView;

View file

@ -3,7 +3,7 @@ use serde::Serialize;
use time::{Duration, OffsetDateTime}; use time::{Duration, OffsetDateTime};
use utoipa::ToSchema; use utoipa::ToSchema;
use crate::batches::{Batch, EmbedderStatsView, BatchId, BatchStats}; use crate::batches::{Batch, BatchId, BatchStats, EmbedderStatsView};
use crate::task_view::DetailsView; use crate::task_view::DetailsView;
use crate::tasks::serialize_duration; use crate::tasks::serialize_duration;

View file

@ -1,5 +1,4 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc;
use milli::progress::{EmbedderStats, ProgressView}; use milli::progress::{EmbedderStats, ProgressView};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};

View file

@ -544,8 +544,11 @@ fn import_dump(
let settings = index_reader.settings()?; let settings = index_reader.settings()?;
apply_settings_to_builder(&settings, &mut builder); apply_settings_to_builder(&settings, &mut builder);
let embedder_stats: Arc<EmbedderStats> = Default::default(); // FIXME: this isn't linked to anything let embedder_stats: Arc<EmbedderStats> = Default::default(); // FIXME: this isn't linked to anything
builder builder.execute(
.execute(|indexing_step| tracing::debug!("update: {:?}", indexing_step), || false, embedder_stats.clone())?; |indexing_step| tracing::debug!("update: {:?}", indexing_step),
|| false,
embedder_stats.clone(),
)?;
// 4.3 Import the documents. // 4.3 Import the documents.
// 4.3.1 We need to recreate the grenad+obkv format accepted by the index. // 4.3.1 We need to recreate the grenad+obkv format accepted by the index.

View file

@ -1,14 +1,12 @@
use std::collections::{BTreeMap, BTreeSet}; use std::collections::BTreeMap;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use meili_snap::{json_string, snapshot}; use meili_snap::{json_string, snapshot};
use reqwest::IntoUrl; use reqwest::IntoUrl;
use tokio::spawn; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use wiremock::matchers::{method, path}; use wiremock::matchers::{method, path};
use wiremock::{Mock, MockServer, Request, ResponseTemplate}; use wiremock::{Mock, MockServer, Request, ResponseTemplate};
use tokio::time::sleep;
use std::time::Duration;
use crate::common::Value; use crate::common::Value;
use crate::json; use crate::json;
@ -359,14 +357,13 @@ async fn create_faulty_mock_raw(sender: mpsc::Sender<()>) -> (MockServer, Value)
if count >= 5 { if count >= 5 {
let _ = sender.try_send(()); let _ = sender.try_send(());
ResponseTemplate::new(500) ResponseTemplate::new(500).set_delay(Duration::from_secs(u64::MAX)).set_body_json(
.set_delay(Duration::from_secs(u64::MAX)) json!({
.set_body_json(json!({
"error": "Service Unavailable", "error": "Service Unavailable",
"text": req_body "text": req_body
})) }),
)
} else { } else {
ResponseTemplate::new(500).set_body_json(json!({ ResponseTemplate::new(500).set_body_json(json!({
"error": "Service Unavailable", "error": "Service Unavailable",
"text": req_body "text": req_body
@ -2168,7 +2165,6 @@ async fn searchable_reindex() {
"###); "###);
} }
#[actix_rt::test] #[actix_rt::test]
async fn last_error_stats() { async fn last_error_stats() {
let (sender, mut receiver) = mpsc::channel(10); let (sender, mut receiver) = mpsc::channel(10);
@ -2191,7 +2187,7 @@ async fn last_error_stats() {
{"id": 1, "name": "will_error"}, {"id": 1, "name": "will_error"},
{"id": 2, "name": "must_error"}, {"id": 2, "name": "must_error"},
]); ]);
let (value, code) = index.add_documents(documents, None).await; let (_value, code) = index.add_documents(documents, None).await;
snapshot!(code, @"202 Accepted"); snapshot!(code, @"202 Accepted");
// The task will eventually fail, so let's not wait for it. // The task will eventually fail, so let's not wait for it.

View file

@ -25,7 +25,7 @@ pub struct Progress {
#[derive(Default)] #[derive(Default)]
pub struct EmbedderStats { pub struct EmbedderStats {
pub errors: Arc<RwLock<(Option<String>, u32)>>, pub errors: Arc<RwLock<(Option<String>, u32)>>,
pub total_count: AtomicUsize pub total_count: AtomicUsize,
} }
impl std::fmt::Debug for EmbedderStats { impl std::fmt::Debug for EmbedderStats {

View file

@ -1,7 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::OnceLock;
use std::sync::Arc; use std::sync::Arc;
use std::sync::OnceLock;
use bumpalo::Bump; use bumpalo::Bump;
use roaring::RoaringBitmap; use roaring::RoaringBitmap;

View file

@ -1,7 +1,7 @@
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::{Once, RwLock}; use std::sync::{Once, RwLock};
use std::thread::{self, Builder}; use std::thread::{self, Builder};
use std::sync::Arc;
use big_s::S; use big_s::S;
use document_changes::{DocumentChanges, IndexingContext}; use document_changes::{DocumentChanges, IndexingContext};

View file

@ -1358,7 +1358,12 @@ impl<'a, 't, 'i> Settings<'a, 't, 'i> {
} }
} }
pub fn execute<FP, FA>(mut self, progress_callback: FP, should_abort: FA, embedder_stats: Arc<EmbedderStats>) -> Result<()> pub fn execute<FP, FA>(
mut self,
progress_callback: FP,
should_abort: FA,
embedder_stats: Arc<EmbedderStats>,
) -> Result<()>
where where
FP: Fn(UpdateIndexingStep) + Sync, FP: Fn(UpdateIndexingStep) + Sync,
FA: Fn() -> bool + Sync, FA: Fn() -> bool + Sync,

View file

@ -173,12 +173,14 @@ impl SubEmbedder {
) -> std::result::Result<Embedding, EmbedError> { ) -> std::result::Result<Embedding, EmbedError> {
match self { match self {
SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text), SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text),
SubEmbedder::OpenAi(embedder) => { SubEmbedder::OpenAi(embedder) => embedder
embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding) .embed(&[text], deadline, embedder_stats)?
} .pop()
SubEmbedder::Ollama(embedder) => { .ok_or_else(EmbedError::missing_embedding),
embedder.embed(&[text], deadline, embedder_stats)?.pop().ok_or_else(EmbedError::missing_embedding) SubEmbedder::Ollama(embedder) => embedder
} .embed(&[text], deadline, embedder_stats)?
.pop()
.ok_or_else(EmbedError::missing_embedding),
SubEmbedder::UserProvided(embedder) => embedder.embed_one(text), SubEmbedder::UserProvided(embedder) => embedder.embed_one(text),
SubEmbedder::Rest(embedder) => embedder SubEmbedder::Rest(embedder) => embedder
.embed_ref(&[text], deadline, embedder_stats)? .embed_ref(&[text], deadline, embedder_stats)?
@ -198,10 +200,16 @@ impl SubEmbedder {
) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> { ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
match self { match self {
SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
SubEmbedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), SubEmbedder::OpenAi(embedder) => {
SubEmbedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), embedder.embed_index(text_chunks, threads, embedder_stats)
}
SubEmbedder::Ollama(embedder) => {
embedder.embed_index(text_chunks, threads, embedder_stats)
}
SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks), SubEmbedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
SubEmbedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), SubEmbedder::Rest(embedder) => {
embedder.embed_index(text_chunks, threads, embedder_stats)
}
} }
} }
@ -214,8 +222,12 @@ impl SubEmbedder {
) -> std::result::Result<Vec<Embedding>, EmbedError> { ) -> std::result::Result<Vec<Embedding>, EmbedError> {
match self { match self {
SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts), SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
SubEmbedder::OpenAi(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), SubEmbedder::OpenAi(embedder) => {
SubEmbedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), embedder.embed_index_ref(texts, threads, embedder_stats)
}
SubEmbedder::Ollama(embedder) => {
embedder.embed_index_ref(texts, threads, embedder_stats)
}
SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts), SubEmbedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), SubEmbedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
} }

View file

@ -719,12 +719,14 @@ impl Embedder {
} }
let embedding = match self { let embedding = match self {
Embedder::HuggingFace(embedder) => embedder.embed_one(text), Embedder::HuggingFace(embedder) => embedder.embed_one(text),
Embedder::OpenAi(embedder) => { Embedder::OpenAi(embedder) => embedder
embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding) .embed(&[text], deadline, None)?
} .pop()
Embedder::Ollama(embedder) => { .ok_or_else(EmbedError::missing_embedding),
embedder.embed(&[text], deadline, None)?.pop().ok_or_else(EmbedError::missing_embedding) Embedder::Ollama(embedder) => embedder
} .embed(&[text], deadline, None)?
.pop()
.ok_or_else(EmbedError::missing_embedding),
Embedder::UserProvided(embedder) => embedder.embed_one(text), Embedder::UserProvided(embedder) => embedder.embed_one(text),
Embedder::Rest(embedder) => embedder Embedder::Rest(embedder) => embedder
.embed_ref(&[text], deadline, None)? .embed_ref(&[text], deadline, None)?
@ -751,11 +753,17 @@ impl Embedder {
) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> { ) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
match self { match self {
Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks), Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
Embedder::OpenAi(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), Embedder::OpenAi(embedder) => {
Embedder::Ollama(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), embedder.embed_index(text_chunks, threads, embedder_stats)
}
Embedder::Ollama(embedder) => {
embedder.embed_index(text_chunks, threads, embedder_stats)
}
Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks), Embedder::UserProvided(embedder) => embedder.embed_index(text_chunks),
Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats), Embedder::Rest(embedder) => embedder.embed_index(text_chunks, threads, embedder_stats),
Embedder::Composite(embedder) => embedder.index.embed_index(text_chunks, threads, embedder_stats), Embedder::Composite(embedder) => {
embedder.index.embed_index(text_chunks, threads, embedder_stats)
}
} }
} }
@ -772,7 +780,9 @@ impl Embedder {
Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), Embedder::Ollama(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts), Embedder::UserProvided(embedder) => embedder.embed_index_ref(texts),
Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats), Embedder::Rest(embedder) => embedder.embed_index_ref(texts, threads, embedder_stats),
Embedder::Composite(embedder) => embedder.index.embed_index_ref(texts, threads, embedder_stats), Embedder::Composite(embedder) => {
embedder.index.embed_index_ref(texts, threads, embedder_stats)
}
} }
} }

View file

@ -106,7 +106,7 @@ impl Embedder {
&self, &self,
texts: &[S], texts: &[S],
deadline: Option<Instant>, deadline: Option<Instant>,
embedder_stats: Option<Arc<EmbedderStats>> embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<Vec<Embedding>, EmbedError> { ) -> Result<Vec<Embedding>, EmbedError> {
match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) { match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
Ok(embeddings) => Ok(embeddings), Ok(embeddings) => Ok(embeddings),
@ -126,11 +126,17 @@ impl Embedder {
// This condition helps reduce the number of active rayon jobs // This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows. // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM { if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() text_chunks
.into_iter()
.map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
.collect()
} else { } else {
threads threads
.install(move || { .install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() text_chunks
.into_par_iter()
.map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
.collect()
}) })
.map_err(|error| EmbedError { .map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error), kind: EmbedErrorKind::PanicInThreadPool(error),
@ -143,7 +149,7 @@ impl Embedder {
&self, &self,
texts: &[&str], texts: &[&str],
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
embedder_stats: Option<Arc<EmbedderStats>> embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<Vec<Vec<f32>>, EmbedError> { ) -> Result<Vec<Vec<f32>>, EmbedError> {
// This condition helps reduce the number of active rayon jobs // This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows. // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.

View file

@ -241,7 +241,11 @@ impl Embedder {
let encoded = self.tokenizer.encode_ordinary(text); let encoded = self.tokenizer.encode_ordinary(text);
let len = encoded.len(); let len = encoded.len();
if len < max_token_count { if len < max_token_count {
all_embeddings.append(&mut self.rest_embedder.embed_ref(&[text], deadline, None)?); all_embeddings.append(&mut self.rest_embedder.embed_ref(
&[text],
deadline,
None,
)?);
continue; continue;
} }
@ -263,11 +267,17 @@ impl Embedder {
// This condition helps reduce the number of active rayon jobs // This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows. // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM { if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() text_chunks
.into_iter()
.map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
.collect()
} else { } else {
threads threads
.install(move || { .install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None, embedder_stats.clone())).collect() text_chunks
.into_par_iter()
.map(move |chunk| self.embed(&chunk, None, embedder_stats.clone()))
.collect()
}) })
.map_err(|error| EmbedError { .map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error), kind: EmbedErrorKind::PanicInThreadPool(error),

View file

@ -14,8 +14,8 @@ use super::{
DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM, DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM,
}; };
use crate::error::FaultSource; use crate::error::FaultSource;
use crate::ThreadPoolNoAbort;
use crate::progress::EmbedderStats; use crate::progress::EmbedderStats;
use crate::ThreadPoolNoAbort;
// retrying in case of failure // retrying in case of failure
pub struct Retry { pub struct Retry {
@ -172,7 +172,14 @@ impl Embedder {
deadline: Option<Instant>, deadline: Option<Instant>,
embedder_stats: Option<Arc<EmbedderStats>>, embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<Vec<Embedding>, EmbedError> { ) -> Result<Vec<Embedding>, EmbedError> {
embed(&self.data, texts.as_slice(), texts.len(), Some(self.dimensions), deadline, embedder_stats) embed(
&self.data,
texts.as_slice(),
texts.len(),
Some(self.dimensions),
deadline,
embedder_stats,
)
} }
pub fn embed_ref<S>( pub fn embed_ref<S>(
@ -206,11 +213,17 @@ impl Embedder {
// This condition helps reduce the number of active rayon jobs // This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows. // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
if threads.active_operations() >= REQUEST_PARALLELISM { if threads.active_operations() >= REQUEST_PARALLELISM {
text_chunks.into_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect() text_chunks
.into_iter()
.map(move |chunk| self.embed(chunk, None, embedder_stats.clone()))
.collect()
} else { } else {
threads threads
.install(move || { .install(move || {
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None, embedder_stats.clone())).collect() text_chunks
.into_par_iter()
.map(move |chunk| self.embed(chunk, None, embedder_stats.clone()))
.collect()
}) })
.map_err(|error| EmbedError { .map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error), kind: EmbedErrorKind::PanicInThreadPool(error),
@ -223,7 +236,7 @@ impl Embedder {
&self, &self,
texts: &[&str], texts: &[&str],
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
embedder_stats: Option<Arc<EmbedderStats>> embedder_stats: Option<Arc<EmbedderStats>>,
) -> Result<Vec<Embedding>, EmbedError> { ) -> Result<Vec<Embedding>, EmbedError> {
// This condition helps reduce the number of active rayon jobs // This condition helps reduce the number of active rayon jobs
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows. // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.