mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 03:47:02 +02:00
Remove lots of Arcs
This commit is contained in:
parent
ef007d547d
commit
29f6eeff8f
22 changed files with 112 additions and 118 deletions
|
@ -1,4 +1,3 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use arroy::Distance;
|
||||
|
@ -154,7 +153,7 @@ impl SubEmbedder {
|
|||
&self,
|
||||
texts: Vec<String>,
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> std::result::Result<Vec<Embedding>, EmbedError> {
|
||||
match self {
|
||||
SubEmbedder::HuggingFace(embedder) => embedder.embed(texts),
|
||||
|
@ -169,7 +168,7 @@ impl SubEmbedder {
|
|||
&self,
|
||||
text: &str,
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> std::result::Result<Embedding, EmbedError> {
|
||||
match self {
|
||||
SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text),
|
||||
|
@ -196,7 +195,7 @@ impl SubEmbedder {
|
|||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
match self {
|
||||
SubEmbedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
|
||||
|
@ -218,7 +217,7 @@ impl SubEmbedder {
|
|||
&self,
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> std::result::Result<Vec<Embedding>, EmbedError> {
|
||||
match self {
|
||||
SubEmbedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
|
||||
|
|
|
@ -749,7 +749,7 @@ impl Embedder {
|
|||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> std::result::Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
match self {
|
||||
Embedder::HuggingFace(embedder) => embedder.embed_index(text_chunks),
|
||||
|
@ -772,7 +772,7 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> std::result::Result<Vec<Embedding>, EmbedError> {
|
||||
match self {
|
||||
Embedder::HuggingFace(embedder) => embedder.embed_index_ref(texts),
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use rayon::iter::{IntoParallelIterator as _, ParallelIterator as _};
|
||||
|
@ -106,7 +105,7 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
|
||||
Ok(embeddings) => Ok(embeddings),
|
||||
|
@ -121,21 +120,21 @@ impl Embedder {
|
|||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
text_chunks
|
||||
.into_iter()
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks
|
||||
.into_par_iter()
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
|
@ -149,14 +148,14 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
@ -166,7 +165,7 @@ impl Embedder {
|
|||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::fmt;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use ordered_float::OrderedFloat;
|
||||
|
@ -217,7 +216,7 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
match self.rest_embedder.embed_ref(texts, deadline, embedder_stats) {
|
||||
Ok(embeddings) => Ok(embeddings),
|
||||
|
@ -262,21 +261,21 @@ impl Embedder {
|
|||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
text_chunks
|
||||
.into_iter()
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks
|
||||
.into_par_iter()
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(&chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
|
@ -290,14 +289,14 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Vec<f32>>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
let embeddings = embeddings?;
|
||||
Ok(embeddings.into_iter().flatten().collect())
|
||||
|
@ -306,7 +305,7 @@ impl Embedder {
|
|||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use deserr::Deserr;
|
||||
|
@ -170,7 +169,7 @@ impl Embedder {
|
|||
&self,
|
||||
texts: Vec<String>,
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
embed(
|
||||
&self.data,
|
||||
|
@ -186,7 +185,7 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[S],
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> Result<Vec<Embedding>, EmbedError>
|
||||
where
|
||||
S: AsRef<str> + Serialize,
|
||||
|
@ -208,21 +207,21 @@ impl Embedder {
|
|||
&self,
|
||||
text_chunks: Vec<Vec<String>>,
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Vec<Embedding>>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
text_chunks
|
||||
.into_iter()
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
} else {
|
||||
threads
|
||||
.install(move || {
|
||||
text_chunks
|
||||
.into_par_iter()
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed(chunk, None, Some(embedder_stats)))
|
||||
.collect()
|
||||
})
|
||||
.map_err(|error| EmbedError {
|
||||
|
@ -236,14 +235,14 @@ impl Embedder {
|
|||
&self,
|
||||
texts: &[&str],
|
||||
threads: &ThreadPoolNoAbort,
|
||||
embedder_stats: Arc<EmbedderStats>,
|
||||
embedder_stats: &EmbedderStats,
|
||||
) -> Result<Vec<Embedding>, EmbedError> {
|
||||
// This condition helps reduce the number of active rayon jobs
|
||||
// so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
|
||||
if threads.active_operations() >= REQUEST_PARALLELISM {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
@ -253,7 +252,7 @@ impl Embedder {
|
|||
.install(move || {
|
||||
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
|
||||
.par_chunks(self.prompt_count_in_chunk_hint())
|
||||
.map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats.clone())))
|
||||
.map(move |chunk| self.embed_ref(chunk, None, Some(embedder_stats)))
|
||||
.collect();
|
||||
|
||||
let embeddings = embeddings?;
|
||||
|
@ -303,7 +302,7 @@ fn embed<S>(
|
|||
expected_count: usize,
|
||||
expected_dimension: Option<usize>,
|
||||
deadline: Option<Instant>,
|
||||
embedder_stats: Option<Arc<EmbedderStats>>,
|
||||
embedder_stats: Option<&EmbedderStats>,
|
||||
) -> Result<Vec<Embedding>, EmbedError>
|
||||
where
|
||||
S: Serialize,
|
||||
|
@ -323,7 +322,7 @@ where
|
|||
|
||||
for attempt in 0..10 {
|
||||
if let Some(embedder_stats) = &embedder_stats {
|
||||
embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
let response = request.clone().send_json(&body);
|
||||
let result = check_response(response, data.configuration_source).and_then(|response| {
|
||||
|
@ -367,7 +366,7 @@ where
|
|||
}
|
||||
|
||||
if let Some(embedder_stats) = &embedder_stats {
|
||||
embedder_stats.as_ref().total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
embedder_stats.total_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||
}
|
||||
let response = request.send_json(&body);
|
||||
let result = check_response(response, data.configuration_source).and_then(|response| {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue