Disable the cache by default and make it experimental

This commit is contained in:
Louis Dureuil 2025-03-13 14:54:31 +01:00
parent 1876132172
commit e2d372823a
No known key found for this signature in database
14 changed files with 101 additions and 34 deletions

View File

@@ -125,6 +125,10 @@ pub struct IndexSchedulerOptions {
     pub instance_features: InstanceTogglableFeatures,
     /// The experimental features enabled for this instance.
     pub auto_upgrade: bool,
+    /// The maximal number of entries in the search query cache of an embedder.
+    ///
+    /// 0 disables the cache.
+    pub embedding_cache_cap: usize,
 }

 /// Structure which holds meilisearch's indexes and schedules the tasks
@@ -156,6 +160,11 @@ pub struct IndexScheduler {
     /// The Authorization header to send to the webhook URL.
     pub(crate) webhook_authorization_header: Option<String>,

+    /// A map to retrieve the runtime representation of an embedder depending on its configuration.
+    ///
+    /// This map may return the same embedder object for two different indexes or embedder settings,
+    /// but it will only do this if the embedder configuration options are the same, leading
+    /// to the same embeddings for the same input text.
     embedders: Arc<RwLock<HashMap<EmbedderOptions, Arc<Embedder>>>>,

     // ================= test
@@ -818,7 +827,7 @@ impl IndexScheduler {
                 // add missing embedder
                 let embedder = Arc::new(
-                    Embedder::new(embedder_options.clone())
+                    Embedder::new(embedder_options.clone(), self.scheduler.embedding_cache_cap)
                         .map_err(meilisearch_types::milli::vector::Error::from)
                         .map_err(|err| {
                             Error::from_milli(err.into(), Some(index_uid.clone()))

View File

@@ -76,6 +76,11 @@ pub struct Scheduler {
     /// The path to the version file of Meilisearch.
     pub(crate) version_file_path: PathBuf,
+
+    /// The maximal number of entries in the search query cache of an embedder.
+    ///
+    /// 0 disables the cache.
+    pub(crate) embedding_cache_cap: usize,
 }

 impl Scheduler {
@@ -90,6 +95,7 @@ impl Scheduler {
             snapshots_path: self.snapshots_path.clone(),
             auth_env: self.auth_env.clone(),
             version_file_path: self.version_file_path.clone(),
+            embedding_cache_cap: self.embedding_cache_cap,
         }
     }
@@ -105,6 +111,7 @@ impl Scheduler {
             snapshots_path: options.snapshots_path.clone(),
             auth_env,
             version_file_path: options.version_file_path.clone(),
+            embedding_cache_cap: options.embedding_cache_cap,
         }
     }
 }

View File

@@ -112,6 +112,7 @@ impl IndexScheduler {
             batched_tasks_size_limit: u64::MAX,
             instance_features: Default::default(),
             auto_upgrade: true, // Don't cost much and will ensure the happy path works
+            embedding_cache_cap: 10,
         };
         let version = configuration(&mut options).unwrap_or_else(|| {
             (

View File

@@ -199,6 +199,7 @@ struct Infos {
     experimental_network: bool,
     experimental_get_task_documents_route: bool,
     experimental_composite_embedders: bool,
+    experimental_embedding_cache_entries: usize,
     gpu_enabled: bool,
     db_path: bool,
     import_dump: bool,
@@ -246,6 +247,7 @@ impl Infos {
             experimental_reduce_indexing_memory_usage,
             experimental_max_number_of_batched_tasks,
             experimental_limit_batched_tasks_total_size,
+            experimental_embedding_cache_entries,
             http_addr,
             master_key: _,
             env,
@@ -312,6 +314,7 @@ impl Infos {
             experimental_network: network,
             experimental_get_task_documents_route: get_task_documents_route,
             experimental_composite_embedders: composite_embedders,
+            experimental_embedding_cache_entries,
             gpu_enabled: meilisearch_types::milli::vector::is_cuda_enabled(),
             db_path: db_path != PathBuf::from("./data.ms"),
             import_dump: import_dump.is_some(),

View File

@@ -233,6 +233,7 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<(Arc<IndexScheduler>, Arc<
         index_count: DEFAULT_INDEX_COUNT,
         instance_features: opt.to_instance_features(),
         auto_upgrade: opt.experimental_dumpless_upgrade,
+        embedding_cache_cap: opt.experimental_embedding_cache_entries,
     };
     let bin_major: u32 = VERSION_MAJOR.parse().unwrap();
     let bin_minor: u32 = VERSION_MINOR.parse().unwrap();

View File

@@ -63,7 +63,8 @@ const MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS: &str =
     "MEILI_EXPERIMENTAL_MAX_NUMBER_OF_BATCHED_TASKS";
 const MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE: &str =
     "MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_SIZE";
+const MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES: &str =
+    "MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES";
 const DEFAULT_CONFIG_FILE_PATH: &str = "./config.toml";
 const DEFAULT_DB_PATH: &str = "./data.ms";
 const DEFAULT_HTTP_ADDR: &str = "localhost:7700";
@@ -446,6 +447,14 @@ pub struct Opt {
     #[serde(default = "default_limit_batched_tasks_total_size")]
     pub experimental_limit_batched_tasks_total_size: u64,

+    /// Enables experimental caching of search query embeddings. The value represents the maximal number of entries in the cache of each
+    /// distinct embedder.
+    ///
+    /// For more information, see <https://github.com/orgs/meilisearch/discussions/818>.
+    #[clap(long, env = MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES, default_value_t = default_embedding_cache_entries())]
+    #[serde(default = "default_embedding_cache_entries")]
+    pub experimental_embedding_cache_entries: usize,
+
     #[serde(flatten)]
     #[clap(flatten)]
     pub indexer_options: IndexerOpts,
@@ -549,6 +558,7 @@ impl Opt {
             experimental_reduce_indexing_memory_usage,
             experimental_max_number_of_batched_tasks,
             experimental_limit_batched_tasks_total_size,
+            experimental_embedding_cache_entries,
         } = self;
         export_to_env_if_not_present(MEILI_DB_PATH, db_path);
         export_to_env_if_not_present(MEILI_HTTP_ADDR, http_addr);
@@ -641,6 +651,10 @@ impl Opt {
             MEILI_EXPERIMENTAL_LIMIT_BATCHED_TASKS_TOTAL_SIZE,
             experimental_limit_batched_tasks_total_size.to_string(),
         );
+        export_to_env_if_not_present(
+            MEILI_EXPERIMENTAL_EMBEDDING_CACHE_ENTRIES,
+            experimental_embedding_cache_entries.to_string(),
+        );
         indexer_options.export_to_env();
     }
@@ -948,6 +962,10 @@ fn default_limit_batched_tasks_total_size() -> u64 {
     u64::MAX
 }

+fn default_embedding_cache_entries() -> usize {
+    0
+}
+
 fn default_snapshot_dir() -> PathBuf {
     PathBuf::from(DEFAULT_SNAPSHOT_DIR)
 }

View File

@@ -2806,8 +2806,9 @@ mod tests {
         embedding_configs.pop().unwrap();
         insta::assert_snapshot!(embedder_name, @"manual");
         insta::assert_debug_snapshot!(user_provided, @"RoaringBitmap<[0, 1, 2]>");
-        let embedder =
-            std::sync::Arc::new(crate::vector::Embedder::new(embedder.embedder_options).unwrap());
+        let embedder = std::sync::Arc::new(
+            crate::vector::Embedder::new(embedder.embedder_options, 0).unwrap(),
+        );
         let res = index
             .search(&rtxn)
             .semantic(embedder_name, embedder, false, Some([0.0, 1.0, 2.0].to_vec()))

View File

@@ -1628,7 +1628,8 @@ fn embedders(embedding_configs: Vec<IndexEmbeddingConfig>) -> Result<EmbeddingCo
             let prompt = Arc::new(prompt.try_into().map_err(crate::Error::from)?);

             let embedder = Arc::new(
-                Embedder::new(embedder_options.clone())
+                // cache_cap: no cache needed for indexing purposes
+                Embedder::new(embedder_options.clone(), 0)
                     .map_err(crate::vector::Error::from)
                     .map_err(crate::Error::from)?,
             );

View File

@@ -59,9 +59,11 @@ pub struct EmbedderOptions {
 impl Embedder {
     pub fn new(
         EmbedderOptions { search, index }: EmbedderOptions,
+        cache_cap: usize,
     ) -> Result<Self, NewEmbedderError> {
-        let search = SubEmbedder::new(search)?;
-        let index = SubEmbedder::new(index)?;
+        let search = SubEmbedder::new(search, cache_cap)?;
+        // cache is only used at search
+        let index = SubEmbedder::new(index, 0)?;

         // check dimensions
         if search.dimensions() != index.dimensions() {
@@ -119,19 +121,28 @@ impl Embedder {
     }
 }

 impl SubEmbedder {
-    pub fn new(options: SubEmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: SubEmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         Ok(match options {
             SubEmbedderOptions::HuggingFace(options) => {
-                Self::HuggingFace(hf::Embedder::new(options)?)
+                Self::HuggingFace(hf::Embedder::new(options, cache_cap)?)
+            }
+            SubEmbedderOptions::OpenAi(options) => {
+                Self::OpenAi(openai::Embedder::new(options, cache_cap)?)
+            }
+            SubEmbedderOptions::Ollama(options) => {
+                Self::Ollama(ollama::Embedder::new(options, cache_cap)?)
             }
-            SubEmbedderOptions::OpenAi(options) => Self::OpenAi(openai::Embedder::new(options)?),
-            SubEmbedderOptions::Ollama(options) => Self::Ollama(ollama::Embedder::new(options)?),
             SubEmbedderOptions::UserProvided(options) => {
                 Self::UserProvided(manual::Embedder::new(options))
             }
-            SubEmbedderOptions::Rest(options) => {
-                Self::Rest(rest::Embedder::new(options, rest::ConfigurationSource::User)?)
-            }
+            SubEmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new(
+                options,
+                cache_cap,
+                rest::ConfigurationSource::User,
+            )?),
         })
     }

View File

@@ -150,7 +150,10 @@ impl From<PoolingConfig> for Pooling {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: EmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         let device = match candle_core::Device::cuda_if_available(0) {
             Ok(device) => device,
             Err(error) => {
@@ -252,7 +255,7 @@ impl Embedder {
             options,
             dimensions: 0,
             pooling,
-            cache: EmbeddingCache::new(super::CACHE_CAP),
+            cache: EmbeddingCache::new(cache_cap),
         };

         let embeddings = this

View File

@@ -560,8 +560,8 @@ struct EmbeddingCache {
 impl EmbeddingCache {
     const MAX_TEXT_LEN: usize = 2000;

-    pub fn new(cap: u16) -> Self {
-        let data = NonZeroUsize::new(cap.into()).map(lru::LruCache::new).map(Mutex::new);
+    pub fn new(cap: usize) -> Self {
+        let data = NonZeroUsize::new(cap).map(lru::LruCache::new).map(Mutex::new);
         Self { data }
     }
@@ -584,14 +584,14 @@ impl EmbeddingCache {
         if text.len() > Self::MAX_TEXT_LEN {
             return;
         }
+        tracing::trace!(text, "embedding added to cache");
         let mut cache = data.lock().unwrap();
         cache.put(text, embedding);
     }
 }

-pub const CACHE_CAP: u16 = 150;
-
 /// Configuration for an embedder.
 #[derive(Debug, Clone, Default, serde::Deserialize, serde::Serialize)]
 pub struct EmbeddingConfig {
@@ -670,19 +670,30 @@ impl Default for EmbedderOptions {
 impl Embedder {
     /// Spawns a new embedder built from its options.
-    pub fn new(options: EmbedderOptions) -> std::result::Result<Self, NewEmbedderError> {
+    pub fn new(
+        options: EmbedderOptions,
+        cache_cap: usize,
+    ) -> std::result::Result<Self, NewEmbedderError> {
         Ok(match options {
-            EmbedderOptions::HuggingFace(options) => Self::HuggingFace(hf::Embedder::new(options)?),
-            EmbedderOptions::OpenAi(options) => Self::OpenAi(openai::Embedder::new(options)?),
-            EmbedderOptions::Ollama(options) => Self::Ollama(ollama::Embedder::new(options)?),
+            EmbedderOptions::HuggingFace(options) => {
+                Self::HuggingFace(hf::Embedder::new(options, cache_cap)?)
+            }
+            EmbedderOptions::OpenAi(options) => {
+                Self::OpenAi(openai::Embedder::new(options, cache_cap)?)
+            }
+            EmbedderOptions::Ollama(options) => {
+                Self::Ollama(ollama::Embedder::new(options, cache_cap)?)
+            }
             EmbedderOptions::UserProvided(options) => {
                 Self::UserProvided(manual::Embedder::new(options))
             }
-            EmbedderOptions::Rest(options) => {
-                Self::Rest(rest::Embedder::new(options, rest::ConfigurationSource::User)?)
-            }
+            EmbedderOptions::Rest(options) => Self::Rest(rest::Embedder::new(
+                options,
+                cache_cap,
+                rest::ConfigurationSource::User,
+            )?),
             EmbedderOptions::Composite(options) => {
-                Self::Composite(composite::Embedder::new(options)?)
+                Self::Composite(composite::Embedder::new(options, cache_cap)?)
             }
         })
     }
@@ -718,7 +729,6 @@ impl Embedder {
         }?;

         if let Some(cache) = self.cache() {
-            tracing::trace!(text, "embedding added to cache");
             cache.put(text.to_owned(), embedding.clone());
         }

View File

@@ -75,9 +75,10 @@ impl EmbedderOptions {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> Result<Self, NewEmbedderError> {
+    pub fn new(options: EmbedderOptions, cache_cap: usize) -> Result<Self, NewEmbedderError> {
         let rest_embedder = match RestEmbedder::new(
             options.into_rest_embedder_config()?,
+            cache_cap,
             super::rest::ConfigurationSource::Ollama,
         ) {
             Ok(embedder) => embedder,

View File

@@ -176,7 +176,7 @@ pub struct Embedder {
 }

 impl Embedder {
-    pub fn new(options: EmbedderOptions) -> Result<Self, NewEmbedderError> {
+    pub fn new(options: EmbedderOptions, cache_cap: usize) -> Result<Self, NewEmbedderError> {
         let mut inferred_api_key = Default::default();
         let api_key = options.api_key.as_ref().unwrap_or_else(|| {
             inferred_api_key = infer_api_key();
@@ -201,6 +201,7 @@ impl Embedder {
                 }),
                 headers: Default::default(),
             },
+            cache_cap,
             super::rest::ConfigurationSource::OpenAi,
         )?;

View File

@@ -10,8 +10,7 @@ use serde::{Deserialize, Serialize};

 use super::error::EmbedErrorKind;
 use super::json_template::ValueTemplate;
 use super::{
-    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, CACHE_CAP,
-    REQUEST_PARALLELISM,
+    DistributionShift, EmbedError, Embedding, EmbeddingCache, NewEmbedderError, REQUEST_PARALLELISM,
 };
 use crate::error::FaultSource;
 use crate::ThreadPoolNoAbort;
@@ -127,6 +126,7 @@ enum InputType {
 impl Embedder {
     pub fn new(
         options: EmbedderOptions,
+        cache_cap: usize,
         configuration_source: ConfigurationSource,
     ) -> Result<Self, NewEmbedderError> {
         let bearer = options.api_key.as_deref().map(|api_key| format!("Bearer {api_key}"));
@@ -160,7 +160,7 @@ impl Embedder {
             data,
             dimensions,
             distribution: options.distribution,
-            cache: EmbeddingCache::new(CACHE_CAP),
+            cache: EmbeddingCache::new(cache_cap),
         })
     }