From 426ea5aa97f152ef9a0294006a9207e14f40645d Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 14:48:01 +0100
Subject: [PATCH 1/8] Accept the max readers param by env var and increase it

---
 .../index-scheduler/src/index_mapper/index_map.rs | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs
index 480dafa7c..efc11d6a2 100644
--- a/crates/index-scheduler/src/index_mapper/index_map.rs
+++ b/crates/index-scheduler/src/index_mapper/index_map.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::env::VarError;
 use std::path::Path;
 use std::time::Duration;
 
@@ -300,9 +301,19 @@ fn create_or_open_index(
     enable_mdb_writemap: bool,
     map_size: usize,
 ) -> Result {
+    use std::str::FromStr;
+
     let mut options = EnvOpenOptions::new();
     options.map_size(clamp_to_page_size(map_size));
-    options.max_readers(1024);
+
+    let max_readers = match std::env::var("MEILI_INDEX_MAX_READERS") {
+        Ok(value) => u32::from_str(&value).unwrap(),
+        Err(VarError::NotPresent) => 100 * 1024,
+        Err(VarError::NotUnicode(value)) => {
+            panic!("Invalid unicode for the `MEILI_INDEX_MAX_READERS` env var: {value:?}")
+        }
+    };
+    options.max_readers(max_readers);
     if enable_mdb_writemap {
         unsafe { options.flags(EnvFlags::WRITE_MAP) };
     }

From 6a1062edf54a9ccaf4dfdb3501cf9a1e9c482db5 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 16:20:33 +0100
Subject: [PATCH 2/8] Add more logs to see calls to the embedders

---
 crates/milli/src/vector/mod.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index a1d71ef93..f7b48a305 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -637,6 +637,7 @@ impl Embedder {
         }
     }
 
+    #[tracing::instrument(level = "debug", skip_all, target = "indexing::vector")]
     pub fn embed_chunks_ref(
         &self,
         texts: &[&str],

From b605549bf2b71f29989f7167b0f6d256acf0e4f8 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 16:53:34 +0100
Subject: [PATCH 3/8] Do not create too many rayon tasks

---
 crates/milli/src/thread_pool_no_abort.rs | 17 +++++++++--
 crates/milli/src/vector/ollama.rs        | 38 +++++++++++++++---------
 crates/milli/src/vector/openai.rs        | 37 ++++++++++++++---------
 crates/milli/src/vector/rest.rs          | 36 ++++++++++++++--------
 4 files changed, 85 insertions(+), 43 deletions(-)

diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs
index 14e5b0491..b57050a63 100644
--- a/crates/milli/src/thread_pool_no_abort.rs
+++ b/crates/milli/src/thread_pool_no_abort.rs
@@ -1,4 +1,4 @@
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 
 use rayon::{ThreadPool, ThreadPoolBuilder};
@@ -9,6 +9,8 @@ use thiserror::Error;
 #[derive(Debug)]
 pub struct ThreadPoolNoAbort {
     thread_pool: ThreadPool,
+    /// The number of active operations.
+    active_operations: AtomicUsize,
     /// Set to true if the thread pool catched a panic.
     pool_catched_panic: Arc,
 }
@@ -19,7 +21,9 @@ impl ThreadPoolNoAbort {
         OP: FnOnce() -> R + Send,
         R: Send,
     {
+        self.active_operations.fetch_add(1, Ordering::Relaxed);
         let output = self.thread_pool.install(op);
+        self.active_operations.fetch_sub(1, Ordering::Relaxed);
         // While reseting the pool panic catcher we return an error if we catched one.
         if self.pool_catched_panic.swap(false, Ordering::SeqCst) {
             Err(PanicCatched)
@@ -31,6 +35,11 @@ impl ThreadPoolNoAbort {
     pub fn current_num_threads(&self) -> usize {
         self.thread_pool.current_num_threads()
     }
+
+    /// The number of active operations.
+    pub fn active_operations(&self) -> usize {
+        self.active_operations.load(Ordering::Relaxed)
+    }
 }
 
 #[derive(Error, Debug)]
@@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder {
             let catched_panic = pool_catched_panic.clone();
             move |_result| catched_panic.store(true, Ordering::SeqCst)
         });
-        Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic })
+        Ok(ThreadPoolNoAbort {
+            thread_pool: self.0.build()?,
+            active_operations: AtomicUsize::new(0),
+            pool_catched_panic,
+        })
     }
 }
diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index 7ee775cbf..a0698c5d0 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;
 
 use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
-use super::DistributionShift;
+use super::{DistributionShift, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
 use crate::vector::Embedding;
 use crate::ThreadPoolNoAbort;
@@ -113,20 +113,30 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
-        threads
-            .install(move || {
-                let embeddings: Result>, _> = texts
-                    .par_chunks(self.prompt_count_in_chunk_hint())
-                    .map(move |chunk| self.embed(chunk, None))
-                    .collect();
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            let embeddings: Result>, _> = texts
+                .chunks(self.prompt_count_in_chunk_hint())
+                .map(move |chunk| self.embed(chunk, None))
+                .collect();
 
-                let embeddings = embeddings?;
-                Ok(embeddings.into_iter().flatten().collect())
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+            let embeddings = embeddings?;
+            Ok(embeddings.into_iter().flatten().collect())
+        } else {
+            threads
+                .install(move || {
+                    let embeddings: Result>, _> = texts
+                        .par_chunks(self.prompt_count_in_chunk_hint())
+                        .map(move |chunk| self.embed(chunk, None))
+                        .collect();
+
+                    let embeddings = embeddings?;
+                    Ok(embeddings.into_iter().flatten().collect())
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub fn chunk_count_hint(&self) -> usize {
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index 7262bfef8..b1af381b1 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -6,7 +6,7 @@ use rayon::slice::ParallelSlice as _;
 
 use super::error::{EmbedError, NewEmbedderError};
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
-use super::DistributionShift;
+use super::{DistributionShift, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
 use crate::vector::error::EmbedErrorKind;
 use crate::vector::Embedding;
@@ -270,20 +270,29 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
-        threads
-            .install(move || {
-                let embeddings: Result>, _> = texts
-                    .par_chunks(self.prompt_count_in_chunk_hint())
-                    .map(move |chunk| self.embed(chunk, None))
-                    .collect();
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            let embeddings: Result>, _> = texts
+                .chunks(self.prompt_count_in_chunk_hint())
+                .map(move |chunk| self.embed(chunk, None))
+                .collect();
+            let embeddings = embeddings?;
+            Ok(embeddings.into_iter().flatten().collect())
+        } else {
+            threads
+                .install(move || {
+                    let embeddings: Result>, _> = texts
+                        .par_chunks(self.prompt_count_in_chunk_hint())
+                        .map(move |chunk| self.embed(chunk, None))
+                        .collect();
 
-                let embeddings = embeddings?;
-                Ok(embeddings.into_iter().flatten().collect())
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+                    let embeddings = embeddings?;
+                    Ok(embeddings.into_iter().flatten().collect())
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub fn chunk_count_hint(&self) -> usize {
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index 98be311d4..736dc3b2f 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -203,20 +203,30 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result, EmbedError> {
-        threads
-            .install(move || {
-                let embeddings: Result>, _> = texts
-                    .par_chunks(self.prompt_count_in_chunk_hint())
-                    .map(move |chunk| self.embed_ref(chunk, None))
-                    .collect();
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            let embeddings: Result>, _> = texts
+                .chunks(self.prompt_count_in_chunk_hint())
+                .map(move |chunk| self.embed_ref(chunk, None))
+                .collect();
 
-                let embeddings = embeddings?;
-                Ok(embeddings.into_iter().flatten().collect())
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+            let embeddings = embeddings?;
+            Ok(embeddings.into_iter().flatten().collect())
+        } else {
+            threads
+                .install(move || {
+                    let embeddings: Result>, _> = texts
+                        .par_chunks(self.prompt_count_in_chunk_hint())
+                        .map(move |chunk| self.embed_ref(chunk, None))
+                        .collect();
+
+                    let embeddings = embeddings?;
+                    Ok(embeddings.into_iter().flatten().collect())
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub fn chunk_count_hint(&self) -> usize {

From 3bbad823e09c25ecb1758ed8a5c308cca13965cb Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 17:40:50 +0100
Subject: [PATCH 4/8] Refine the env variable and the max readers

---
 .../index-scheduler/src/index_mapper/index_map.rs | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs
index efc11d6a2..931cff162 100644
--- a/crates/index-scheduler/src/index_mapper/index_map.rs
+++ b/crates/index-scheduler/src/index_mapper/index_map.rs
@@ -1,6 +1,7 @@
 use std::collections::BTreeMap;
 use std::env::VarError;
 use std::path::Path;
+use std::str::FromStr;
 use std::time::Duration;
 
 use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
@@ -301,17 +302,15 @@ fn create_or_open_index(
     enable_mdb_writemap: bool,
     map_size: usize,
 ) -> Result {
-    use std::str::FromStr;
-
     let mut options = EnvOpenOptions::new();
     options.map_size(clamp_to_page_size(map_size));
 
-    let max_readers = match std::env::var("MEILI_INDEX_MAX_READERS") {
+    let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
         Ok(value) => u32::from_str(&value).unwrap(),
-        Err(VarError::NotPresent) => 100 * 1024,
-        Err(VarError::NotUnicode(value)) => {
-            panic!("Invalid unicode for the `MEILI_INDEX_MAX_READERS` env var: {value:?}")
-        }
+        Err(VarError::NotPresent) => 1024,
+        Err(VarError::NotUnicode(value)) => panic!(
+            "Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}"
+        ),
     };
     options.max_readers(max_readers);
     if enable_mdb_writemap {

From e0f446e4d39631bd891e5d018c13901e99050f80 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Tue, 28 Jan 2025 17:41:37 +0100
Subject: [PATCH 5/8] Remove a log that would log too much

---
 crates/milli/src/vector/mod.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index f7b48a305..a1d71ef93 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -637,7 +637,6 @@ impl Embedder {
         }
     }
 
-    #[tracing::instrument(level = "debug", skip_all, target = "indexing::vector")]
     pub fn embed_chunks_ref(
         &self,
         texts: &[&str],

From 4b488b2bafc7d12cc538d7061e280cfcf524f089 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Wed, 29 Jan 2025 17:02:06 +0100
Subject: [PATCH 6/8] Do not create too many rayon tasks when processing the
 settings

---
 crates/milli/src/vector/ollama.rs | 20 ++++++++++++--------
 crates/milli/src/vector/openai.rs | 20 ++++++++++++--------
 crates/milli/src/vector/rest.rs   | 20 ++++++++++++--------
 3 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index a0698c5d0..82c9a021f 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -98,14 +98,18 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
-        threads
-            .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+        } else {
+            threads
+                .install(move || {
+                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub(crate) fn embed_chunks_ref(
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index b1af381b1..2c7f635a2 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -255,14 +255,18 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
-        threads
-            .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+        } else {
+            threads
+                .install(move || {
+                    text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect()
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub(crate) fn embed_chunks_ref(
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index 736dc3b2f..94c3633bf 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -188,14 +188,18 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
-        threads
-            .install(move || {
-                text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
-            })
-            .map_err(|error| EmbedError {
-                kind: EmbedErrorKind::PanicInThreadPool(error),
-                fault: FaultSource::Bug,
-            })?
+        if threads.active_operations() >= REQUEST_PARALLELISM {
+            text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
+        } else {
+            threads
+                .install(move || {
+                    text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect()
+                })
+                .map_err(|error| EmbedError {
+                    kind: EmbedErrorKind::PanicInThreadPool(error),
+                    fault: FaultSource::Bug,
+                })?
+        }
     }
 
     pub(crate) fn embed_chunks_ref(

From 24e0919d15b34505f6e100ca07dca1b4cae612b2 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 30 Jan 2025 11:29:41 +0100
Subject: [PATCH 7/8] Better document the rayon limitation condition

---
 crates/milli/src/vector/ollama.rs | 4 ++++
 crates/milli/src/vector/openai.rs | 4 ++++
 crates/milli/src/vector/rest.rs   | 4 ++++
 3 files changed, 12 insertions(+)

diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index 82c9a021f..863a6c39d 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -98,6 +98,8 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
         } else {
@@ -117,6 +119,8 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index 2c7f635a2..681fab142 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -255,6 +255,8 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect()
         } else {
@@ -274,6 +276,8 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index 94c3633bf..5c2d6fe7a 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -188,6 +188,8 @@ impl Embedder {
         text_chunks: Vec>,
         threads: &ThreadPoolNoAbort,
     ) -> Result>, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect()
         } else {
@@ -207,6 +209,8 @@ impl Embedder {
         texts: &[&str],
         threads: &ThreadPoolNoAbort,
     ) -> Result, EmbedError> {
+        // This condition helps reduce the number of active rayon jobs
+        // so that we avoid consuming all the LMDB rtxns and avoid stack overflows.
         if threads.active_operations() >= REQUEST_PARALLELISM {
             let embeddings: Result>, _> = texts
                 .chunks(self.prompt_count_in_chunk_hint())

From 350093baa305c722ac6135413da976b663742477 Mon Sep 17 00:00:00 2001
From: Kerollmops
Date: Thu, 30 Jan 2025 11:43:01 +0100
Subject: [PATCH 8/8] Add a link to the experimental feature GitHub discussion

---
 crates/index-scheduler/src/index_mapper/index_map.rs | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs
index 931cff162..ad89ffbf9 100644
--- a/crates/index-scheduler/src/index_mapper/index_map.rs
+++ b/crates/index-scheduler/src/index_mapper/index_map.rs
@@ -305,6 +305,9 @@ fn create_or_open_index(
     let mut options = EnvOpenOptions::new();
     options.map_size(clamp_to_page_size(map_size));
 
+    // You can find more details about this experimental
+    // environment variable on the following GitHub discussion:
+    //
     let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") {
         Ok(value) => u32::from_str(&value).unwrap(),
         Err(VarError::NotPresent) => 1024,