5303: Bring back changes from v1.12.8 into v1.13.0 r=Kerollmops a=Kerollmops

Fixes #5087 and other problems that you can find in the original PR #5294.

Co-authored-by: Kerollmops <clement@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2025-02-03 10:49:26 +00:00 committed by GitHub
commit 6425451bbc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 147 additions and 68 deletions

View File

@ -1,5 +1,7 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::env::VarError;
use std::path::Path; use std::path::Path;
use std::str::FromStr;
use std::time::Duration; use std::time::Duration;
use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
@ -304,7 +306,18 @@ fn create_or_open_index(
) -> Result<Index> { ) -> Result<Index> {
let mut options = EnvOpenOptions::new(); let mut options = EnvOpenOptions::new();
options.map_size(clamp_to_page_size(map_size)); options.map_size(clamp_to_page_size(map_size));
options.max_readers(1024);
// You can find more details about this experimental
// environment variable on the following GitHub discussion:
// <https://github.com/orgs/meilisearch/discussions/806>
let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
Ok(value) => u32::from_str(&value).unwrap(),
Err(VarError::NotPresent) => 1024,
Err(VarError::NotUnicode(value)) => panic!(
"Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
),
};
options.max_readers(max_readers);
if enable_mdb_writemap { if enable_mdb_writemap {
unsafe { options.flags(EnvFlags::WRITE_MAP) }; unsafe { options.flags(EnvFlags::WRITE_MAP) };
} }

View File

@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use rayon::{ThreadPool, ThreadPoolBuilder}; use rayon::{ThreadPool, ThreadPoolBuilder};
@ -9,6 +9,8 @@ use thiserror::Error;
/// A rayon [`ThreadPool`] bundled with a counter of active operations
/// and a flag that records whether the pool caught a panic.
#[derive(Debug)] #[derive(Debug)]
pub struct ThreadPoolNoAbort { pub struct ThreadPoolNoAbort {
thread_pool: ThreadPool, thread_pool: ThreadPool,
/// The number of active operations.
active_operations: AtomicUsize,
/// Set to true if the thread pool caught a panic. /// Set to true if the thread pool caught a panic.
pool_catched_panic: Arc<AtomicBool>, pool_catched_panic: Arc<AtomicBool>,
} }
@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
OP: FnOnce() -> R + Send, OP: FnOnce() -> R + Send,
R: Send, R: Send,
{ {
self.active_operations.fetch_add(1, Ordering::Relaxed);
let output = self.thread_pool.install(op); let output = self.thread_pool.install(op);
self.active_operations.fetch_sub(1, Ordering::Relaxed);
// While resetting the pool panic catcher we return an error if we caught one. // While resetting the pool panic catcher we return an error if we caught one.
if self.pool_catched_panic.swap(false, Ordering::SeqCst) { if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
Err(PanicCatched) Err(PanicCatched)
@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
/// Returns the thread count reported by the wrapped rayon thread pool.
pub fn current_num_threads(&self) -> usize { pub fn current_num_threads(&self) -> usize {
self.thread_pool.current_num_threads() self.thread_pool.current_num_threads()
} }
/// Returns the number of operations currently counted as active on this pool.
///
/// The counter is incremented right before an operation is handed to the
/// inner thread pool and decremented once that call returns. It is read
/// with relaxed ordering, so treat the value as a momentary snapshot.
pub fn active_operations(&self) -> usize {
    let in_flight = &self.active_operations;
    in_flight.load(Ordering::Relaxed)
}
} }
#[derive(Error, Debug)] #[derive(Error, Debug)]
@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
let catched_panic = pool_catched_panic.clone(); let catched_panic = pool_catched_panic.clone();
move |_result| catched_panic.store(true, Ordering::SeqCst) move |_result| catched_panic.store(true, Ordering::SeqCst)
}); });
Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic }) Ok(ThreadPoolNoAbort {
thread_pool: self.0.build()?,
active_operations: AtomicUsize::new(0),
pool_catched_panic,
})
} }
} }

View File

@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift; use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource; use crate::error::FaultSource;
use crate::vector::Embedding; use crate::vector::Embedding;
use crate::ThreadPoolNoAbort; use crate::ThreadPoolNoAbort;
@ -118,14 +118,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>, text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> { ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() if threads.active_operations() >= REQUEST_PARALLELISM {
}) text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
.map_err(|error| EmbedError { } else {
kind: EmbedErrorKind::PanicInThreadPool(error), threads
fault: FaultSource::Bug, .install(move || {
})? text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
} }
pub(crate) fn embed_chunks_ref( pub(crate) fn embed_chunks_ref(
@ -133,20 +139,32 @@ impl Embedder {
texts: &[&str], texts: &[&str],
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> { ) -> Result<Vec<Vec<f32>>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts if threads.active_operations() >= REQUEST_PARALLELISM {
.par_chunks(self.prompt_count_in_chunk_hint()) let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.map(move |chunk| self.embed(chunk, None)) .chunks(self.prompt_count_in_chunk_hint())
.collect(); .map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?; let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect()) Ok(embeddings.into_iter().flatten().collect())
}) } else {
.map_err(|error| EmbedError { threads
kind: EmbedErrorKind::PanicInThreadPool(error), .install(move || {
fault: FaultSource::Bug, let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
})? .par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
} }
pub fn chunk_count_hint(&self) -> usize { pub fn chunk_count_hint(&self) -> usize {

View File

@ -7,7 +7,7 @@ use rayon::slice::ParallelSlice as _;
use super::error::{EmbedError, NewEmbedderError}; use super::error::{EmbedError, NewEmbedderError};
use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
use super::DistributionShift; use super::{DistributionShift, REQUEST_PARALLELISM};
use crate::error::FaultSource; use crate::error::FaultSource;
use crate::vector::error::EmbedErrorKind; use crate::vector::error::EmbedErrorKind;
use crate::vector::Embedding; use crate::vector::Embedding;
@ -255,14 +255,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>, text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> { ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() if threads.active_operations() >= REQUEST_PARALLELISM {
}) text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
.map_err(|error| EmbedError { } else {
kind: EmbedErrorKind::PanicInThreadPool(error), threads
fault: FaultSource::Bug, .install(move || {
})? text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
} }
pub(crate) fn embed_chunks_ref( pub(crate) fn embed_chunks_ref(
@ -270,20 +276,31 @@ impl Embedder {
texts: &[&str], texts: &[&str],
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<f32>>, EmbedError> { ) -> Result<Vec<Vec<f32>>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts if threads.active_operations() >= REQUEST_PARALLELISM {
.par_chunks(self.prompt_count_in_chunk_hint()) let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.map(move |chunk| self.embed(chunk, None)) .chunks(self.prompt_count_in_chunk_hint())
.collect(); .map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
} else {
threads
.install(move || {
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed(chunk, None))
.collect();
let embeddings = embeddings?; let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect()) Ok(embeddings.into_iter().flatten().collect())
}) })
.map_err(|error| EmbedError { .map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error), kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug, fault: FaultSource::Bug,
})? })?
}
} }
pub fn chunk_count_hint(&self) -> usize { pub fn chunk_count_hint(&self) -> usize {

View File

@ -188,14 +188,20 @@ impl Embedder {
text_chunks: Vec<Vec<String>>, text_chunks: Vec<Vec<String>>,
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Vec<Embedding>>, EmbedError> { ) -> Result<Vec<Vec<Embedding>>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() if threads.active_operations() >= REQUEST_PARALLELISM {
}) text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
.map_err(|error| EmbedError { } else {
kind: EmbedErrorKind::PanicInThreadPool(error), threads
fault: FaultSource::Bug, .install(move || {
})? text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
} }
pub(crate) fn embed_chunks_ref( pub(crate) fn embed_chunks_ref(
@ -203,20 +209,32 @@ impl Embedder {
texts: &[&str], texts: &[&str],
threads: &ThreadPoolNoAbort, threads: &ThreadPoolNoAbort,
) -> Result<Vec<Embedding>, EmbedError> { ) -> Result<Vec<Embedding>, EmbedError> {
threads // This condition helps reduce the number of active rayon jobs
.install(move || { // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
let embeddings: Result<Vec<Vec<Embedding>>, _> = texts if threads.active_operations() >= REQUEST_PARALLELISM {
.par_chunks(self.prompt_count_in_chunk_hint()) let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
.map(move |chunk| self.embed_ref(chunk, None)) .chunks(self.prompt_count_in_chunk_hint())
.collect(); .map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?; let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect()) Ok(embeddings.into_iter().flatten().collect())
}) } else {
.map_err(|error| EmbedError { threads
kind: EmbedErrorKind::PanicInThreadPool(error), .install(move || {
fault: FaultSource::Bug, let embeddings: Result<Vec<Vec<Embedding>>, _> = texts
})? .par_chunks(self.prompt_count_in_chunk_hint())
.map(move |chunk| self.embed_ref(chunk, None))
.collect();
let embeddings = embeddings?;
Ok(embeddings.into_iter().flatten().collect())
})
.map_err(|error| EmbedError {
kind: EmbedErrorKind::PanicInThreadPool(error),
fault: FaultSource::Bug,
})?
}
} }
pub fn chunk_count_hint(&self) -> usize { pub fn chunk_count_hint(&self) -> usize {