diff --git a/Cargo.lock b/Cargo.lock index 0ceb2c780..0afbdf5ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,7 +47,7 @@ dependencies = [ "actix-utils", "ahash 0.8.11", "base64 0.22.1", - "bitflags 2.6.0", + "bitflags 2.9.0", "brotli", "bytes", "bytestring", @@ -393,41 +393,23 @@ checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" [[package]] name = "arroy" -version = "0.5.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc5f272f38fa063bbff0a7ab5219404e221493de005e2b4078c62d626ef567e" +checksum = "4a885313dfac15b64fd61a39d1970a2befa076c69a763434117c5b6163f9fecb" dependencies = [ "bytemuck", "byteorder", "heed", - "log", "memmap2", "nohash", "ordered-float", + "page_size", "rand", "rayon", "roaring", "tempfile", - "thiserror 1.0.69", -] - -[[package]] -name = "arroy" -version = "0.5.0" -source = "git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05#053807bf38dc079f25b003f19fc30fbf3613f6e7" -dependencies = [ - "bytemuck", - "byteorder", - "heed", - "log", - "memmap2", - "nohash", - "ordered-float", - "rand", - "rayon", - "roaring", - "tempfile", - "thiserror 1.0.69", + "thiserror 2.0.9", + "tracing", ] [[package]] @@ -553,7 +535,7 @@ version = "0.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f49d8fed880d473ea71efb9bf597651e77201bdd4893efe54c9e5d65ae04ce6f" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "cexpr", "clang-sys", "itertools 0.13.0", @@ -599,9 +581,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.6.0" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +checksum = "5c8214115b7bf84099f1309324e63141d4c5d7cc26862f97a0a857dbefe165bd" dependencies = [ "serde", ] @@ -2082,7 +2064,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ce20bbb48248608ba4908b45fe36e17e40f56f8c6bb385ecf5d3c4a1e8b05a22" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "debugid", "fxhash", "serde", @@ -2249,7 +2231,7 @@ version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b903b73e45dc0c6c596f2d37eccece7c1c8bb6e4407b001096387c63d0d93724" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "libc", "libgit2-sys", "log", @@ -2397,11 +2379,11 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "heed" -version = "0.20.5" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d4f449bab7320c56003d37732a917e18798e2f1709d80263face2b4f9436ddb" +checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "byteorder", "heed-traits", "heed-types", @@ -2421,9 +2403,9 @@ checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff" [[package]] name = "heed-types" -version = "0.20.1" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d3f528b053a6d700b2734eabcd0fd49cb8230647aa72958467527b0b7917114" +checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d" dependencies = [ "bincode", "byteorder", @@ -2746,7 +2728,6 @@ name = "index-scheduler" version = "1.13.3" dependencies = [ "anyhow", - "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "big_s", "bincode", "bumpalo", @@ -3013,9 +2994,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.169" +version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5aba8db14291edd000dfcc4d620c7ebfb122c613afb886ca8803fa4e128a20a" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "libgit2-sys" @@ -3468,9 +3449,9 @@ checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104" [[package]] name = "lmdb-master-sys" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "472c3760e2a8d0f61f322fb36788021bb36d573c502b50fa3e2bcaac3ec326c9" +checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df" dependencies = [ "cc", "doxygen-rs", @@ -3513,9 +3494,9 @@ checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e" [[package]] name = "log" -version = "0.4.21" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "lzma-rs" @@ -3726,7 +3707,6 @@ name = "meilitool" version = "1.13.3" dependencies = [ "anyhow", - "arroy 0.5.0 (git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v04-to-v05)", "clap", "dump", "file-store", @@ -3761,7 +3741,7 @@ name = "milli" version = "1.13.3" dependencies = [ "allocator-api2", - "arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "arroy", "bbqueue", "big_s", "bimap", @@ -4129,9 +4109,9 @@ checksum = "ae4512a8f418ac322335255a72361b9ac927e106f4d7fe6ab4d8ac59cb01f7a9" [[package]] name = "once_cell" -version = "1.20.2" +version = "1.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +checksum = "cde51589ab56b20a6f686b2c68f7a0bd6add753d697abf720d63f8db3ab7b1ad" [[package]] name = "onig" @@ -4518,7 +4498,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "hex", "lazy_static", "procfs-core", @@ -4531,7 +4511,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "hex", ] @@ -4872,7 +4852,7 @@ version = "1.20.0" source = "git+https://github.com/rhaiscript/rhai?rev=ef3df63121d27aacd838f366f2b83fd65f20a1e4#ef3df63121d27aacd838f366f2b83fd65f20a1e4" dependencies = [ "ahash 0.8.11", - "bitflags 2.6.0", + "bitflags 2.9.0", "instant", "num-traits", "once_cell", @@ -5008,7 +4988,7 @@ version = "0.38.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "errno", "libc", "linux-raw-sys", @@ -5130,9 +5110,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" dependencies = [ "serde_derive", ] @@ -5148,9 +5128,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.217" +version = "1.0.219" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" dependencies = [ "proc-macro2", "quote", @@ -5159,9 +5139,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.138" +version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ "indexmap", "itoa", @@ -5529,7 +5509,7 @@ version = "0.5.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec7dddc5f0fee506baf8b9fdb989e242f17e4b11c61dfbb0635b705217199eea" dependencies = [ - "bitflags 2.6.0", + "bitflags 2.9.0", "byteorder", "enum-as-inner", "libc", diff --git a/Cargo.toml b/Cargo.toml index 0a16810af..320ecfa57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,12 @@ license = "MIT" [profile.release] codegen-units = 1 +# We now compile heed without the NDEBUG define for better performance. +# However, we still enable debug assertions for a better detection of +# disk corruption on the cloud or in OSS. +[profile.release.package.heed] +debug-assertions = true + [profile.dev.package.flate2] opt-level = 3 diff --git a/crates/benchmarks/benches/indexing.rs b/crates/benchmarks/benches/indexing.rs index 9938fca26..9199c3877 100644 --- a/crates/benchmarks/benches/indexing.rs +++ b/crates/benchmarks/benches/indexing.rs @@ -35,7 +35,8 @@ fn setup_dir(path: impl AsRef) { fn setup_index() -> Index { let path = "benches.mmdb"; setup_dir(path); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(100 * 1024 * 1024 * 1024); // 100 GB options.max_readers(100); Index::new(options, path, true).unwrap() diff --git a/crates/benchmarks/benches/utils.rs b/crates/benchmarks/benches/utils.rs index 5baeca869..aaa2d50a0 100644 --- a/crates/benchmarks/benches/utils.rs +++ b/crates/benchmarks/benches/utils.rs @@ -65,7 +65,8 @@ pub fn base_setup(conf: &Conf) -> Index { } create_dir_all(conf.database_name).unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(100 * 1024 * 1024 * 1024); // 100 GB options.max_readers(100); let index = Index::new(options, conf.database_name, true).unwrap(); diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index e26303010..4df989b51 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -57,7 +57,8 @@ fn main() { let opt = opt.clone(); let handle = std::thread::spawn(move || { - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(1024 * 1024 * 1024 * 1024); let tempdir = match opt.path { Some(path) => TempDir::new_in(path).unwrap(), diff --git a/crates/index-scheduler/Cargo.toml b/crates/index-scheduler/Cargo.toml index 881460d86..37b3ea835 100644 --- a/crates/index-scheduler/Cargo.toml +++ b/crates/index-scheduler/Cargo.toml @@ -44,7 +44,6 @@ ureq = "2.12.1" uuid = { version = "1.11.0", features = ["serde", "v4"] } [dev-dependencies] -arroy = "0.5.0" big_s = "1.0.2" crossbeam-channel = "0.5.14" # fixed version due to format breakages in v1.40 diff --git a/crates/index-scheduler/src/features.rs b/crates/index-scheduler/src/features.rs index b52b194e6..109e6b867 100644 --- a/crates/index-scheduler/src/features.rs +++ b/crates/index-scheduler/src/features.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock}; use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures}; use meilisearch_types::heed::types::{SerdeJson, Str}; -use meilisearch_types::heed::{Database, Env, RwTxn}; +use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls}; use crate::error::FeatureNotEnabledError; use crate::Result; @@ -139,7 +139,7 @@ impl FeatureData { } pub fn new( - env: &Env, + env: &Env, wtxn: &mut RwTxn, instance_features: InstanceTogglableFeatures, ) -> Result { diff --git a/crates/index-scheduler/src/index_mapper/index_map.rs b/crates/index-scheduler/src/index_mapper/index_map.rs index e4eb9bfb8..acb95f249 100644 --- a/crates/index-scheduler/src/index_mapper/index_map.rs +++ b/crates/index-scheduler/src/index_mapper/index_map.rs @@ -304,7 +304,8 @@ fn create_or_open_index( map_size: usize, creation: bool, ) -> Result { - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(clamp_to_page_size(map_size)); // You can find more details about this experimental @@ -333,7 +334,7 @@ fn create_or_open_index( #[cfg(test)] mod tests { - use meilisearch_types::heed::Env; + use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::Index; use uuid::Uuid; @@ -343,7 +344,7 @@ mod tests { use crate::IndexScheduler; impl IndexMapper { - fn test() -> (Self, Env, IndexSchedulerHandle) { + fn test() -> (Self, Env, IndexSchedulerHandle) { let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); (index_scheduler.index_mapper, index_scheduler.env, handle) } diff --git a/crates/index-scheduler/src/index_mapper/mod.rs b/crates/index-scheduler/src/index_mapper/mod.rs index 32cfa94ad..c1f6ff472 100644 --- a/crates/index-scheduler/src/index_mapper/mod.rs +++ b/crates/index-scheduler/src/index_mapper/mod.rs @@ -4,7 +4,7 @@ use std::time::Duration; use std::{fs, thread}; use meilisearch_types::heed::types::{SerdeJson, Str}; -use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli; use meilisearch_types::milli::database_stats::DatabaseStats; use meilisearch_types::milli::update::IndexerConfig; @@ -164,7 +164,7 @@ impl IndexMapper { } pub fn new( - env: &Env, + env: &Env, wtxn: &mut RwTxn, options: &IndexSchedulerOptions, budget: IndexBudget, diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index 3b61b5dc4..70b280301 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch; use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures}; use meilisearch_types::heed::byteorder::BE; use meilisearch_types::heed::types::I128; -use meilisearch_types::heed::{self, Env, RoTxn}; +use meilisearch_types::heed::{self, Env, RoTxn, WithoutTls}; use meilisearch_types::milli::index::IndexEmbeddingConfig; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs}; @@ -131,7 +131,7 @@ pub struct IndexSchedulerOptions { /// to be performed on them. pub struct IndexScheduler { /// The LMDB environment which the DBs are associated with. - pub(crate) env: Env, + pub(crate) env: Env, /// The list of tasks currently processing pub(crate) processing_tasks: Arc>, @@ -209,6 +209,7 @@ impl IndexScheduler { #[allow(private_interfaces)] // because test_utils is private pub fn new( options: IndexSchedulerOptions, + auth_env: Env, from_db_version: (u32, u32, u32), #[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>, #[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>, @@ -240,7 +241,9 @@ impl IndexScheduler { }; let env = unsafe { - heed::EnvOpenOptions::new() + let env_options = heed::EnvOpenOptions::new(); + let mut env_options = env_options.read_txn_without_tls(); + env_options .max_dbs(Self::nb_db()) .map_size(budget.task_db_size) .open(&options.tasks_path) @@ -260,7 +263,7 @@ impl IndexScheduler { processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())), version, queue, - scheduler: Scheduler::new(&options), + scheduler: Scheduler::new(&options, auth_env), index_mapper, env, @@ -358,7 +361,7 @@ impl IndexScheduler { } } - pub fn read_txn(&self) -> Result { + pub fn read_txn(&self) -> Result> { self.env.read_txn().map_err(|e| e.into()) } @@ -427,12 +430,14 @@ impl IndexScheduler { /// If you need to fetch information from or perform an action on all indexes, /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result { - self.index_mapper.index(&self.env.read_txn()?, name) + let rtxn = self.env.read_txn()?; + self.index_mapper.index(&rtxn, name) } /// Return the boolean referring if index exists. pub fn index_exists(&self, name: &str) -> Result { - self.index_mapper.index_exists(&self.env.read_txn()?, name) + let rtxn = self.env.read_txn()?; + self.index_mapper.index_exists(&rtxn, name) } /// Return the name of all indexes without opening them. @@ -507,7 +512,8 @@ impl IndexScheduler { /// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example. /// 3. The number of times the properties appeared. pub fn get_stats(&self) -> Result>> { - self.queue.get_stats(&self.read_txn()?, &self.processing_tasks.read().unwrap()) + let rtxn = self.read_txn()?; + self.queue.get_stats(&rtxn, &self.processing_tasks.read().unwrap()) } // Return true if there is at least one task that is processing. diff --git a/crates/index-scheduler/src/queue/batches.rs b/crates/index-scheduler/src/queue/batches.rs index 970e41110..785d24931 100644 --- a/crates/index-scheduler/src/queue/batches.rs +++ b/crates/index-scheduler/src/queue/batches.rs @@ -3,7 +3,7 @@ use std::ops::{Bound, RangeBounds}; use meilisearch_types::batches::{Batch, BatchId}; use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str}; -use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::{Kind, Status}; use roaring::{MultiOps, RoaringBitmap}; @@ -66,7 +66,7 @@ impl BatchQueue { NUMBER_OF_DATABASES } - pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { + pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?, status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?, diff --git a/crates/index-scheduler/src/queue/mod.rs b/crates/index-scheduler/src/queue/mod.rs index 8850eb8fa..b13e3ffe2 100644 --- a/crates/index-scheduler/src/queue/mod.rs +++ b/crates/index-scheduler/src/queue/mod.rs @@ -13,7 +13,7 @@ use std::time::Duration; use file_store::FileStore; use meilisearch_types::batches::BatchId; -use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task}; use roaring::RoaringBitmap; @@ -157,7 +157,7 @@ impl Queue { /// Create an index scheduler and start its run loop. pub(crate) fn new( - env: &Env, + env: &Env, wtxn: &mut RwTxn, options: &IndexSchedulerOptions, ) -> Result { diff --git a/crates/index-scheduler/src/queue/tasks.rs b/crates/index-scheduler/src/queue/tasks.rs index 913ebcb30..afe510955 100644 --- a/crates/index-scheduler/src/queue/tasks.rs +++ b/crates/index-scheduler/src/queue/tasks.rs @@ -1,7 +1,7 @@ use std::ops::{Bound, RangeBounds}; use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str}; -use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32}; use meilisearch_types::tasks::{Kind, Status, Task}; use roaring::{MultiOps, RoaringBitmap}; @@ -68,7 +68,7 @@ impl TaskQueue { NUMBER_OF_DATABASES } - pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { + pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result { Ok(Self { all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?, status: env.create_database(wtxn, Some(db_name::STATUS))?, diff --git a/crates/index-scheduler/src/scheduler/mod.rs b/crates/index-scheduler/src/scheduler/mod.rs index 41ff7f809..68591d664 100644 --- a/crates/index-scheduler/src/scheduler/mod.rs +++ b/crates/index-scheduler/src/scheduler/mod.rs @@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::sync::Arc; use meilisearch_types::error::ResponseError; +use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::milli; use meilisearch_types::tasks::Status; use rayon::current_num_threads; @@ -71,7 +72,7 @@ pub struct Scheduler { pub(crate) snapshots_path: PathBuf, /// The path to the folder containing the auth LMDB env. - pub(crate) auth_path: PathBuf, + pub(crate) auth_env: Env, /// The path to the version file of Meilisearch. pub(crate) version_file_path: PathBuf, @@ -87,12 +88,12 @@ impl Scheduler { batched_tasks_size_limit: self.batched_tasks_size_limit, dumps_path: self.dumps_path.clone(), snapshots_path: self.snapshots_path.clone(), - auth_path: self.auth_path.clone(), + auth_env: self.auth_env.clone(), version_file_path: self.version_file_path.clone(), } } - pub fn new(options: &IndexSchedulerOptions) -> Scheduler { + pub fn new(options: &IndexSchedulerOptions, auth_env: Env) -> Scheduler { Scheduler { must_stop_processing: MustStopProcessing::default(), // we want to start the loop right away in case meilisearch was ctrl+Ced while processing things @@ -102,7 +103,7 @@ impl Scheduler { batched_tasks_size_limit: options.batched_tasks_size_limit, dumps_path: options.dumps_path.clone(), snapshots_path: options.snapshots_path.clone(), - auth_path: options.auth_path.clone(), + auth_env, version_file_path: options.version_file_path.clone(), } } diff --git a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs index 3e1a63ce3..f1bafed01 100644 --- a/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs +++ b/crates/index-scheduler/src/scheduler/process_snapshot_creation.rs @@ -4,7 +4,6 @@ use std::sync::atomic::Ordering; use meilisearch_types::heed::CompactionOption; use meilisearch_types::milli::progress::{Progress, VariableNameStep}; -use meilisearch_types::milli::{self}; use meilisearch_types::tasks::{Status, Task}; use meilisearch_types::{compression, VERSION_FILE_NAME}; @@ -28,7 +27,7 @@ impl IndexScheduler { // 2. Snapshot the index-scheduler LMDB env // - // When we call copy_to_file, LMDB opens a read transaction by itself, + // When we call copy_to_path, LMDB opens a read transaction by itself, // we can't provide our own. It is an issue as we would like to know // the update files to copy but new ones can be enqueued between the copy // of the env and the new transaction we open to retrieve the enqueued tasks. @@ -42,7 +41,7 @@ impl IndexScheduler { progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler); let dst = temp_snapshot_dir.path().join("tasks"); fs::create_dir_all(&dst)?; - self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; + self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?; // 2.2 Create a read transaction on the index-scheduler let rtxn = self.env.read_txn()?; @@ -81,7 +80,7 @@ impl IndexScheduler { let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string()); fs::create_dir_all(&dst)?; index - .copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled) + .copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled) .map_err(|e| Error::from_milli(e, Some(name.to_string())))?; } @@ -91,14 +90,7 @@ impl IndexScheduler { progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys); let dst = temp_snapshot_dir.path().join("auth"); fs::create_dir_all(&dst)?; - // TODO We can't use the open_auth_store_env function here but we should - let auth = unsafe { - milli::heed::EnvOpenOptions::new() - .map_size(1024 * 1024 * 1024) // 1 GiB - .max_dbs(2) - .open(&self.scheduler.auth_path) - }?; - auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?; + self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?; // 5. Copy and tarball the flat snapshot progress.update_progress(SnapshotCreationProgress::CreateTheTarball); diff --git a/crates/index-scheduler/src/test_utils.rs b/crates/index-scheduler/src/test_utils.rs index 072220b6c..3efcc523a 100644 --- a/crates/index-scheduler/src/test_utils.rs +++ b/crates/index-scheduler/src/test_utils.rs @@ -5,6 +5,7 @@ use std::time::Duration; use big_s::S; use crossbeam_channel::RecvTimeoutError; use file_store::File; +use meilisearch_auth::open_auth_store_env; use meilisearch_types::document_formats::DocumentFormatError; use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments; use meilisearch_types::milli::update::IndexerConfig; @@ -120,7 +121,10 @@ impl IndexScheduler { ) }); - let index_scheduler = Self::new(options, version, sender, planned_failures).unwrap(); + std::fs::create_dir_all(&options.auth_path).unwrap(); + let auth_env = open_auth_store_env(&options.auth_path).unwrap(); + let index_scheduler = + Self::new(options, auth_env, version, sender, planned_failures).unwrap(); // To be 100% consistent between all test we're going to start the scheduler right now // and ensure it's in the expected starting state. diff --git a/crates/index-scheduler/src/upgrade/mod.rs b/crates/index-scheduler/src/upgrade/mod.rs index 017685198..86ca755b8 100644 --- a/crates/index-scheduler/src/upgrade/mod.rs +++ b/crates/index-scheduler/src/upgrade/mod.rs @@ -1,5 +1,5 @@ use anyhow::bail; -use meilisearch_types::heed::{Env, RwTxn}; +use meilisearch_types::heed::{Env, RwTxn, WithoutTls}; use meilisearch_types::tasks::{Details, KindWithContent, Status, Task}; use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH}; use time::OffsetDateTime; @@ -9,13 +9,17 @@ use crate::queue::TaskQueue; use crate::versioning::Versioning; trait UpgradeIndexScheduler { - fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32)) - -> anyhow::Result<()>; + fn upgrade( + &self, + env: &Env, + wtxn: &mut RwTxn, + original: (u32, u32, u32), + ) -> anyhow::Result<()>; fn target_version(&self) -> (u32, u32, u32); } pub fn upgrade_index_scheduler( - env: &Env, + env: &Env, versioning: &Versioning, from: (u32, u32, u32), to: (u32, u32, u32), @@ -91,7 +95,7 @@ struct ToCurrentNoOp {} impl UpgradeIndexScheduler for ToCurrentNoOp { fn upgrade( &self, - _env: &Env, + _env: &Env, _wtxn: &mut RwTxn, _original: (u32, u32, u32), ) -> anyhow::Result<()> { diff --git a/crates/index-scheduler/src/versioning.rs b/crates/index-scheduler/src/versioning.rs index 22132bf5f..107b8e0ba 100644 --- a/crates/index-scheduler/src/versioning.rs +++ b/crates/index-scheduler/src/versioning.rs @@ -1,5 +1,5 @@ use meilisearch_types::heed::types::Str; -use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn}; +use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls}; use meilisearch_types::milli::heed_codec::version::VersionCodec; use meilisearch_types::versioning; @@ -46,12 +46,12 @@ impl Versioning { } /// Return `Self` without checking anything about the version - pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result { + pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result { let version = env.create_database(wtxn, Some(db_name::VERSION))?; Ok(Self { version }) } - pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result { + pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result { let mut wtxn = env.write_txn()?; let this = Self::raw_new(env, &mut wtxn)?; let from = match this.get_version(&wtxn)? { diff --git a/crates/meilisearch-auth/src/dump.rs b/crates/meilisearch-auth/src/dump.rs index 8a215d8ae..6a71b636e 100644 --- a/crates/meilisearch-auth/src/dump.rs +++ b/crates/meilisearch-auth/src/dump.rs @@ -2,6 +2,7 @@ use std::fs::File; use std::io::{BufReader, Write}; use std::path::Path; +use meilisearch_types::heed::{Env, WithoutTls}; use serde_json::Deserializer; use crate::{AuthController, HeedAuthStore, Result}; @@ -9,11 +10,8 @@ use crate::{AuthController, HeedAuthStore, Result}; const KEYS_PATH: &str = "keys"; impl AuthController { - pub fn dump(src: impl AsRef, dst: impl AsRef) -> Result<()> { - let mut store = HeedAuthStore::new(&src)?; - - // do not attempt to close the database on drop! - store.set_drop_on_close(false); + pub fn dump(auth_env: Env, dst: impl AsRef) -> Result<()> { + let store = HeedAuthStore::new(auth_env)?; let keys_file_path = dst.as_ref().join(KEYS_PATH); @@ -27,8 +25,8 @@ impl AuthController { Ok(()) } - pub fn load_dump(src: impl AsRef, dst: impl AsRef) -> Result<()> { - let store = HeedAuthStore::new(&dst)?; + pub fn load_dump(src: impl AsRef, auth_env: Env) -> Result<()> { + let store = HeedAuthStore::new(auth_env)?; let keys_file_path = src.as_ref().join(KEYS_PATH); diff --git a/crates/meilisearch-auth/src/lib.rs b/crates/meilisearch-auth/src/lib.rs index 4dbf1bf6f..01c986d9f 100644 --- a/crates/meilisearch-auth/src/lib.rs +++ b/crates/meilisearch-auth/src/lib.rs @@ -3,11 +3,10 @@ pub mod error; mod store; use std::collections::{HashMap, HashSet}; -use std::path::Path; -use std::sync::Arc; use error::{AuthControllerError, Result}; use maplit::hashset; +use meilisearch_types::heed::{Env, WithoutTls}; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::keys::{Action, CreateApiKey, Key, PatchApiKey}; use meilisearch_types::milli::update::Setting; @@ -19,19 +18,19 @@ use uuid::Uuid; #[derive(Clone)] pub struct AuthController { - store: Arc, + store: HeedAuthStore, master_key: Option, } impl AuthController { - pub fn new(db_path: impl AsRef, master_key: &Option) -> Result { - let store = HeedAuthStore::new(db_path)?; + pub fn new(auth_env: Env, master_key: &Option) -> Result { + let store = HeedAuthStore::new(auth_env)?; if store.is_empty()? { generate_default_keys(&store)?; } - Ok(Self { store: Arc::new(store), master_key: master_key.clone() }) + Ok(Self { store, master_key: master_key.clone() }) } /// Return `Ok(())` if the auth controller is able to access one of its database. diff --git a/crates/meilisearch-auth/src/store.rs b/crates/meilisearch-auth/src/store.rs index ef992e836..2fd380194 100644 --- a/crates/meilisearch-auth/src/store.rs +++ b/crates/meilisearch-auth/src/store.rs @@ -1,18 +1,16 @@ use std::borrow::Cow; use std::cmp::Reverse; use std::collections::HashSet; -use std::fs::create_dir_all; use std::path::Path; use std::result::Result as StdResult; use std::str; use std::str::FromStr; -use std::sync::Arc; use hmac::{Hmac, Mac}; -use meilisearch_types::heed::BoxedError; +use meilisearch_types::heed::{BoxedError, WithoutTls}; use meilisearch_types::index_uid_pattern::IndexUidPattern; use meilisearch_types::keys::KeyId; -use meilisearch_types::milli; +use meilisearch_types::milli::heed; use meilisearch_types::milli::heed::types::{Bytes, DecodeIgnore, SerdeJson}; use meilisearch_types::milli::heed::{Database, Env, EnvOpenOptions, RwTxn}; use sha2::Sha256; @@ -25,44 +23,32 @@ use super::error::{AuthControllerError, Result}; use super::{Action, Key}; const AUTH_STORE_SIZE: usize = 1_073_741_824; //1GiB -const AUTH_DB_PATH: &str = "auth"; const KEY_DB_NAME: &str = "api-keys"; const KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME: &str = "keyid-action-index-expiration"; #[derive(Clone)] pub struct HeedAuthStore { - env: Arc, + env: Env, keys: Database>, action_keyid_index_expiration: Database>>, - should_close_on_drop: bool, } -impl Drop for HeedAuthStore { - fn drop(&mut self) { - if self.should_close_on_drop && Arc::strong_count(&self.env) == 1 { - self.env.as_ref().clone().prepare_for_closing(); - } - } -} - -pub fn open_auth_store_env(path: &Path) -> milli::heed::Result { - let mut options = EnvOpenOptions::new(); +pub fn open_auth_store_env(path: &Path) -> heed::Result> { + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(AUTH_STORE_SIZE); // 1GB options.max_dbs(2); unsafe { options.open(path) } } impl HeedAuthStore { - pub fn new(path: impl AsRef) -> Result { - let path = path.as_ref().join(AUTH_DB_PATH); - create_dir_all(&path)?; - let env = Arc::new(open_auth_store_env(path.as_ref())?); + pub fn new(env: Env) -> Result { let mut wtxn = env.write_txn()?; let keys = env.create_database(&mut wtxn, Some(KEY_DB_NAME))?; let action_keyid_index_expiration = env.create_database(&mut wtxn, Some(KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME))?; wtxn.commit()?; - Ok(Self { env, keys, action_keyid_index_expiration, should_close_on_drop: true }) + Ok(Self { env, keys, action_keyid_index_expiration }) } /// Return `Ok(())` if the auth store is able to access one of its database. @@ -82,10 +68,6 @@ impl HeedAuthStore { Ok(self.env.non_free_pages_size()?) } - pub fn set_drop_on_close(&mut self, v: bool) { - self.should_close_on_drop = v; - } - pub fn is_empty(&self) -> Result { let rtxn = self.env.read_txn()?; @@ -293,7 +275,7 @@ impl HeedAuthStore { /// optionally on a specific index, for a given key. pub struct KeyIdActionCodec; -impl<'a> milli::heed::BytesDecode<'a> for KeyIdActionCodec { +impl<'a> heed::BytesDecode<'a> for KeyIdActionCodec { type DItem = (KeyId, Action, Option<&'a [u8]>); fn bytes_decode(bytes: &'a [u8]) -> StdResult { @@ -310,7 +292,7 @@ impl<'a> milli::heed::BytesDecode<'a> for KeyIdActionCodec { } } -impl<'a> milli::heed::BytesEncode<'a> for KeyIdActionCodec { +impl<'a> heed::BytesEncode<'a> for KeyIdActionCodec { type EItem = (&'a KeyId, &'a Action, Option<&'a [u8]>); fn bytes_encode((key_id, action, index): &Self::EItem) -> StdResult, BoxedError> { diff --git a/crates/meilisearch-types/src/error.rs b/crates/meilisearch-types/src/error.rs index 84bfa0aa9..859563d8a 100644 --- a/crates/meilisearch-types/src/error.rs +++ b/crates/meilisearch-types/src/error.rs @@ -407,7 +407,7 @@ impl ErrorCode for milli::Error { match error { // TODO: wait for spec for new error codes. UserError::SerdeJson(_) - | UserError::InvalidLmdbOpenOptions + | UserError::EnvAlreadyOpened | UserError::DocumentLimitReached | UserError::UnknownInternalDocumentId { .. } => Code::Internal, UserError::InvalidStoreFile => Code::InvalidStoreFile, @@ -504,8 +504,7 @@ impl ErrorCode for HeedError { HeedError::Mdb(_) | HeedError::Encoding(_) | HeedError::Decoding(_) - | HeedError::DatabaseClosing - | HeedError::BadOpenOptions { .. } => Code::Internal, + | HeedError::EnvAlreadyOpened => Code::Internal, } } } diff --git a/crates/meilisearch/src/lib.rs b/crates/meilisearch/src/lib.rs index 948d1148b..1841d5556 100644 --- a/crates/meilisearch/src/lib.rs +++ b/crates/meilisearch/src/lib.rs @@ -34,7 +34,7 @@ use error::PayloadError; use extractors::payload::PayloadConfig; use index_scheduler::versioning::Versioning; use index_scheduler::{IndexScheduler, IndexSchedulerOptions}; -use meilisearch_auth::AuthController; +use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::milli::constants::VERSION_MAJOR; use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader}; use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod}; @@ -335,9 +335,12 @@ fn open_or_create_database_unchecked( ) -> anyhow::Result<(IndexScheduler, AuthController)> { // we don't want to create anything in the data.ms yet, thus we // wrap our two builders in a closure that'll be executed later. - let auth_controller = AuthController::new(&opt.db_path, &opt.master_key); - let index_scheduler_builder = - || -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt, version)?) }; + std::fs::create_dir_all(&index_scheduler_opt.auth_path)?; + let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap(); + let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key); + let index_scheduler_builder = || -> anyhow::Result<_> { + Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?) + }; match ( index_scheduler_builder(), @@ -420,6 +423,7 @@ pub fn update_version_file_for_dumpless_upgrade( if from_major == 1 && from_minor == 12 { let env = unsafe { heed::EnvOpenOptions::new() + .read_txn_without_tls() .max_dbs(Versioning::nb_db()) .map_size(index_scheduler_opt.task_db_size) .open(&index_scheduler_opt.tasks_path) diff --git a/crates/meilisearch/src/search/mod.rs b/crates/meilisearch/src/search/mod.rs index d0cab1672..69b306abc 100644 --- a/crates/meilisearch/src/search/mod.rs +++ b/crates/meilisearch/src/search/mod.rs @@ -340,7 +340,8 @@ impl SearchKind { vector_len: Option, route: Route, ) -> Result<(String, Arc, bool), ResponseError> { - let embedder_configs = index.embedding_configs(&index.read_txn()?)?; + let rtxn = index.read_txn()?; + let embedder_configs = index.embedding_configs(&rtxn)?; let embedders = index_scheduler.embedders(index_uid, embedder_configs)?; let (embedder, _, quantized) = embedders diff --git a/crates/meilisearch/tests/features/mod.rs b/crates/meilisearch/tests/features/mod.rs index ea11738cc..34cd40e38 100644 --- a/crates/meilisearch/tests/features/mod.rs +++ b/crates/meilisearch/tests/features/mod.rs @@ -144,14 +144,6 @@ async fn experimental_feature_metrics() { let (response, code) = server.get_metrics().await; meili_snap::snapshot!(code, @"200 OK"); meili_snap::snapshot!(response, @"null"); - - // startup without flag respects persisted metrics value - let disable_metrics = - Opt { experimental_enable_metrics: false, ..default_settings(dir.path()) }; - let server_no_flag = Server::new_with_options(disable_metrics).await.unwrap(); - let (response, code) = server_no_flag.get_metrics().await; - meili_snap::snapshot!(code, @"200 OK"); - meili_snap::snapshot!(response, @"null"); } #[actix_rt::test] diff --git a/crates/meilisearch/tests/upgrade/mod.rs b/crates/meilisearch/tests/upgrade/mod.rs index 4b0cb6330..462305d21 100644 --- a/crates/meilisearch/tests/upgrade/mod.rs +++ b/crates/meilisearch/tests/upgrade/mod.rs @@ -64,9 +64,6 @@ async fn version_requires_downgrade() { #[actix_rt::test] async fn upgrade_to_the_current_version() { let temp = tempfile::tempdir().unwrap(); - let server = Server::new_with_options(default_settings(temp.path())).await.unwrap(); - drop(server); - let server = Server::new_with_options(Opt { experimental_dumpless_upgrade: true, ..default_settings(temp.path()) diff --git a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs index 3e9f2b932..11ba2882a 100644 --- a/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs +++ b/crates/meilisearch/tests/upgrade/v1_12/v1_12_0.rs @@ -108,6 +108,10 @@ async fn check_the_keys(server: &Server) { /// 5.2. Enqueue a new task /// 5.3. Create an index async fn check_the_index_scheduler(server: &Server) { + // Wait until the upgrade has been applied to all indexes to avoid flakyness + let (tasks, _) = server.tasks_filter("types=upgradeDatabase&limit=1").await; + server.wait_task(Value(tasks["results"][0].clone()).uid()).await.succeeded(); + // All the indexes are still present let (indexes, _) = server.list_indexes(None, None).await; snapshot!(indexes, @r#" @@ -156,10 +160,6 @@ async fn check_the_index_scheduler(server: &Server) { } "###); - // Wait until the upgrade has been applied to all indexes to avoid flakyness - let (tasks, _) = server.tasks_filter("types=upgradeDatabase&limit=1").await; - server.wait_task(Value(tasks["results"][0].clone()).uid()).await.succeeded(); - // Tasks and batches should still work // We rewrite the first task for all calls because it may be the upgrade database with unknown dates and duration. // The other tasks should NOT change diff --git a/crates/meilitool/Cargo.toml b/crates/meilitool/Cargo.toml index ffd13da34..485177838 100644 --- a/crates/meilitool/Cargo.toml +++ b/crates/meilitool/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true [dependencies] anyhow = "1.0.95" -arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" } clap = { version = "4.5.24", features = ["derive"] } dump = { path = "../dump" } file-store = { path = "../file-store" } diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 8a8b774b8..dd1213782 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -7,11 +7,11 @@ use anyhow::{bail, Context}; use clap::{Parser, Subcommand, ValueEnum}; use dump::{DumpWriter, IndexMetadata}; use file_store::FileStore; -use meilisearch_auth::AuthController; +use meilisearch_auth::{open_auth_store_env, AuthController}; use meilisearch_types::batches::Batch; use meilisearch_types::heed::types::{Bytes, SerdeJson, Str}; use meilisearch_types::heed::{ - CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, + CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, WithoutTls, }; use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; @@ -172,7 +172,7 @@ fn main() -> anyhow::Result<()> { /// Clears the task queue located at `db_path`. fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { let path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&path) } + let env = unsafe { EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&path) } .with_context(|| format!("While trying to open {:?}", path.display()))?; eprintln!("Deleting tasks from the database..."); @@ -225,7 +225,7 @@ fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> { } fn try_opening_database( - env: &Env, + env: &Env, rtxn: &RoTxn, db_name: &str, ) -> anyhow::Result> { @@ -235,7 +235,7 @@ fn try_opening_database( } fn try_opening_poly_database( - env: &Env, + env: &Env, rtxn: &RoTxn, db_name: &str, ) -> anyhow::Result> { @@ -284,13 +284,18 @@ fn export_a_dump( FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?; let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; eprintln!("Dumping the keys..."); // 2. dump the keys - let auth_store = AuthController::new(&db_path, &None) + let auth_path = db_path.join("auth"); + std::fs::create_dir_all(&auth_path).context("While creating the auth directory")?; + let auth_env = open_auth_store_env(&auth_path).context("While opening the auth store")?; + let auth_store = AuthController::new(auth_env, &None) .with_context(|| format!("While opening the auth store at {}", db_path.display()))?; let mut dump_keys = dump.create_keys()?; let mut count = 0; @@ -386,9 +391,10 @@ fn export_a_dump( for result in index_mapping.iter(&rtxn)? { let (uid, uuid) = result?; let index_path = db_path.join("indexes").join(uuid.to_string()); - let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| { - format!("While trying to open the index at path {:?}", index_path.display()) - })?; + let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false) + .with_context(|| { + format!("While trying to open the index at path {:?}", index_path.display()) + })?; let rtxn = index.read_txn()?; let metadata = IndexMetadata { @@ -438,8 +444,10 @@ fn export_a_dump( fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> { let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; let rtxn = env.read_txn()?; let index_mapping: Database = @@ -456,9 +464,10 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> { } let index_path = db_path.join("indexes").join(uuid.to_string()); - let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| { - format!("While trying to open the index at path {:?}", index_path.display()) - })?; + let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false) + .with_context(|| { + format!("While trying to open the index at path {:?}", index_path.display()) + })?; eprintln!("Awaiting for a mutable transaction..."); let _wtxn = index.write_txn().context("While awaiting for a write transaction")?; @@ -470,7 +479,7 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> { eprintln!("Compacting the index..."); let before_compaction = Instant::now(); let new_file = index - .copy_to_file(&compacted_index_file_path, CompactionOption::Enabled) + .copy_to_path(&compacted_index_file_path, CompactionOption::Enabled) .with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?; let after_size = new_file.metadata()?.len(); @@ -514,8 +523,10 @@ fn export_documents( offset: Option, ) -> anyhow::Result<()> { let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; let rtxn = env.read_txn()?; let index_mapping: Database = @@ -526,9 +537,10 @@ fn export_documents( if uid == index_name { let index_path = db_path.join("indexes").join(uuid.to_string()); let index = - Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| { - format!("While trying to open the index at path {:?}", index_path.display()) - })?; + Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false) + .with_context(|| { + format!("While trying to open the index at path {:?}", index_path.display()) + })?; let rtxn = index.read_txn()?; let fields_ids_map = index.fields_ids_map(&rtxn)?; @@ -616,8 +628,10 @@ fn hair_dryer( index_parts: &[IndexPart], ) -> anyhow::Result<()> { let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; eprintln!("Trying to get a read transaction on the index scheduler..."); @@ -630,9 +644,10 @@ fn hair_dryer( if index_names.iter().any(|i| i == uid) { let index_path = db_path.join("indexes").join(uuid.to_string()); let index = - Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| { - format!("While trying to open the index at path {:?}", index_path.display()) - })?; + Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false) + .with_context(|| { + format!("While trying to open the index at path {:?}", index_path.display()) + })?; eprintln!("Trying to get a read transaction on the {uid} index..."); diff --git a/crates/meilitool/src/upgrade/v1_10.rs b/crates/meilitool/src/upgrade/v1_10.rs index 043520e82..ac30055c5 100644 --- a/crates/meilitool/src/upgrade/v1_10.rs +++ b/crates/meilitool/src/upgrade/v1_10.rs @@ -2,7 +2,9 @@ use std::path::Path; use anyhow::{bail, Context}; use meilisearch_types::heed::types::{SerdeJson, Str}; -use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified}; +use meilisearch_types::heed::{ + Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, WithoutTls, +}; use meilisearch_types::milli::index::{db_name, main_key}; use super::v1_9; @@ -92,7 +94,7 @@ fn update_index_stats( fn update_date_format( index_uid: &str, - index_env: &Env, + index_env: &Env, index_wtxn: &mut RwTxn, ) -> anyhow::Result<()> { let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN) @@ -106,7 +108,7 @@ fn update_date_format( fn find_rest_embedders( index_uid: &str, - index_env: &Env, + index_env: &Env, index_txn: &RoTxn, ) -> anyhow::Result> { let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN) @@ -164,8 +166,10 @@ pub fn v1_9_to_v1_10( // 2. REST embedders. We don't support this case right now, so bail let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; let mut sched_wtxn = env.write_txn()?; @@ -205,9 +209,13 @@ pub fn v1_9_to_v1_10( let index_env = unsafe { // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? + EnvOpenOptions::new() + .read_txn_without_tls() + .max_dbs(25) + .open(&index_path) + .with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? }; let index_txn = index_env.read_txn().with_context(|| { @@ -252,9 +260,13 @@ pub fn v1_9_to_v1_10( let index_env = unsafe { // FIXME: fetch the 25 magic number from the index file - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? + EnvOpenOptions::new() + .read_txn_without_tls() + .max_dbs(25) + .open(&index_path) + .with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? }; let mut index_wtxn = index_env.write_txn().with_context(|| { diff --git a/crates/meilitool/src/upgrade/v1_11.rs b/crates/meilitool/src/upgrade/v1_11.rs index 44aeb125f..76d2fc24f 100644 --- a/crates/meilitool/src/upgrade/v1_11.rs +++ b/crates/meilitool/src/upgrade/v1_11.rs @@ -23,8 +23,10 @@ pub fn v1_10_to_v1_11( println!("Upgrading from v1.10.0 to v1.11.0"); let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; let sched_rtxn = env.read_txn()?; @@ -50,9 +52,13 @@ pub fn v1_10_to_v1_11( ); let index_env = unsafe { - EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })? + EnvOpenOptions::new() + .read_txn_without_tls() + .max_dbs(25) + .open(&index_path) + .with_context(|| { + format!("while opening index {uid} at '{}'", index_path.display()) + })? }; let index_rtxn = index_env.read_txn().with_context(|| { @@ -76,11 +82,11 @@ pub fn v1_10_to_v1_11( try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_ARROY) .with_context(|| format!("while updating date format for index `{uid}`"))?; - arroy_v04_to_v05::ugrade_from_prev_version( + meilisearch_types::milli::arroy::upgrade::cosine_from_0_4_to_0_5( &index_rtxn, - index_read_database, + index_read_database.remap_types(), &mut index_wtxn, - index_write_database, + index_write_database.remap_types(), )?; index_wtxn.commit()?; diff --git a/crates/meilitool/src/upgrade/v1_12.rs b/crates/meilitool/src/upgrade/v1_12.rs index 3ad171c31..1dd679eb9 100644 --- a/crates/meilitool/src/upgrade/v1_12.rs +++ b/crates/meilitool/src/upgrade/v1_12.rs @@ -115,8 +115,10 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> { /// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3 fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> { let index_scheduler_path = db_path.join("tasks"); - let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) } - .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; + let env = unsafe { + EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path) + } + .with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?; let mut sched_wtxn = env.write_txn()?; @@ -173,11 +175,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> { println!("\t- Rebuilding field distribution"); - let index = - meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path, false) - .with_context(|| { - format!("while opening index {uid} at '{}'", index_path.display()) - })?; + let index = meilisearch_types::milli::Index::new( + EnvOpenOptions::new().read_txn_without_tls(), + &index_path, + false, + ) + .with_context(|| format!("while opening index {uid} at '{}'", index_path.display()))?; let mut index_txn = index.write_txn()?; diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 5eb89ea53..dc95135a2 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -28,11 +28,13 @@ flatten-serde-json = { path = "../flatten-serde-json" } fst = "0.4.7" fxhash = "0.2.1" geoutils = "0.5.1" -grenad = { version = "0.5.0", default-features = false, features = ["rayon", "tempfile"] } -heed = { version = "0.20.5", default-features = false, features = [ +grenad = { version = "0.5.0", default-features = false, features = [ + "rayon", + "tempfile", +] } +heed = { version = "0.22.0", default-features = false, features = [ "serde-json", "serde-bincode", - "read-txn-no-tls", ] } indexmap = { version = "2.7.0", features = ["serde"] } json-depth-checker = { path = "../json-depth-checker" } @@ -85,7 +87,7 @@ rhai = { git = "https://github.com/rhaiscript/rhai", rev = "ef3df63121d27aacd838 "no_time", "sync", ] } -arroy = "0.5.0" +arroy = "0.6.0" rand = "0.8.5" tracing = "0.1.41" ureq = { version = "2.12.1", features = ["json"] } @@ -101,7 +103,13 @@ uell = "0.1.0" enum-iterator = "2.1.0" bbqueue = { git = "https://github.com/meilisearch/bbqueue" } flume = { version = "0.11.1", default-features = false } -utoipa = { version = "5.3.1", features = ["non_strict_integers", "preserve_order", "uuid", "time", "openapi_extensions"] } +utoipa = { version = "5.3.1", features = [ + "non_strict_integers", + "preserve_order", + "uuid", + "time", + "openapi_extensions", +] } [dev-dependencies] mimalloc = { version = "0.1.43", default-features = false } @@ -113,9 +121,7 @@ meili-snap = { path = "../meili-snap" } rand = { version = "0.8.5", features = ["small_rng"] } [features] -all-tokenizations = [ - "charabia/default", -] +all-tokenizations = ["charabia/default"] # Use POSIX semaphores instead of SysV semaphores in LMDB # For more information on this feature, see heed's Cargo.toml diff --git a/crates/milli/src/documents/mod.rs b/crates/milli/src/documents/mod.rs index 88fa38d30..f43f7e842 100644 --- a/crates/milli/src/documents/mod.rs +++ b/crates/milli/src/documents/mod.rs @@ -80,9 +80,13 @@ impl DocumentsBatchIndex { let mut map = Object::new(); for (k, v) in document.iter() { - // TODO: TAMO: update the error type - let key = - self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone(); + let key = self + .0 + .get_by_left(&k) + .ok_or(crate::error::InternalError::FieldIdMapMissingEntry( + FieldIdMapMissingEntry::FieldId { field_id: k, process: "recreate_json" }, + ))? + .clone(); let value = serde_json::from_slice::(v) .map_err(crate::error::InternalError::SerdeJson)?; map.insert(key, value); diff --git a/crates/milli/src/error.rs b/crates/milli/src/error.rs index f0972de75..e1098cfa5 100644 --- a/crates/milli/src/error.rs +++ b/crates/milli/src/error.rs @@ -33,8 +33,6 @@ pub enum Error { #[derive(Error, Debug)] pub enum InternalError { - #[error("{}", HeedError::DatabaseClosing)] - DatabaseClosing, #[error("missing {} in the {db_name} database", key.unwrap_or("key"))] DatabaseMissingEntry { db_name: &'static str, key: Option<&'static str> }, #[error("missing {key} in the fieldids weights mapping")] @@ -197,8 +195,8 @@ and can not be more than 511 bytes.", .document_id.to_string() valid_fields: BTreeSet, hidden_fields: bool, }, - #[error("an environment is already opened with different options")] - InvalidLmdbOpenOptions, + #[error("An LMDB environment is already opened")] + EnvAlreadyOpened, #[error("You must specify where `sort` is listed in the rankingRules setting to use the sort parameter at search time.")] SortRankingRuleMissing, #[error("The database file is in an invalid state.")] @@ -362,7 +360,8 @@ impl From for Error { | arroy::Error::UnmatchingDistance { .. } | arroy::Error::NeedBuild(_) | arroy::Error::MissingKey { .. } - | arroy::Error::MissingMetadata(_) => { + | arroy::Error::MissingMetadata(_) + | arroy::Error::CannotDecodeKeyMode { .. } => { Error::InternalError(InternalError::ArroyError(value)) } } @@ -516,8 +515,7 @@ impl From for Error { // TODO use the encoding HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })), HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })), - HeedError::DatabaseClosing => InternalError(DatabaseClosing), - HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions), + HeedError::EnvAlreadyOpened { .. } => UserError(EnvAlreadyOpened), } } } diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 798cf3073..771d32175 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::fs::File; use std::path::Path; -use heed::types::*; +use heed::{types::*, WithoutTls}; use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified}; use roaring::RoaringBitmap; use rstar::RTree; @@ -110,7 +110,7 @@ pub mod db_name { #[derive(Clone)] pub struct Index { /// The LMDB environment which this index is associated with. - pub(crate) env: heed::Env, + pub(crate) env: heed::Env, /// Contains many different types (e.g. the fields ids map). pub(crate) main: Database, @@ -177,7 +177,7 @@ pub struct Index { impl Index { pub fn new_with_creation_dates>( - mut options: heed::EnvOpenOptions, + mut options: heed::EnvOpenOptions, path: P, created_at: time::OffsetDateTime, updated_at: time::OffsetDateTime, @@ -275,7 +275,7 @@ impl Index { } pub fn new>( - options: heed::EnvOpenOptions, + options: heed::EnvOpenOptions, path: P, creation: bool, ) -> Result { @@ -284,7 +284,7 @@ impl Index { } fn set_creation_dates( - env: &heed::Env, + env: &heed::Env, main: Database, created_at: time::OffsetDateTime, updated_at: time::OffsetDateTime, @@ -306,12 +306,12 @@ impl Index { } /// Create a read transaction to be able to read the index. - pub fn read_txn(&self) -> heed::Result> { + pub fn read_txn(&self) -> heed::Result> { self.env.read_txn() } /// Create a static read transaction to be able to read the index without keeping a reference to it. - pub fn static_read_txn(&self) -> heed::Result> { + pub fn static_read_txn(&self) -> heed::Result> { self.env.clone().static_read_txn() } @@ -340,8 +340,12 @@ impl Index { self.env.info().map_size } - pub fn copy_to_file>(&self, path: P, option: CompactionOption) -> Result { - self.env.copy_to_file(path, option).map_err(Into::into) + pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> { + self.env.copy_to_file(file, option).map_err(Into::into) + } + + pub fn copy_to_path>(&self, path: P, option: CompactionOption) -> Result { + self.env.copy_to_path(path, option).map_err(Into::into) } /// Returns an `EnvClosingEvent` that can be used to wait for the closing event, @@ -1825,7 +1829,8 @@ pub(crate) mod tests { impl TempIndex { /// Creates a temporary index pub fn new_with_map_size(size: usize) -> Self { - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(size); let _tempdir = TempDir::new_in(".").unwrap(); let inner = Index::new(options, _tempdir.path(), true).unwrap(); diff --git a/crates/milli/src/lib.rs b/crates/milli/src/lib.rs index 85540c82e..1a6977585 100644 --- a/crates/milli/src/lib.rs +++ b/crates/milli/src/lib.rs @@ -83,6 +83,8 @@ pub use self::search::{ }; pub use self::update::ChannelCongestion; +pub use arroy; + pub type Result = std::result::Result; pub type Attribute = u32; diff --git a/crates/milli/src/search/new/tests/integration.rs b/crates/milli/src/search/new/tests/integration.rs index e718eb39d..4a6cc9b90 100644 --- a/crates/milli/src/search/new/tests/integration.rs +++ b/crates/milli/src/search/new/tests/integration.rs @@ -15,7 +15,8 @@ use crate::constants::RESERVED_GEO_FIELD_NAME; pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let path = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path, true).unwrap(); diff --git a/crates/milli/src/update/facet/mod.rs b/crates/milli/src/update/facet/mod.rs index 027bb355e..c40916670 100644 --- a/crates/milli/src/update/facet/mod.rs +++ b/crates/milli/src/update/facet/mod.rs @@ -352,7 +352,7 @@ pub(crate) mod test_helpers { use grenad::MergerBuilder; use heed::types::Bytes; - use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn}; + use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn, WithoutTls}; use roaring::RoaringBitmap; use super::bulk::FacetsUpdateBulkInner; @@ -390,7 +390,7 @@ pub(crate) mod test_helpers { for<'a> BoundCodec: BytesEncode<'a> + BytesDecode<'a, DItem = >::EItem>, { - pub env: Env, + pub env: Env, pub content: heed::Database, FacetGroupValueCodec>, pub group_size: Cell, pub min_level_size: Cell, @@ -412,7 +412,8 @@ pub(crate) mod test_helpers { let group_size = group_size.clamp(2, 127); let max_group_size = std::cmp::min(127, std::cmp::max(group_size * 2, max_group_size)); // 2*group_size <= x <= 127 let min_level_size = std::cmp::max(1, min_level_size); // 1 <= x <= inf - let mut options = heed::EnvOpenOptions::new(); + let options = heed::EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); let options = options.map_size(4096 * 4 * 1000 * 100); let tempdir = tempfile::TempDir::new().unwrap(); let env = unsafe { options.open(tempdir.path()) }.unwrap(); diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs index ae082284a..a0228e9cf 100644 --- a/crates/milli/src/update/index_documents/mod.rs +++ b/crates/milli/src/update/index_documents/mod.rs @@ -520,7 +520,14 @@ where pool.install(|| { let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); - writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing, cancel)?; + writer.build_and_quantize( + wtxn, + &mut rng, + dimension, + is_quantizing, + self.indexer_config.max_memory, + cancel, + )?; Result::Ok(()) }) .map_err(InternalError::from)??; diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs index f77ac7658..a2388a662 100644 --- a/crates/milli/src/update/new/indexer/document_changes.rs +++ b/crates/milli/src/update/new/indexer/document_changes.rs @@ -3,7 +3,7 @@ use std::sync::atomic::Ordering; use std::sync::{Arc, RwLock}; use bumpalo::Bump; -use heed::RoTxn; +use heed::{RoTxn, WithoutTls}; use rayon::iter::IndexedParallelIterator; use super::super::document_change::DocumentChange; @@ -28,7 +28,7 @@ pub struct DocumentChangeContext< /// inside of the DB. pub db_fields_ids_map: &'indexer FieldsIdsMap, /// A transaction providing data from the DB before all indexing operations - pub rtxn: RoTxn<'indexer>, + pub rtxn: RoTxn<'indexer, WithoutTls>, /// Global field id map that is up to date with the current state of the indexing process. /// diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs index 1cd227139..d002317ca 100644 --- a/crates/milli/src/update/new/indexer/mod.rs +++ b/crates/milli/src/update/new/indexer/mod.rs @@ -62,6 +62,8 @@ where let mut bbbuffers = Vec::new(); let finished_extraction = AtomicBool::new(false); + let arroy_memory = grenad_parameters.max_memory; + // We reduce the actual memory used to 5%. The reason we do this here and not in Meilisearch // is because we still use the old indexer for the settings and it is highly impacted by the // max memory. So we keep the changes here and will remove these changes once we use the new @@ -200,6 +202,7 @@ where index, wtxn, index_embeddings, + arroy_memory, &mut arroy_writers, &indexing_context.must_stop_processing, ) diff --git a/crates/milli/src/update/new/indexer/write.rs b/crates/milli/src/update/new/indexer/write.rs index a8bd3217f..d3fa5e182 100644 --- a/crates/milli/src/update/new/indexer/write.rs +++ b/crates/milli/src/update/new/indexer/write.rs @@ -101,6 +101,7 @@ pub fn build_vectors( index: &Index, wtxn: &mut RwTxn<'_>, index_embeddings: Vec, + arroy_memory: Option, arroy_writers: &mut HashMap, must_stop_processing: &MSP, ) -> Result<()> @@ -111,10 +112,18 @@ where return Ok(()); } - let mut rng = rand::rngs::StdRng::seed_from_u64(42); + let seed = rand::random(); + let mut rng = rand::rngs::StdRng::seed_from_u64(seed); for (_index, (_embedder_name, _embedder, writer, dimensions)) in arroy_writers { let dimensions = *dimensions; - writer.build_and_quantize(wtxn, &mut rng, dimensions, false, must_stop_processing)?; + writer.build_and_quantize( + wtxn, + &mut rng, + dimensions, + false, + arroy_memory, + must_stop_processing, + )?; } index.put_embedding_configs(wtxn, index_embeddings)?; diff --git a/crates/milli/src/update/upgrade/mod.rs b/crates/milli/src/update/upgrade/mod.rs index 98cad3dad..7c8dcf64a 100644 --- a/crates/milli/src/update/upgrade/mod.rs +++ b/crates/milli/src/update/upgrade/mod.rs @@ -1,15 +1,17 @@ mod v1_12; mod v1_13; +mod v1_14; use heed::RwTxn; use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3}; -use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Current}; +use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13}; +use v1_14::Latest_V1_13_To_Latest_V1_14; use crate::progress::{Progress, VariableNameStep}; use crate::{Index, InternalError, Result}; trait UpgradeIndex { - /// Returns true if the index scheduler must regenerate its cached stats + /// Returns `true` if the index scheduler must regenerate its cached stats. fn upgrade( &self, wtxn: &mut RwTxn, @@ -32,15 +34,17 @@ pub fn upgrade( &V1_12_To_V1_12_3 {}, &V1_12_3_To_V1_13_0 {}, &V1_13_0_To_V1_13_1 {}, - &V1_13_1_To_Current {}, + &V1_13_1_To_Latest_V1_13 {}, + &Latest_V1_13_To_Latest_V1_14 {}, ]; let start = match from { (1, 12, 0..=2) => 0, (1, 12, 3..) => 1, (1, 13, 0) => 2, + (1, 13, _) => 4, // We must handle the current version in the match because in case of a failure some index may have been upgraded but not other. - (1, 13, _) => 3, + (1, 14, _) => 4, (major, minor, patch) => { return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into()) } @@ -50,7 +54,6 @@ pub fn upgrade( let upgrade_path = &upgrade_functions[start..]; let mut current_version = from; - let mut regenerate_stats = false; for (i, upgrade) in upgrade_path.iter().enumerate() { let target = upgrade.target_version(); diff --git a/crates/milli/src/update/upgrade/v1_13.rs b/crates/milli/src/update/upgrade/v1_13.rs index f1d56d9cb..8e5e052bd 100644 --- a/crates/milli/src/update/upgrade/v1_13.rs +++ b/crates/milli/src/update/upgrade/v1_13.rs @@ -37,9 +37,9 @@ impl UpgradeIndex for V1_13_0_To_V1_13_1 { } #[allow(non_camel_case_types)] -pub(super) struct V1_13_1_To_Current(); +pub(super) struct V1_13_1_To_Latest_V1_13(); -impl UpgradeIndex for V1_13_1_To_Current { +impl UpgradeIndex for V1_13_1_To_Latest_V1_13 { fn upgrade( &self, _wtxn: &mut RwTxn, diff --git a/crates/milli/src/update/upgrade/v1_14.rs b/crates/milli/src/update/upgrade/v1_14.rs new file mode 100644 index 000000000..039734b75 --- /dev/null +++ b/crates/milli/src/update/upgrade/v1_14.rs @@ -0,0 +1,41 @@ +use arroy::distances::Cosine; +use heed::RwTxn; + +use super::UpgradeIndex; +use crate::progress::Progress; +use crate::{make_enum_progress, Index, Result}; + +#[allow(non_camel_case_types)] +pub(super) struct Latest_V1_13_To_Latest_V1_14(); + +impl UpgradeIndex for Latest_V1_13_To_Latest_V1_14 { + fn upgrade( + &self, + wtxn: &mut RwTxn, + index: &Index, + _original: (u32, u32, u32), + progress: Progress, + ) -> Result { + make_enum_progress! { + enum VectorStore { + UpdateInternalVersions, + } + }; + + progress.update_progress(VectorStore::UpdateInternalVersions); + + let rtxn = index.read_txn()?; + arroy::upgrade::from_0_5_to_0_6::( + &rtxn, + index.vector_arroy.remap_data_type(), + wtxn, + index.vector_arroy.remap_data_type(), + )?; + + Ok(false) + } + + fn target_version(&self) -> (u32, u32, u32) { + (1, 14, 0) + } +} diff --git a/crates/milli/src/vector/mod.rs b/crates/milli/src/vector/mod.rs index f67912b89..80efc210d 100644 --- a/crates/milli/src/vector/mod.rs +++ b/crates/milli/src/vector/mod.rs @@ -86,6 +86,7 @@ impl ArroyWrapper { rng: &mut R, dimension: usize, quantizing: bool, + arroy_memory: Option, cancel: &(impl Fn() -> bool + Sync + Send), ) -> Result<(), arroy::Error> { for index in arroy_db_range_for_embedder(self.embedder_index) { @@ -105,9 +106,17 @@ impl ArroyWrapper { // sensitive. if quantizing && !self.quantized { let writer = writer.prepare_changing_distance::(wtxn)?; - writer.builder(rng).cancel(cancel).build(wtxn)?; + writer + .builder(rng) + .available_memory(arroy_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .build(wtxn)?; } else if writer.need_build(wtxn)? { - writer.builder(rng).cancel(cancel).build(wtxn)?; + writer + .builder(rng) + .available_memory(arroy_memory.unwrap_or(usize::MAX)) + .cancel(cancel) + .build(wtxn)?; } else if writer.is_empty(wtxn)? { break; } diff --git a/crates/milli/tests/search/facet_distribution.rs b/crates/milli/tests/search/facet_distribution.rs index c5a61da9f..8934cbea4 100644 --- a/crates/milli/tests/search/facet_distribution.rs +++ b/crates/milli/tests/search/facet_distribution.rs @@ -12,7 +12,8 @@ use serde_json::{from_value, json}; #[test] fn test_facet_distribution_with_no_facet_values() { let path = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path, true).unwrap(); diff --git a/crates/milli/tests/search/mod.rs b/crates/milli/tests/search/mod.rs index 72b124219..c4a94d815 100644 --- a/crates/milli/tests/search/mod.rs +++ b/crates/milli/tests/search/mod.rs @@ -34,7 +34,8 @@ pub const CONTENT: &str = include_str!("../assets/test_set.ndjson"); pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index { let path = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(10 * 1024 * 1024); // 10 MB let index = Index::new(options, &path, true).unwrap(); diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs index 3cc747f06..1acc89484 100644 --- a/crates/milli/tests/search/query_criteria.rs +++ b/crates/milli/tests/search/query_criteria.rs @@ -262,7 +262,8 @@ fn criteria_mixup() { #[test] fn criteria_ascdesc() { let path = tempfile::tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(12 * 1024 * 1024); // 10 MB let index = Index::new(options, &path, true).unwrap(); diff --git a/crates/milli/tests/search/typo_tolerance.rs b/crates/milli/tests/search/typo_tolerance.rs index 837b5e6b2..3c0717063 100644 --- a/crates/milli/tests/search/typo_tolerance.rs +++ b/crates/milli/tests/search/typo_tolerance.rs @@ -108,7 +108,8 @@ fn test_typo_tolerance_two_typo() { #[test] fn test_typo_disabled_on_word() { let tmp = tempdir().unwrap(); - let mut options = EnvOpenOptions::new(); + let options = EnvOpenOptions::new(); + let mut options = options.read_txn_without_tls(); options.map_size(4096 * 100); let index = Index::new(options, tmp.path(), true).unwrap();