From d9111fe8ce3946905e7686623ba97e83ad4370a4 Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Thu, 13 Mar 2025 11:11:54 +0100
Subject: [PATCH 1/5] Add lru crate to milli again

---
 Cargo.lock              | 10 ++++++++++
 crates/milli/Cargo.toml |  1 +
 2 files changed, 11 insertions(+)

diff --git a/Cargo.lock b/Cargo.lock
index 0afbdf5ad..ee161b8d2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3498,6 +3498,15 @@
 version = "0.4.26"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"

+[[package]]
+name = "lru"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "227748d55f2f0ab4735d87fd623798cb6b664512fe979705f829c9f81c934465"
+dependencies = [
+ "hashbrown 0.15.2",
+]
+
 [[package]]
 name = "lzma-rs"
 version = "0.3.0"
@@ -3778,6 +3787,7 @@ dependencies = [
  "json-depth-checker",
  "levenshtein_automata",
  "liquid",
+ "lru",
  "maplit",
  "md5",
  "meili-snap",
diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml
index dc95135a2..2d7a3ca0c 100644
--- a/crates/milli/Cargo.toml
+++ b/crates/milli/Cargo.toml
@@ -110,6 +110,7 @@ utoipa = { version = "5.3.1", features = [
     "time",
     "openapi_extensions",
 ] }
+lru = "0.13.0"

 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }

From b08544e86dfdd7c31585edd4134b7c6608ff5dbf Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Thu, 13 Mar 2025 11:13:14 +0100
Subject: [PATCH 2/5] Add embedding cache

---
 crates/meilisearch/src/search/mod.rs |  2 +-
 crates/milli/src/search/hybrid.rs    |  2 +-
 crates/milli/src/vector/composite.rs | 34 +++++++++-
 crates/milli/src/vector/hf.rs        | 16 ++++-
 crates/milli/src/vector/mod.rs       | 95 +++++++++++++++++++++++++---
 crates/milli/src/vector/ollama.rs    |  6 +-
 crates/milli/src/vector/openai.rs    |  6 +-
 crates/milli/src/vector/rest.rs      | 17 ++++-
 8 files changed, 159 insertions(+), 19 deletions(-)

diff --git a/crates/meilisearch/src/search/mod.rs b/crates/meilisearch/src/search/mod.rs
index 69b306abc..35bb883ad 100644
--- a/crates/meilisearch/src/search/mod.rs
+++ b/crates/meilisearch/src/search/mod.rs
@@ -916,7 +916,7 @@ fn prepare_search<'t>(
             let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);

             embedder
-                .embed_search(query.q.clone().unwrap(), Some(deadline))
+                .embed_search(query.q.as_ref().unwrap(), Some(deadline))
                 .map_err(milli::vector::Error::from)
                 .map_err(milli::Error::from)?
 }
diff --git a/crates/milli/src/search/hybrid.rs b/crates/milli/src/search/hybrid.rs
index a1c8b71da..298248c8b 100644
--- a/crates/milli/src/search/hybrid.rs
+++ b/crates/milli/src/search/hybrid.rs
@@ -203,7 +203,7 @@ impl<'a> Search<'a> {
             let deadline = std::time::Instant::now() + std::time::Duration::from_secs(3);

-            match embedder.embed_search(query, Some(deadline)) {
+            match embedder.embed_search(&query, Some(deadline)) {
                 Ok(embedding) => embedding,
                 Err(error) => {
                     tracing::error!(error=%error, "Embedding failed");
diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs
index d174232bf..368fb7f18 100644
--- a/crates/milli/src/vector/composite.rs
+++ b/crates/milli/src/vector/composite.rs
@@ -4,7 +4,8 @@ use arroy::Distance;

 use super::error::CompositeEmbedderContainsHuggingFace;
 use super::{
-    hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, NewEmbedderError,
+    hf, manual, ollama, openai, rest, DistributionShift, EmbedError, Embedding, EmbeddingCache,
+    NewEmbedderError,
 };
 use crate::ThreadPoolNoAbort;
@@ -148,6 +149,27 @@ impl SubEmbedder {
         }
     }

+    pub fn embed_one(
+        &self,
+        text: &str,
+        deadline: Option<Instant>,
+    ) -> std::result::Result<Embedding, EmbedError> {
+        match self {
+            SubEmbedder::HuggingFace(embedder) => embedder.embed_one(text),
+            SubEmbedder::OpenAi(embedder) => {
+                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
+            }
+            SubEmbedder::Ollama(embedder) => {
+                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
+            }
+            SubEmbedder::UserProvided(embedder) => embedder.embed_one(text),
+            SubEmbedder::Rest(embedder) => embedder
+                .embed_ref(&[text], deadline)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+        }
+    }
+
     /// Embed multiple chunks of texts.
     ///
     /// Each chunk is composed of one or multiple texts.
@@ -233,6 +255,16 @@ impl SubEmbedder {
             SubEmbedder::Rest(embedder) => embedder.distribution(),
         }
     }
+
+    pub(super) fn cache(&self) -> Option<&EmbeddingCache> {
+        match self {
+            SubEmbedder::HuggingFace(embedder) => Some(embedder.cache()),
+            SubEmbedder::OpenAi(embedder) => Some(embedder.cache()),
+            SubEmbedder::UserProvided(_) => None,
+            SubEmbedder::Ollama(embedder) => Some(embedder.cache()),
+            SubEmbedder::Rest(embedder) => Some(embedder.cache()),
+        }
+    }
 }

 fn check_similarity(
diff --git a/crates/milli/src/vector/hf.rs b/crates/milli/src/vector/hf.rs
index 60e40e367..6e73c8247 100644
--- a/crates/milli/src/vector/hf.rs
+++ b/crates/milli/src/vector/hf.rs
@@ -7,7 +7,7 @@ use hf_hub::{Repo, RepoType};
 use tokenizers::{PaddingParams, Tokenizer};

 pub use super::error::{EmbedError, Error, NewEmbedderError};
-use super::{DistributionShift, Embedding};
+use super::{DistributionShift, Embedding, EmbeddingCache};

 #[derive(
     Debug,
@@ -84,6 +84,7 @@ pub struct Embedder {
     options: EmbedderOptions,
     dimensions: usize,
     pooling: Pooling,
+    cache: EmbeddingCache,
 }

 impl std::fmt::Debug for Embedder {
@@ -245,7 +246,14 @@ impl Embedder {
             tokenizer.with_padding(Some(pp));
         }

-        let mut this = Self { model, tokenizer, options, dimensions: 0, pooling };
+        let mut this = Self {
+            model,
+            tokenizer,
+            options,
+            dimensions: 0,
+            pooling,
+            cache: EmbeddingCache::new(super::CAP_PER_THREAD),
+        };

         let embeddings = this
             .embed(vec!["test".into()])
@@ -355,4 +363,8 @@ impl Embedder {
     pub(crate) fn embed_index_ref(&self, texts: &[&str]) -> Result<Vec<Embedding>, EmbedError> {
         texts.iter().map(|text| self.embed_one(text)).collect()
     }
+
+    pub(super) fn cache(&self) -> &EmbeddingCache {
+        &self.cache
+    }
 }
diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index 80efc210d..55b865b4a 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -1,4 +1,6 @@
+use std::cell::RefCell;
 use std::collections::HashMap;
+use std::num::{NonZeroUsize, TryFromIntError};
 use std::sync::Arc;
 use std::time::Instant;
@@ -551,6 +553,51 @@ pub enum Embedder {
     Composite(composite::Embedder),
 }

+#[derive(Debug)]
+struct EmbeddingCache {
+    data: thread_local::ThreadLocal<RefCell<lru::LruCache<String, Embedding>>>,
+    cap_per_thread: u16,
+}
+
+impl EmbeddingCache {
+    pub fn new(cap_per_thread: u16) -> Self {
+        Self { cap_per_thread, data: thread_local::ThreadLocal::new() }
+    }
+
+    /// Get the embedding corresponding to `text`, if any is present in the cache.
+    pub fn get(&self, text: &str) -> Option<Embedding> {
+        let mut cache = self
+            .data
+            .get_or_try(|| -> Result<RefCell<lru::LruCache<String, Embedding>>, TryFromIntError> {
+                Ok(RefCell::new(lru::LruCache::new(NonZeroUsize::try_from(
+                    self.cap_per_thread as usize,
+                )?)))
+            })
+            .ok()?
+            .borrow_mut();
+
+        cache.get(text).cloned()
+    }
+
+    /// Puts a new embedding for the specified `text`
+    pub fn put(&self, text: String, embedding: Embedding) {
+        let Ok(cache) = self.data.get_or_try(
+            || -> Result<RefCell<lru::LruCache<String, Embedding>>, TryFromIntError> {
+                Ok(RefCell::new(lru::LruCache::new(NonZeroUsize::try_from(
+                    self.cap_per_thread as usize,
+                )?)))
+            },
+        ) else {
+            return;
+        };
+        let mut cache = cache.borrow_mut();
+
+        cache.put(text, embedding);
+    }
+}
+
+pub const CAP_PER_THREAD: u16 = 20;
+
 /// Configuration for an embedder.
 #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
 pub struct EmbeddingConfig {
@@ -651,19 +698,36 @@ impl Embedder {
     #[tracing::instrument(level = "debug", skip_all, target = "search")]
     pub fn embed_search(
         &self,
-        text: String,
+        text: &str,
         deadline: Option<Instant>,
     ) -> std::result::Result<Embedding, EmbedError> {
-        let texts = vec![text];
-        let mut embedding = match self {
-            Embedder::HuggingFace(embedder) => embedder.embed(texts),
-            Embedder::OpenAi(embedder) => embedder.embed(&texts, deadline),
-            Embedder::Ollama(embedder) => embedder.embed(&texts, deadline),
-            Embedder::UserProvided(embedder) => embedder.embed(&texts),
-            Embedder::Rest(embedder) => embedder.embed(texts, deadline),
-            Embedder::Composite(embedder) => embedder.search.embed(texts, deadline),
+        if let Some(cache) = self.cache() {
+            if let Some(embedding) = cache.get(text) {
+                tracing::trace!(text, "embedding found in cache");
+                return Ok(embedding);
+            }
+        }
+        let embedding = match self {
+            Embedder::HuggingFace(embedder) => embedder.embed_one(text),
+            Embedder::OpenAi(embedder) => {
+                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
+            }
+            Embedder::Ollama(embedder) => {
+                embedder.embed(&[text], deadline)?.pop().ok_or_else(EmbedError::missing_embedding)
+            }
+            Embedder::UserProvided(embedder) => embedder.embed_one(text),
+            Embedder::Rest(embedder) => embedder
+                .embed_ref(&[text], deadline)?
+                .pop()
+                .ok_or_else(EmbedError::missing_embedding),
+            Embedder::Composite(embedder) => embedder.search.embed_one(text, deadline),
         }?;
-        let embedding = embedding.pop().ok_or_else(EmbedError::missing_embedding)?;
+
+        if let Some(cache) = self.cache() {
+            tracing::trace!(text, "embedding added to cache");
+            cache.put(text.to_owned(), embedding.clone());
+        }
+
         Ok(embedding)
     }

@@ -759,6 +823,17 @@ impl Embedder {
             Embedder::Composite(embedder) => embedder.index.uses_document_template(),
         }
     }
+
+    fn cache(&self) -> Option<&EmbeddingCache> {
+        match self {
+            Embedder::HuggingFace(embedder) => Some(embedder.cache()),
+            Embedder::OpenAi(embedder) => Some(embedder.cache()),
+            Embedder::UserProvided(_) => None,
+            Embedder::Ollama(embedder) => Some(embedder.cache()),
+            Embedder::Rest(embedder) => Some(embedder.cache()),
+            Embedder::Composite(embedder) => embedder.search.cache(),
+        }
+    }
 }

 /// Describes the mean and sigma of distribution of embedding similarity in the embedding space.
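A note on the design at this point in the series: the cache is per-thread. Each search thread lazily creates its own LRU behind a `thread_local::ThreadLocal`, so lookups never contend on a lock, at the cost of duplicating hot entries across threads. The following standalone sketch is not part of the patch; it renders the same lookup pattern under the assumption that the `lru` and `thread_local` crates are available, with `Vec<f32>` standing in for milli's `Embedding` alias. Unlike the patch, which degrades gracefully through `get_or_try` when the capacity would be zero, the sketch simply panics on a zero capacity.

    use std::cell::RefCell;
    use std::num::NonZeroUsize;

    type Embedding = Vec<f32>;

    struct PerThreadCache {
        // One LRU per thread: no Mutex needed, entries may be duplicated.
        data: thread_local::ThreadLocal<RefCell<lru::LruCache<String, Embedding>>>,
        cap_per_thread: u16,
    }

    impl PerThreadCache {
        // Fetch (or lazily create) the calling thread's own LRU.
        fn slot(&self) -> &RefCell<lru::LruCache<String, Embedding>> {
            self.data.get_or(|| {
                RefCell::new(lru::LruCache::new(
                    NonZeroUsize::new(self.cap_per_thread as usize).expect("cap must be non-zero"),
                ))
            })
        }

        fn get(&self, text: &str) -> Option<Embedding> {
            // `get` promotes the entry to most-recently-used, hence `borrow_mut`.
            self.slot().borrow_mut().get(text).cloned()
        }

        fn put(&self, text: String, embedding: Embedding) {
            self.slot().borrow_mut().put(text, embedding);
        }
    }

Patch 4 below replaces this per-thread layout with a single Mutex-protected LRU, trading a little contention for a shared hit rate across threads.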
diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index 130e90cee..57c71538e 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -5,7 +5,7 @@ use rayon::slice::ParallelSlice as _;

 use super::error::{EmbedError, EmbedErrorKind, NewEmbedderError, NewEmbedderErrorKind};
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
-use super::{DistributionShift, REQUEST_PARALLELISM};
+use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
 use crate::vector::Embedding;
 use crate::ThreadPoolNoAbort;
@@ -182,6 +182,10 @@ impl Embedder {
     pub fn distribution(&self) -> Option<DistributionShift> {
         self.rest_embedder.distribution()
     }
+
+    pub(super) fn cache(&self) -> &EmbeddingCache {
+        self.rest_embedder.cache()
+    }
 }

 fn get_ollama_path() -> String {
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index 8a5e6266a..66680adb0 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -7,7 +7,7 @@ use rayon::slice::ParallelSlice as _;

 use super::error::{EmbedError, NewEmbedderError};
 use super::rest::{Embedder as RestEmbedder, EmbedderOptions as RestEmbedderOptions};
-use super::{DistributionShift, REQUEST_PARALLELISM};
+use super::{DistributionShift, EmbeddingCache, REQUEST_PARALLELISM};
 use crate::error::FaultSource;
 use crate::vector::error::EmbedErrorKind;
 use crate::vector::Embedding;
@@ -318,6 +318,10 @@ impl Embedder {
     pub fn distribution(&self) -> Option<DistributionShift> {
         self.options.distribution()
     }
+
+    pub(super) fn cache(&self) -> &EmbeddingCache {
+        self.rest_embedder.cache()
+    }
 }

 impl fmt::Debug for Embedder {
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index a31bc5d2f..b9b8b0fb3 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -9,7 +9,10 @@ use serde::{Deserialize, Serialize};

 use super::error::EmbedErrorKind;
 use super::json_template::ValueTemplate;
-use super::{DistributionShift, EmbedError, Embedding, NewEmbedderError, REQUEST_PARALLELISM};
+use super::{
+    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, CAP_PER_THREAD,
+    REQUEST_PARALLELISM,
+};
 use crate::error::FaultSource;
 use crate::ThreadPoolNoAbort;
@@ -75,6 +78,7 @@ pub struct Embedder {
     data: EmbedderData,
     dimensions: usize,
     distribution: Option<DistributionShift>,
+    cache: EmbeddingCache,
 }

 /// All data needed to perform requests and parse responses
@@ -152,7 +156,12 @@ impl Embedder {
             infer_dimensions(&data)?
         };

-        Ok(Self { data, dimensions, distribution: options.distribution })
+        Ok(Self {
+            data,
+            dimensions,
+            distribution: options.distribution,
+            cache: EmbeddingCache::new(CAP_PER_THREAD),
+        })
     }

     pub fn embed(
@@ -256,6 +265,10 @@ impl Embedder {
     pub fn distribution(&self) -> Option<DistributionShift> {
         self.distribution
     }
+
+    pub(super) fn cache(&self) -> &EmbeddingCache {
+        &self.cache
+    }
 }

 fn infer_dimensions(data: &EmbedderData) -> Result<usize, NewEmbedderError> {

From d0b0b90d17166a0d440f95656d0fef4447ba7b22 Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Thu, 13 Mar 2025 11:13:36 +0100
Subject: [PATCH 3/5] fixup tests, in particular foil the cache for the timeout test

---
 crates/index-scheduler/src/scheduler/test_embedders.rs | 7 +++----
 crates/meilisearch/tests/vector/openai.rs              | 2 +-
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/crates/index-scheduler/src/scheduler/test_embedders.rs b/crates/index-scheduler/src/scheduler/test_embedders.rs
index 05929b651..772aa1520 100644
--- a/crates/index-scheduler/src/scheduler/test_embedders.rs
+++ b/crates/index-scheduler/src/scheduler/test_embedders.rs
@@ -104,10 +104,9 @@ fn import_vectors() {
         let configs = index_scheduler.embedders("doggos".to_string(), configs).unwrap();
         let (hf_embedder, _, _) = configs.get(&simple_hf_name).unwrap();
-        let beagle_embed =
-            hf_embedder.embed_search(S("Intel the beagle best doggo"), None).unwrap();
-        let lab_embed = hf_embedder.embed_search(S("Max the lab best doggo"), None).unwrap();
-        let patou_embed = hf_embedder.embed_search(S("kefir the patou best doggo"), None).unwrap();
+        let beagle_embed = hf_embedder.embed_search("Intel the beagle best doggo", None).unwrap();
+        let lab_embed = hf_embedder.embed_search("Max the lab best doggo", None).unwrap();
+        let patou_embed = hf_embedder.embed_search("kefir the patou best doggo", None).unwrap();
         (fakerest_name, simple_hf_name, beagle_embed, lab_embed, patou_embed)
     };
diff --git a/crates/meilisearch/tests/vector/openai.rs b/crates/meilisearch/tests/vector/openai.rs
index b02111639..4ae8cb041 100644
--- a/crates/meilisearch/tests/vector/openai.rs
+++ b/crates/meilisearch/tests/vector/openai.rs
@@ -1995,7 +1995,7 @@ async fn timeout() {
     let (response, code) = index
         .search_post(json!({
-            "q": "grand chien de berger des montagnes",
+            "q": "grand chien de berger des montagnes foil the cache",
             "hybrid": {"semanticRatio": 0.99, "embedder": "default"}
         }))
         .await;

From 187613217277db7875ef9d567fe792be6969fbbf Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Thu, 13 Mar 2025 12:00:11 +0100
Subject: [PATCH 4/5] Mutex-based implementation

---
 crates/milli/src/vector/hf.rs   |  2 +-
 crates/milli/src/vector/mod.rs  | 44 ++++++++++++++-------------------
 crates/milli/src/vector/rest.rs |  4 +--
 3 files changed, 22 insertions(+), 28 deletions(-)

diff --git a/crates/milli/src/vector/hf.rs b/crates/milli/src/vector/hf.rs
index 6e73c8247..ce7429d36 100644
--- a/crates/milli/src/vector/hf.rs
+++ b/crates/milli/src/vector/hf.rs
@@ -252,7 +252,7 @@ impl Embedder {
             options,
             dimensions: 0,
             pooling,
-            cache: EmbeddingCache::new(super::CAP_PER_THREAD),
+            cache: EmbeddingCache::new(super::CACHE_CAP),
         };

         let embeddings = this
diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index 55b865b4a..476ba28c9 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -1,7 +1,6 @@
-use std::cell::RefCell;
 use std::collections::HashMap;
-use std::num::{NonZeroUsize, TryFromIntError};
-use std::sync::Arc;
+use std::num::NonZeroUsize;
+use std::sync::{Arc, Mutex};
 use std::time::Instant;

 use arroy::distances::{BinaryQuantizedCosine, Cosine};
@@ -555,48 +554,43 @@ pub enum Embedder {

 #[derive(Debug)]
 struct EmbeddingCache {
-    data: thread_local::ThreadLocal<RefCell<lru::LruCache<String, Embedding>>>,
-    cap_per_thread: u16,
+    data: Option<Mutex<lru::LruCache<String, Embedding>>>,
 }

 impl EmbeddingCache {
-    pub fn new(cap_per_thread: u16) -> Self {
-        Self { cap_per_thread, data: thread_local::ThreadLocal::new() }
+    const MAX_TEXT_LEN: usize = 2000;
+
+    pub fn new(cap: u16) -> Self {
+        let data = NonZeroUsize::new(cap.into()).map(lru::LruCache::new).map(Mutex::new);
+        Self { data }
     }

     /// Get the embedding corresponding to `text`, if any is present in the cache.
     pub fn get(&self, text: &str) -> Option<Embedding> {
-        let mut cache = self
-            .data
-            .get_or_try(|| -> Result<RefCell<lru::LruCache<String, Embedding>>, TryFromIntError> {
-                Ok(RefCell::new(lru::LruCache::new(NonZeroUsize::try_from(
-                    self.cap_per_thread as usize,
-                )?)))
-            })
-            .ok()?
-            .borrow_mut();
+        let data = self.data.as_ref()?;
+        if text.len() > Self::MAX_TEXT_LEN {
+            return None;
+        }
+        let mut cache = data.lock().unwrap();

         cache.get(text).cloned()
     }

     /// Puts a new embedding for the specified `text`
     pub fn put(&self, text: String, embedding: Embedding) {
-        let Ok(cache) = self.data.get_or_try(
-            || -> Result<RefCell<lru::LruCache<String, Embedding>>, TryFromIntError> {
-                Ok(RefCell::new(lru::LruCache::new(NonZeroUsize::try_from(
-                    self.cap_per_thread as usize,
-                )?)))
-            },
-        ) else {
+        let Some(data) = self.data.as_ref() else {
             return;
         };
-        let mut cache = cache.borrow_mut();
+        if text.len() > Self::MAX_TEXT_LEN {
+            return;
+        }
+        let mut cache = data.lock().unwrap();

         cache.put(text, embedding);
     }
 }

-pub const CAP_PER_THREAD: u16 = 20;
+pub const CACHE_CAP: u16 = 150;

 /// Configuration for an embedder.
 #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
 pub struct EmbeddingConfig {
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index b9b8b0fb3..9761c753e 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
 use super::error::EmbedErrorKind;
 use super::json_template::ValueTemplate;
 use super::{
-    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, CAP_PER_THREAD,
+    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, CACHE_CAP,
     REQUEST_PARALLELISM,
 };
 use crate::error::FaultSource;
 use crate::ThreadPoolNoAbort;
@@ -160,7 +160,7 @@ impl Embedder {
             data,
             dimensions,
             distribution: options.distribution,
-            cache: EmbeddingCache::new(CAP_PER_THREAD),
+            cache: EmbeddingCache::new(CACHE_CAP),
         })
     }

From e2d372823a31360fe730609ca5cf3fe6fdeb9970 Mon Sep 17 00:00:00 2001
From: Louis Dureuil
Date: Thu, 13 Mar 2025 14:54:31 +0100
Subject: [PATCH 5/5] Disable the cache by default and make it experimental

---
 crates/index-scheduler/src/lib.rs               | 11 +++++-
 crates/index-scheduler/src/scheduler/mod.rs     |  7 ++++
 crates/index-scheduler/src/test_utils.rs        |  1 +
 .../src/analytics/segment_analytics.rs          |  3 ++
 crates/meilisearch/src/lib.rs                   |  1 +
 crates/meilisearch/src/option.rs                | 20 ++++++++++-
 .../milli/src/update/index_documents/mod.rs     |  5 +--
 crates/milli/src/update/settings.rs             |  3 +-
 crates/milli/src/vector/composite.rs            | 29 ++++++++++-----
 crates/milli/src/vector/hf.rs                   |  7 ++--
 crates/milli/src/vector/mod.rs                  | 36 ++++++++++++-------
 crates/milli/src/vector/ollama.rs               |  3 +-
 crates/milli/src/vector/openai.rs               |  3 +-
 crates/milli/src/vector/rest.rs                 |  6 ++--
 14 files changed, 101 insertions(+), 34 deletions(-)

diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs
index 70b280301..5c8517650 100644
--- a/crates/index-scheduler/src/lib.rs
+++ b/crates/index-scheduler/src/lib.rs
@@ -125,6 +125,10 @@ pub struct IndexSchedulerOptions {
     pub instance_features: InstanceTogglableFeatures,
     /// The experimental features enabled for this instance.
     pub auto_upgrade: bool,
+    /// The maximal number of entries in the search query cache of an embedder.
+    ///
+    /// 0 disables the cache.
+    pub embedding_cache_cap: usize,
 }

 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -156,6 +160,11 @@ pub struct IndexScheduler {
     /// The Authorization header to send to the webhook URL.
     pub(crate) webhook_authorization_header: Option<String>,

+    /// A map to retrieve the runtime representation of an embedder depending on its configuration.
+    ///
+    /// This map may return the same embedder object for two different indexes or embedder settings,
+    /// but it will only do this if the embedder configuration options are the same, leading
+    /// to the same embeddings for the same input text.
     embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,

     // ================= test
@@ -818,7 +827,7 @@ impl IndexScheduler {
                 // add missing embedder
                 let embedder = Arc::new(
-                    Embedder::new(embedder_options.clone())
+                    Embedder::new(embedder_options.clone(), self.scheduler.embedding_cache_cap)
                         .map_err(meilisearch_types::milli::vector::Error::from)
                         .map_err(|err| {
                             Error::from_milli(err.into(), Some(index_uid.clone()))
diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs
index 68591d664..1cbfece34 100644
--- a/crates/index-scheduler/src/scheduler/mod.rs
+++ b/crates/index-scheduler/src/scheduler/mod.rs
@@ -76,6 +76,11 @@ pub struct Scheduler {

     /// The path to the version file of Meilisearch.
     pub(crate) version_file_path: PathBuf,
+
+    /// The maximal number of entries in the search query cache of an embedder.
+    ///
+    /// 0 disables the cache.
+    pub(crate) embedding_cache_cap: usize,
 }

 impl Scheduler {
@@ -90,6 +95,7 @@ impl Scheduler {
             snapshots_path: self.snapshots_path.clone(),
             auth_env: self.auth_env.clone(),
             version_file_path: self.version_file_path.clone(),
+            embedding_cache_cap: self.embedding_cache_cap,
         }
     }

@@ -105,6 +111,7 @@ impl Scheduler {
             snapshots_path: options.snapshots_path.clone(),
             auth_env,
             version_file_path: options.version_file_path.clone(),
+            embedding_cache_cap: options.embedding_cache_cap,
         }
     }
 }
diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs
index 3efcc523a..5c04a66ff 100644
--- a/crates/index-scheduler/src/test_utils.rs
+++ b/crates/index-scheduler/src/test_utils.rs
@@ -112,6 +112,7 @@ impl IndexScheduler {
             batched_tasks_size_limit: u64::MAX,
             instance_features: Default::default(),
             auto_upgrade: true, // Don't cost much and will ensure the happy path works
+            embedding_cache_cap: 10,
         };
         let version = configuration(&mut options).unwrap_or_else(|| {
             (
diff --git a/crates/meilisearch/src/analytics/segment_analytics.rs b/crates/meilisearch/src/analytics/segment_analytics.rs
index a681e9e29..504701739 100644
--- a/crates/meilisearch/src/analytics/segment_analytics.rs
+++ b/crates/meilisearch/src/analytics/segment_analytics.rs
@@ -199,6 +199,7 @@ struct Infos {
     experimental_network: bool,
     experimental_get_task_documents_route: bool,
     experimental_composite_embedders: bool,
+    experimental_embedding_cache_entries: usize,
     gpu_enabled: bool,
     db_path: bool,
     import_dump: bool,
@@ -246,6 +247,7 @@ impl Infos {
             experimental_reduce_indexing_memory_usage,
             experimental_max_number_of_batched_tasks,
             experimental_limit_batched_tasks_total_size,
+            experimental_embedding_cache_entries,
             http_addr,
             master_key: _,
             env,
@@ -312,6 +314,7 @@ impl Infos {
             experimental_network: network,
             experimental_get_task_documents_route: get_task_documents_route,
             experimental_composite_embedders: composite_embedders,
+            experimental_embedding_cache_entries,
             gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
             db_path: db_path != PathBuf::from("./data.ms"),
             import_dump: import_dump.is_some(),
diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs
index 1841d5556..6ac36caf3 100644
--- a/crates/meilisearch/src/lib.rs
+++ b/crates/meilisearch/src/lib.rs
@@ -233,6 +233,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<AuthController>)> {
         index_count: DEFAULT_INDEX_COUNT,
         instance_features: opt.to_instance_features(),
         auto_upgrade: opt.experimental_dumpless_upgrade,
+        embedding_cache_cap: opt.experimental_embedding_cache_entries,
     };
     let bin_major: u32 = VERSION_MAJOR.parse().unwrap();
     let bin_minor: u32 = VERSION_MINOR.parse().unwrap();
diff --git a/crates/meilisearch/src/option.rs b/crates/meilisearch/src/option.rs
index acf4393d3..781d55aef 100644
--- a/crates/meilisearch/src/option.rs
+++ b/crates/meilisearch/src/option.rs
@@ -63,7 +63,8 @@ const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
     "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
 const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str =
     "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_SIZE";
-
+const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
+    "MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES";
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
 const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
@@ -446,6 +447,14 @@ pub struct Opt {
     #[serde(default = "default_limit_batched_tasks_total_size")]
     pub experimental_limit_batched_tasks_total_size: u64,

+    /// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
+    /// distinct embedder.
+    ///
+    /// For more information, see .
+    #[clap(long, env = MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES, default_value_t = default_embedding_cache_entries())]
+    #[serde(default = "default_embedding_cache_entries")]
+    pub experimental_embedding_cache_entries: usize,
+
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -549,6 +558,7 @@ impl Opt {
             experimental_reduce_indexing_memory_usage,
             experimental_max_number_of_batched_tasks,
             experimental_limit_batched_tasks_total_size,
+            experimental_embedding_cache_entries,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -641,6 +651,10 @@ impl Opt {
             MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
             experimental_limit_batched_tasks_total_size.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
+            experimental_embedding_cache_entries.to_string(),
+        );
         indexer_options.export_to_env();
     }

@@ -948,6 +962,10 @@
 fn default_limit_batched_tasks_total_size() -> u64 {
     u64::MAX
 }

+fn default_embedding_cache_entries() -> usize {
+    0
+}
+
 fn default_snapshot_dir() -> PathBuf {
     PathBuf::from(DEFAULT_SNAPSHOT_DIR)
 }
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index a0228e9cf..dbbf58e4a 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -2806,8 +2806,9 @@ mod tests {
         embedding_configs.pop().unwrap();
         insta::assert_snapshot!(embedder_name, @"manual");
         insta::assert_debug_snapshot!(user_provided, @"RoaringBitmap<[0, 1, 2]>");
-        let embedder =
-            std::sync::Arc::new(crate::vector::Embedder::new(embedder.embedder_options).unwrap());
+        let embedder = std::sync::Arc::new(
+            crate::vector::Embedder::new(embedder.embedder_options, 0).unwrap(),
+        );
         let res = index
             .search(&rtxn)
             .semantic(embedder_name, embedder, false, Some([0.0, 1.0, 2.0].to_vec()))
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 571ffe1c6..325a9f15c 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -1628,7 +1628,8 @@ fn embedders(embedding_configs: Vec<IndexEmbeddingConfig>) -> Result<EmbeddingConfigs> {
                 let embedder = Arc::new(
-                    Embedder::new(embedder_options.clone())
+                    // cache_cap: no cache needed for indexing purposes
+                    Embedder::new(embedder_options.clone(), 0)
                         .map_err(crate::vector::Error::from)
                         .map_err(crate::Error::from)?,
                 );
diff --git a/crates/milli/src/vector/composite.rs b/crates/milli/src/vector/composite.rs
--- a/crates/milli/src/vector/composite.rs
+++ b/crates/milli/src/vector/composite.rs
@@ -76,8 +76,10 @@ impl Embedder {
     pub fn new(
         EmbedderOptions { search, index }: EmbedderOptions,
+        cache_cap: usize,
     ) -> Result<Self, NewEmbedderError> {
-        let search = SubEmbedder::new(search)?;
-        let index = SubEmbedder::new(index)?;
+        let search = SubEmbedder::new(search, cache_cap)?;
+        // cache is only used at search
+        let index = SubEmbedder::new(index, 0)?;

         // check dimensions
         if search.dimensions() != index.dimensions() {
@@ -119,19 +121,28 @@ impl Embedder {
 }

 impl SubEmbedder {
-    pub fn new(options: SubEmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: SubEmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         Ok(match options {
             SubEmbedderOptions::HuggingFace(options) => {
-                Self::HuggingFace(hf::Embedder::new(options)?)
+                Self::HuggingFace(hf::Embedder::new(options, cache_cap)?)
+            }
+            SubEmbedderOptions::OpenAi(options) => {
+                Self::OpenAi(openai::Embedder::new(options, cache_cap)?)
+            }
+            SubEmbedderOptions::Ollama(options) => {
+                Self::Ollama(ollama::Embedder::new(options, cache_cap)?)
             }
-            SubEmbedderOptions::OpenAi(options) => Self::OpenAi(openai::Embedder::new(options)?),
-            SubEmbedderOptions::Ollama(options) => Self::Ollama(ollama::Embedder::new(options)?),
             SubEmbedderOptions::UserProvided(options) => {
                 Self::UserProvided(manual::Embedder::new(options))
             }
-            SubEmbedderOptions::Rest(options) => {
-                Self::Rest(rest::Embedder::new(options, rest::ConfigurationSource::User)?)
-            }
+            SubEmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new(
+                options,
+                cache_cap,
+                rest::ConfigurationSource::User,
+            )?),
         })
     }
diff --git a/crates/milli/src/vector/hf.rs b/crates/milli/src/vector/hf.rs
index ce7429d36..1e5c7bd1c 100644
--- a/crates/milli/src/vector/hf.rs
+++ b/crates/milli/src/vector/hf.rs
@@ -150,7 +150,10 @@ impl From<PoolingConfig> for Pooling {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: EmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         let device = match candle_core::Device::cuda_if_available(0) {
             Ok(device) => device,
             Err(error) => {
@@ -252,7 +255,7 @@ impl Embedder {
             options,
             dimensions: 0,
             pooling,
-            cache: EmbeddingCache::new(super::CACHE_CAP),
+            cache: EmbeddingCache::new(cache_cap),
         };

         let embeddings = this
diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs
index 476ba28c9..3f85f636c 100644
--- a/crates/milli/src/vector/mod.rs
+++ b/crates/milli/src/vector/mod.rs
@@ -560,8 +560,8 @@ struct EmbeddingCache {
 impl EmbeddingCache {
     const MAX_TEXT_LEN: usize = 2000;

-    pub fn new(cap: u16) -> Self {
-        let data = NonZeroUsize::new(cap.into()).map(lru::LruCache::new).map(Mutex::new);
+    pub fn new(cap: usize) -> Self {
+        let data = NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new);
         Self { data }
     }

@@ -584,14 +584,14 @@ impl EmbeddingCache {
         if text.len() > Self::MAX_TEXT_LEN {
             return;
         }
+        tracing::trace!(text, "embedding added to cache");
+
         let mut cache = data.lock().unwrap();

         cache.put(text, embedding);
     }
 }

-pub const CACHE_CAP: u16 = 150;
-
 /// Configuration for an embedder.
 #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
 pub struct EmbeddingConfig {
@@ -670,19 +670,30 @@ impl Default for EmbedderOptions {

 impl Embedder {
     /// Spawns a new embedder built from its options.
-    pub fn new(options: EmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: EmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         Ok(match options {
-            EmbedderOptions::HuggingFace(options) => Self::HuggingFace(hf::Embedder::new(options)?),
-            EmbedderOptions::OpenAi(options) => Self::OpenAi(openai::Embedder::new(options)?),
-            EmbedderOptions::Ollama(options) => Self::Ollama(ollama::Embedder::new(options)?),
+            EmbedderOptions::HuggingFace(options) => {
+                Self::HuggingFace(hf::Embedder::new(options, cache_cap)?)
+            }
+            EmbedderOptions::OpenAi(options) => {
+                Self::OpenAi(openai::Embedder::new(options, cache_cap)?)
+            }
+            EmbedderOptions::Ollama(options) => {
+                Self::Ollama(ollama::Embedder::new(options, cache_cap)?)
+            }
             EmbedderOptions::UserProvided(options) => {
                 Self::UserProvided(manual::Embedder::new(options))
             }
-            EmbedderOptions::Rest(options) => {
-                Self::Rest(rest::Embedder::new(options, rest::ConfigurationSource::User)?)
-            }
+            EmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new(
+                options,
+                cache_cap,
+                rest::ConfigurationSource::User,
+            )?),
             EmbedderOptions::Composite(options) => {
-                Self::Composite(composite::Embedder::new(options)?)
+                Self::Composite(composite::Embedder::new(options, cache_cap)?)
             }
         })
     }
@@ -718,7 +729,6 @@ impl Embedder {
         }?;

         if let Some(cache) = self.cache() {
-            tracing::trace!(text, "embedding added to cache");
             cache.put(text.to_owned(), embedding.clone());
         }

diff --git a/crates/milli/src/vector/ollama.rs b/crates/milli/src/vector/ollama.rs
index 57c71538e..8beae6205 100644
--- a/crates/milli/src/vector/ollama.rs
+++ b/crates/milli/src/vector/ollama.rs
@@ -75,9 +75,10 @@ impl EmbedderOptions {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> Result<Self, NewEmbedderError> {
+    pub fn new(options: EmbedderOptions, cache_cap: usize) -> Result<Self, NewEmbedderError> {
         let rest_embedder = match RestEmbedder::new(
             options.into_rest_embedder_config()?,
+            cache_cap,
             super::rest::ConfigurationSource::Ollama,
         ) {
             Ok(embedder) => embedder,
diff --git a/crates/milli/src/vector/openai.rs b/crates/milli/src/vector/openai.rs
index 66680adb0..df29f6916 100644
--- a/crates/milli/src/vector/openai.rs
+++ b/crates/milli/src/vector/openai.rs
@@ -176,7 +176,7 @@ pub struct Embedder {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> Result<Self, NewEmbedderError> {
+    pub fn new(options: EmbedderOptions, cache_cap: usize) -> Result<Self, NewEmbedderError> {
         let mut inferred_api_key = Default::default();
         let api_key = options.api_key.as_ref().unwrap_or_else(|| {
             inferred_api_key = infer_api_key();
@@ -201,6 +201,7 @@ impl Embedder {
                 }),
                 headers: Default::default(),
             },
+            cache_cap,
             super::rest::ConfigurationSource::OpenAi,
         )?;
diff --git a/crates/milli/src/vector/rest.rs b/crates/milli/src/vector/rest.rs
index 9761c753e..b87ac9f77 100644
--- a/crates/milli/src/vector/rest.rs
+++ b/crates/milli/src/vector/rest.rs
@@ -10,8 +10,7 @@ use serde::{Deserialize, Serialize};
 use super::error::EmbedErrorKind;
 use super::json_template::ValueTemplate;
 use super::{
-    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, CACHE_CAP,
-    REQUEST_PARALLELISM,
+    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM,
 };
 use crate::error::FaultSource;
 use crate::ThreadPoolNoAbort;
@@ -127,6 +126,7 @@ enum InputType {
 impl Embedder {
     pub fn new(
         options: EmbedderOptions,
+        cache_cap: usize,
         configuration_source: ConfigurationSource,
     ) -> Result<Self, NewEmbedderError> {
         let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
@@ -160,7 +160,7 @@ impl Embedder {
             data,
             dimensions,
             distribution: options.distribution,
-            cache: EmbeddingCache::new(CACHE_CAP),
+            cache: EmbeddingCache::new(cache_cap),
         })
     }
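To summarize the end state of the series: the cache is a single process-wide `Mutex<lru::LruCache>` wrapped in an `Option`, texts longer than 2000 bytes are never cached, and a capacity of 0 (the default, and what indexing always passes) disables it entirely. Instances opt in with `--experimental-embedding-cache-entries N` or `MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES=N`. The following self-contained sketch, which is not part of the patches, mirrors that final `EmbeddingCache` shape with a small demo; it assumes only the `lru` crate and substitutes `Vec<f32>` for milli's `Embedding` alias.

    use std::num::NonZeroUsize;
    use std::sync::Mutex;

    type Embedding = Vec<f32>;

    struct EmbeddingCache {
        // `None` means a capacity of 0 was configured, i.e. caching is disabled.
        data: Option<Mutex<lru::LruCache<String, Embedding>>>,
    }

    impl EmbeddingCache {
        const MAX_TEXT_LEN: usize = 2000;

        fn new(cap: usize) -> Self {
            Self { data: NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new) }
        }

        fn get(&self, text: &str) -> Option<Embedding> {
            if text.len() > Self::MAX_TEXT_LEN {
                return None; // very long query texts are never cached
            }
            self.data.as_ref()?.lock().unwrap().get(text).cloned()
        }

        fn put(&self, text: String, embedding: Embedding) {
            if text.len() > Self::MAX_TEXT_LEN {
                return;
            }
            if let Some(data) = &self.data {
                data.lock().unwrap().put(text, embedding);
            }
        }
    }

    fn main() {
        // Capacity 0: every lookup misses, matching the opt-out default.
        let disabled = EmbeddingCache::new(0);
        disabled.put("chien".into(), vec![0.1, 0.2]);
        assert_eq!(disabled.get("chien"), None);

        // Non-zero capacity: repeated query texts hit the cache.
        let cache = EmbeddingCache::new(2);
        cache.put("chien".into(), vec![0.1, 0.2]);
        assert_eq!(cache.get("chien"), Some(vec![0.1, 0.2]));
    }

Representing the disabled state as `Option::None` rather than branching on a stored capacity keeps the hot path to a single `as_ref()?`, which is also why the timeout test above had to change its query text: with a shared cache, a repeated query would have been served without ever reaching the mocked slow embedder.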