From 97e17f52a1d1a0b66ae0f304b3f5214ff5d9acc6 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 28 Jan 2025 16:20:33 +0100 Subject: [PATCH 1/7] Add more logs to see calls to the embedders --- crates/milli/src/vector/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 0be698027..92611b400 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -638,6 +638,7 @@ impl Embedder { } } + #[tracing::instrument(level = "debug", skip_all, target = "indexing::vector")] pub fn embed_chunks_ref( &self, texts: &[&str], From aaefbfae1f445d50d9eb856ae09283c34fe7109d Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 28 Jan 2025 16:53:34 +0100 Subject: [PATCH 2/7] Do not create too many rayon tasks --- crates/milli/src/thread_pool_no_abort.rs | 17 +++++++++-- crates/milli/src/vector/ollama.rs | 38 +++++++++++++++--------- crates/milli/src/vector/openai.rs | 37 ++++++++++++++--------- crates/milli/src/vector/rest.rs | 36 ++++++++++++++-------- 4 files changed, 85 insertions(+), 43 deletions(-) diff --git a/crates/milli/src/thread_pool_no_abort.rs b/crates/milli/src/thread_pool_no_abort.rs index 14e5b0491..b57050a63 100644 --- a/crates/milli/src/thread_pool_no_abort.rs +++ b/crates/milli/src/thread_pool_no_abort.rs @@ -1,4 +1,4 @@ -use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use rayon::{ThreadPool, ThreadPoolBuilder}; @@ -9,6 +9,8 @@ use thiserror::Error; #[derive(Debug)] pub struct ThreadPoolNoAbort { thread_pool: ThreadPool, + /// The number of active operations. + active_operations: AtomicUsize, /// Set to true if the thread pool catched a panic. pool_catched_panic: Arc, } @@ -19,7 +21,9 @@ impl ThreadPoolNoAbort { OP: FnOnce() -> R + Send, R: Send, { + self.active_operations.fetch_add(1, Ordering::Relaxed); let output = self.thread_pool.install(op); + self.active_operations.fetch_sub(1, Ordering::Relaxed); // While reseting the pool panic catcher we return an error if we catched one. if self.pool_catched_panic.swap(false, Ordering::SeqCst) { Err(PanicCatched) @@ -31,6 +35,11 @@ impl ThreadPoolNoAbort { pub fn current_num_threads(&self) -> usize { self.thread_pool.current_num_threads() } + + /// The number of active operations. + pub fn active_operations(&self) -> usize { + self.active_operations.load(Ordering::Relaxed) + } } #[derive(Error, Debug)] @@ -64,6 +73,10 @@ impl ThreadPoolNoAbortBuilder { let catched_panic = pool_catched_panic.clone(); move |_result| catched_panic.store(true, Ordering::SeqCst) }); - Ok(ThreadPoolNoAbort { thread_pool: self.0.build()?, pool_catched_panic }) + Ok(ThreadPoolNoAbort { + thread_pool: self.0.build()?, + active_operations: AtomicUsize::new(0), + pool_catched_panic, + }) } } diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index cc70e2c47..2276bbd3e 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _; use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::DistributionShift; +use super::{DistributionShift, REQUEST_PARALLELISM}; use crate::error::FaultSource; use crate::vector::Embedding; use crate::ThreadPoolNoAbort; @@ -133,20 +133,30 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) - .collect(); + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); + + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize { diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index 938c04fe3..c9da3d2da 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -7,7 +7,7 @@ use rayon::slice::ParallelSlice as _; use super::error::{EmbedError, NewEmbedderError}; use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions}; -use super::DistributionShift; +use super::{DistributionShift, REQUEST_PARALLELISM}; use crate::error::FaultSource; use crate::vector::error::EmbedErrorKind; use crate::vector::Embedding; @@ -270,20 +270,29 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed(chunk, None)) - .collect(); + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize { diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index eb05bac64..0abb98315 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -203,20 +203,30 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result, EmbedError> { - threads - .install(move || { - let embeddings: Result>, _> = texts - .par_chunks(self.prompt_count_in_chunk_hint()) - .map(move |chunk| self.embed_ref(chunk, None)) - .collect(); + if threads.active_operations() >= REQUEST_PARALLELISM { + let embeddings: Result>, _> = texts + .chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed_ref(chunk, None)) + .collect(); - let embeddings = embeddings?; - Ok(embeddings.into_iter().flatten().collect()) - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + } else { + threads + .install(move || { + let embeddings: Result>, _> = texts + .par_chunks(self.prompt_count_in_chunk_hint()) + .map(move |chunk| self.embed_ref(chunk, None)) + .collect(); + + let embeddings = embeddings?; + Ok(embeddings.into_iter().flatten().collect()) + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub fn chunk_count_hint(&self) -> usize { From 915cc377fb0cebd4b6948852398b9358082efe92 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 28 Jan 2025 17:40:50 +0100 Subject: [PATCH 3/7] Refine the env variable and the max readers --- crates/index-scheduler/src/index_mapper/index_map.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs index 3031043a9..8324eefd0 100644 --- a/crates/index-scheduler/src/index_mapper/index_map.rs +++ b/crates/index-scheduler/src/index_mapper/index_map.rs @@ -1,5 +1,7 @@ use std::collections::BTreeMap; +use std::env::VarError; use std::path::Path; +use std::str::FromStr; use std::time::Duration; use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions}; @@ -304,7 +306,15 @@ fn create_or_open_index( ) -> Result { let mut options = EnvOpenOptions::new(); options.map_size(clamp_to_page_size(map_size)); - options.max_readers(1024); + + let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") { + Ok(value) => u32::from_str(&value).unwrap(), + Err(VarError::NotPresent) => 1024, + Err(VarError::NotUnicode(value)) => panic!( + "Invalid unicode for the `MEILI_EXPERIMENTAL_INDEX_MAX_READERS` env var: {value:?}" + ), + }; + options.max_readers(max_readers); if enable_mdb_writemap { unsafe { options.flags(EnvFlags::WRITE_MAP) }; } From 48812229a9e8d4ece7330b37c6de24a5a7037c2a Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 28 Jan 2025 17:41:37 +0100 Subject: [PATCH 4/7] Remove a log that would log too much --- crates/milli/src/vector/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index 92611b400..0be698027 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -638,7 +638,6 @@ impl Embedder { } } - #[tracing::instrument(level = "debug", skip_all, target = "indexing::vector")] pub fn embed_chunks_ref( &self, texts: &[&str], From 62dabeba5f0efa0464ba5b6e3db2a5536fd476ce Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Wed, 29 Jan 2025 17:02:06 +0100 Subject: [PATCH 5/7] Do not create too many rayon tasks when processing the settings --- crates/milli/src/vector/ollama.rs | 20 ++++++++++++-------- crates/milli/src/vector/openai.rs | 20 ++++++++++++-------- crates/milli/src/vector/rest.rs | 20 ++++++++++++-------- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index 2276bbd3e..ef5cfd937 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -118,14 +118,18 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index c9da3d2da..afb48bdcd 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -255,14 +255,18 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(&chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index 0abb98315..49be155c1 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -188,14 +188,18 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { - threads - .install(move || { - text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() - }) - .map_err(|error| EmbedError { - kind: EmbedErrorKind::PanicInThreadPool(error), - fault: FaultSource::Bug, - })? + if threads.active_operations() >= REQUEST_PARALLELISM { + text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect() + } else { + threads + .install(move || { + text_chunks.into_par_iter().map(move |chunk| self.embed(chunk, None)).collect() + }) + .map_err(|error| EmbedError { + kind: EmbedErrorKind::PanicInThreadPool(error), + fault: FaultSource::Bug, + })? + } } pub(crate) fn embed_chunks_ref( From 7a9382b115374ee8aa6e5d07a6b160eff4ebf4a2 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 30 Jan 2025 11:29:41 +0100 Subject: [PATCH 6/7] Better document the rayon limitation condition --- crates/milli/src/vector/ollama.rs | 4 ++++ crates/milli/src/vector/openai.rs | 4 ++++ crates/milli/src/vector/rest.rs | 4 ++++ 3 files changed, 12 insertions(+) diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs index ef5cfd937..d2a80d6b5 100644 --- a/crates/milli/src/vector/ollama.rs +++ b/crates/milli/src/vector/ollama.rs @@ -118,6 +118,8 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() } else { @@ -137,6 +139,8 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs index afb48bdcd..c7aec5d93 100644 --- a/crates/milli/src/vector/openai.rs +++ b/crates/milli/src/vector/openai.rs @@ -255,6 +255,8 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks.into_iter().map(move |chunk| self.embed(&chunk, None)).collect() } else { @@ -274,6 +276,8 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs index 49be155c1..58d805aaf 100644 --- a/crates/milli/src/vector/rest.rs +++ b/crates/milli/src/vector/rest.rs @@ -188,6 +188,8 @@ impl Embedder { text_chunks: Vec>, threads: &ThreadPoolNoAbort, ) -> Result>, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { text_chunks.into_iter().map(move |chunk| self.embed(chunk, None)).collect() } else { @@ -207,6 +209,8 @@ impl Embedder { texts: &[&str], threads: &ThreadPoolNoAbort, ) -> Result, EmbedError> { + // This condition helps reduce the number of active rayon jobs + // so that we avoid consuming all the LMDB rtxns and avoid stack overflows. if threads.active_operations() >= REQUEST_PARALLELISM { let embeddings: Result>, _> = texts .chunks(self.prompt_count_in_chunk_hint()) From 6a70c0ec928cbedfbfdd4b78ec3d8ef8b51460d4 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Thu, 30 Jan 2025 11:43:01 +0100 Subject: [PATCH 7/7] Add a link to the experimental feature GitHub discussion --- crates/index-scheduler/src/index_mapper/index_map.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs index 8324eefd0..e4eb9bfb8 100644 --- a/crates/index-scheduler/src/index_mapper/index_map.rs +++ b/crates/index-scheduler/src/index_mapper/index_map.rs @@ -307,6 +307,9 @@ fn create_or_open_index( let mut options = EnvOpenOptions::new(); options.map_size(clamp_to_page_size(map_size)); + // You can find more details about this experimental + // environment variable on the following GitHub discussion: + // let max_readers = match std::env::var("MEILI_EXPERIMENTAL_INDEX_MAX_READERS") { Ok(value) => u32::from_str(&value).unwrap(), Err(VarError::NotPresent) => 1024,