diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index 655922785..66c516d9b 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -788,15 +788,15 @@ impl IndexScheduler { dump_tasks.flush()?; // 3. Dump the indexes - for (uid, index) in self.index_mapper.indexes(&rtxn)? { + self.index_mapper.try_for_each_index(&rtxn, |uid, index| -> Result<()> { let rtxn = index.read_txn()?; let metadata = IndexMetadata { - uid: uid.clone(), + uid: uid.to_owned(), primary_key: index.primary_key(&rtxn)?.map(String::from), created_at: index.created_at(&rtxn)?, updated_at: index.updated_at(&rtxn)?, }; - let mut index_dumper = dump.create_index(&uid, &metadata)?; + let mut index_dumper = dump.create_index(uid, &metadata)?; let fields_ids_map = index.fields_ids_map(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); @@ -809,9 +809,10 @@ impl IndexScheduler { } // 3.2. Dump the settings - let settings = meilisearch_types::settings::settings(&index, &rtxn)?; + let settings = meilisearch_types::settings::settings(index, &rtxn)?; index_dumper.settings(&settings)?; - } + Ok(()) + })?; let dump_uid = started_at.format(format_description!( "[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]" diff --git a/index-scheduler/src/index_mapper/index_map.rs b/index-scheduler/src/index_mapper/index_map.rs new file mode 100644 index 000000000..d140d4944 --- /dev/null +++ b/index-scheduler/src/index_mapper/index_map.rs @@ -0,0 +1,370 @@ +/// the map size to use when we don't succeed in reading it in indexes. +const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB + +use std::collections::BTreeMap; +use std::path::Path; +use std::time::Duration; + +use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions}; +use meilisearch_types::milli::Index; +use time::OffsetDateTime; +use uuid::Uuid; + +use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing}; +use crate::lru::{InsertionOutcome, LruMap}; +use crate::{clamp_to_page_size, Result}; + +/// Keep an internally consistent view of the open indexes in memory. +/// +/// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened. +/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and +/// are stored separately. +/// +/// This view provides operations to change the state of the index as it is known in memory: +/// open an index (making it available for queries), close an index (specifying the new size it should be opened with), +/// delete an index. +/// +/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure. +pub struct IndexMap { + /// A LRU map of indexes that are in the open state and available for queries. + available: LruMap, + /// A map of indexes that are not available for queries, either because they are being deleted + /// or because they are being closed. + /// + /// If they are being deleted, the UUID points to `None`. + unavailable: BTreeMap>, + + /// A monotonically increasing generation number, used to differentiate between multiple successive index closing requests. + /// + /// Because multiple readers could be waiting on an index to close, the following could theoretically happen: + /// + /// 1. Multiple readers wait for the index closing to occur. + /// 2. One of them "wins the race", takes the lock and then removes the index that finished closing from the map. + /// 3. The index is reopened, but must be closed again (such as being resized again). + /// 4. One reader that "lost the race" in (2) wakes up and tries to take the lock and remove the index from the map. + /// + /// In that situation, the index may or may not have finished closing. The `generation` field allows to remember which + /// closing request was made, so the reader that "lost the race" has the old generation and will need to wait again for the index + /// to close. + generation: usize, +} + +#[derive(Clone)] +pub struct ClosingIndex { + uuid: Uuid, + closing_event: EnvClosingEvent, + map_size: usize, + generation: usize, +} + +impl ClosingIndex { + /// Waits for the index to be definitely closed. + /// + /// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function. + /// + /// After the index is physically closed, the in memory map must still be updated to take this into account. + /// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open + /// the index without waiting anymore. + pub fn wait_timeout(self, timeout: Duration) -> Option { + self.closing_event.wait_timeout(timeout).then_some(ReopenableIndex { + uuid: self.uuid, + map_size: self.map_size, + generation: self.generation, + }) + } +} + +pub struct ReopenableIndex { + uuid: Uuid, + map_size: usize, + generation: usize, +} + +impl ReopenableIndex { + /// Attempts to reopen the index, which can result in the index being reopened again or not + /// (e.g. if another thread already opened and closed the index again). + /// + /// Use get again on the IndexMap to get the updated status. + /// + /// Fails if the underlying index creation fails. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|----------------------------------------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Available or Closing depending on generation | + /// | Available | Available | + /// + pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> { + if let Closing(reopen) = map.get(&self.uuid) { + if reopen.generation != self.generation { + return Ok(()); + } + map.unavailable.remove(&self.uuid); + map.create(&self.uuid, path, None, self.map_size)?; + } + Ok(()) + } + + /// Attempts to close the index, which may or may not result in the index being closed + /// (e.g. if another thread already reopened the index again). + /// + /// Use get again on the IndexMap to get the updated status. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|--------------------------------------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Missing or Closing depending on generation | + /// | Available | Available | + pub fn close(self, map: &mut IndexMap) { + if let Closing(reopen) = map.get(&self.uuid) { + if reopen.generation != self.generation { + return; + } + map.unavailable.remove(&self.uuid); + } + } +} + +impl IndexMap { + pub fn new(cap: usize) -> IndexMap { + Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 } + } + + /// Gets the current status of an index in the map. + /// + /// If the index is available it can be accessed from the returned status. + pub fn get(&self, uuid: &Uuid) -> IndexStatus { + self.available + .get(uuid) + .map(|index| Available(index.clone())) + .unwrap_or_else(|| self.get_unavailable(uuid)) + } + + fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus { + match self.unavailable.get(uuid) { + Some(Some(reopen)) => Closing(reopen.clone()), + Some(None) => BeingDeleted, + None => Missing, + } + } + + /// Attempts to create a new index that wasn't existing before. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Available | + /// | BeingDeleted | panics | + /// | Closing | panics | + /// | Available | panics | + /// + pub fn create( + &mut self, + uuid: &Uuid, + path: &Path, + date: Option<(OffsetDateTime, OffsetDateTime)>, + map_size: usize, + ) -> Result { + if !matches!(self.get_unavailable(uuid), Missing) { + panic!("Attempt to open an index that was unavailable"); + } + let index = create_or_open_index(path, date, map_size)?; + match self.available.insert(*uuid, index.clone()) { + InsertionOutcome::InsertedNew => (), + InsertionOutcome::Evicted(evicted_uuid, evicted_index) => { + self.close(evicted_uuid, evicted_index, 0); + } + InsertionOutcome::Replaced(_) => { + panic!("Attempt to open an index that was already opened") + } + } + Ok(index) + } + + /// Increases the current generation. See documentation for this field. + /// + /// In the unlikely event that the 2^64 generations would have been exhausted, we simply wrap-around. + /// + /// For this to cause an issue, one should be able to stop a reader in time after it got a `ReopenableIndex` and before it takes the lock + /// to remove it from the unavailable map, and keep the reader in this frozen state for 2^64 closing of other indexes. + /// + /// This seems overwhelmingly impossible to achieve in practice. + fn next_generation(&mut self) -> usize { + self.generation = self.generation.wrapping_add(1); + self.generation + } + + /// Attempts to close an index. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|---------------| + /// | Missing | Missing | + /// | BeingDeleted | BeingDeleted | + /// | Closing | Closing | + /// | Available | Closing | + /// + pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) { + let Some(index) = self.available.remove(uuid) else { return; }; + self.close(*uuid, index, map_size_growth); + } + + fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) { + let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth; + let closing_event = index.prepare_for_closing(); + let generation = self.next_generation(); + self.unavailable + .insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation })); + } + + /// Attempts to delete and index. + /// + /// `end_deletion` must be called just after. + /// + /// # Status table + /// + /// | Previous Status | New Status | Return value | + /// |-----------------|--------------|-----------------------------| + /// | Missing | BeingDeleted | Ok(None) | + /// | BeingDeleted | BeingDeleted | Err(None) | + /// | Closing | Closing | Err(Some(reopen)) | + /// | Available | BeingDeleted | Ok(Some(env_closing_event)) | + pub fn start_deletion( + &mut self, + uuid: &Uuid, + ) -> std::result::Result, Option> { + if let Some(index) = self.available.remove(uuid) { + self.unavailable.insert(*uuid, None); + return Ok(Some(index.prepare_for_closing())); + } + match self.unavailable.remove(uuid) { + Some(Some(reopen)) => Err(Some(reopen)), + Some(None) => Err(None), + None => Ok(None), + } + } + + /// Marks that an index deletion finished. + /// + /// Must be used after calling `start_deletion`. + /// + /// # Status table + /// + /// | Previous Status | New Status | + /// |-----------------|------------| + /// | Missing | Missing | + /// | BeingDeleted | Missing | + /// | Closing | panics | + /// | Available | panics | + pub fn end_deletion(&mut self, uuid: &Uuid) { + assert!( + self.available.get(uuid).is_none(), + "Attempt to finish deletion of an index that was not being deleted" + ); + // Do not panic if the index was Missing or BeingDeleted + assert!( + !matches!(self.unavailable.remove(uuid), Some(Some(_))), + "Attempt to finish deletion of an index that was being closed" + ); + } +} + +/// Create or open an index in the specified path. +/// The path *must* exist or an error will be thrown. +fn create_or_open_index( + path: &Path, + date: Option<(OffsetDateTime, OffsetDateTime)>, + map_size: usize, +) -> Result { + let mut options = EnvOpenOptions::new(); + options.map_size(clamp_to_page_size(map_size)); + options.max_readers(1024); + + if let Some((created, updated)) = date { + Ok(Index::new_with_creation_dates(options, path, created, updated)?) + } else { + Ok(Index::new(options, path)?) + } +} + +/// Putting the tests of the LRU down there so we have access to the cache's private members +#[cfg(test)] +mod tests { + + use meilisearch_types::heed::Env; + use meilisearch_types::Index; + use uuid::Uuid; + + use super::super::IndexMapper; + use crate::tests::IndexSchedulerHandle; + use crate::utils::clamp_to_page_size; + use crate::IndexScheduler; + + impl IndexMapper { + fn test() -> (Self, Env, IndexSchedulerHandle) { + let (index_scheduler, handle) = IndexScheduler::test(true, vec![]); + (index_scheduler.index_mapper, index_scheduler.env, handle) + } + } + + fn check_first_unavailable(mapper: &IndexMapper, expected_uuid: Uuid, is_closing: bool) { + let index_map = mapper.index_map.read().unwrap(); + let (uuid, state) = index_map.unavailable.first_key_value().unwrap(); + assert_eq!(uuid, &expected_uuid); + assert_eq!(state.is_some(), is_closing); + } + + #[test] + fn evict_indexes() { + let (mapper, env, _handle) = IndexMapper::test(); + let mut uuids = vec![]; + // LRU cap + 1 + for i in 0..(5 + 1) { + let index_name = format!("index-{i}"); + let wtxn = env.write_txn().unwrap(); + mapper.create_index(wtxn, &index_name, None).unwrap(); + let txn = env.read_txn().unwrap(); + uuids.push(mapper.index_mapping.get(&txn, &index_name).unwrap().unwrap()); + } + // index-0 was evicted + check_first_unavailable(&mapper, uuids[0], true); + + // get back the evicted index + let wtxn = env.write_txn().unwrap(); + mapper.create_index(wtxn, "index-0", None).unwrap(); + + // Least recently used is now index-1 + check_first_unavailable(&mapper, uuids[1], true); + } + + #[test] + fn resize_index() { + let (mapper, env, _handle) = IndexMapper::test(); + let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap(); + assert_index_size(index, mapper.index_base_map_size); + + mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap(); + + let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap(); + assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount); + + mapper.resize_index(&env.read_txn().unwrap(), "index").unwrap(); + + let index = mapper.create_index(env.write_txn().unwrap(), "index", None).unwrap(); + assert_index_size(index, mapper.index_base_map_size + mapper.index_growth_amount * 2); + } + + fn assert_index_size(index: Index, expected: usize) { + let expected = clamp_to_page_size(expected); + let index_map_size = index.map_size().unwrap(); + assert_eq!(index_map_size, expected); + } +} diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper/mod.rs similarity index 50% rename from index-scheduler/src/index_mapper.rs rename to index-scheduler/src/index_mapper/mod.rs index d1fe7c57d..1693d12d7 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper/mod.rs @@ -1,21 +1,22 @@ -use std::collections::hash_map::Entry; -use std::collections::HashMap; -use std::path::{Path, PathBuf}; +use std::path::PathBuf; use std::sync::{Arc, RwLock}; +use std::time::Duration; use std::{fs, thread}; use log::error; use meilisearch_types::heed::types::Str; -use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn}; +use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn}; use meilisearch_types::milli::update::IndexerConfig; use meilisearch_types::milli::Index; -use synchronoise::SignalEvent; use time::OffsetDateTime; use uuid::Uuid; -use self::IndexStatus::{Available, BeingDeleted, BeingResized}; +use self::index_map::IndexMap; +use self::IndexStatus::{Available, BeingDeleted, Closing, Missing}; use crate::uuid_codec::UuidCodec; -use crate::{clamp_to_page_size, Error, Result}; +use crate::{Error, Result}; + +mod index_map; const INDEX_MAPPING: &str = "index-mapping"; @@ -26,17 +27,38 @@ const INDEX_MAPPING: &str = "index-mapping"; /// 2. Opening indexes and storing references to these opened indexes /// 3. Accessing indexes through their uuid /// 4. Mapping a user-defined name to each index uuid. +/// +/// # Implementation notes +/// +/// An index exists as 3 bits of data: +/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted. +/// 2. The persistent database containing the association between the index' name and its UUID, +/// that can exist in 2 states: Missing or Present. +/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states: +/// - Missing +/// - Available +/// - Closing (because an index needs resizing or was evicted from the cache) +/// - BeingDeleted +/// +/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper` +/// with the use of the following primitives: +/// - A RwLock on the `IndexMap`. +/// - Transactions on the association database. +/// - ClosingEvent signals emitted when closing an environment. #[derive(Clone)] pub struct IndexMapper { /// Keep track of the opened indexes. Used mainly by the index resolver. - index_map: Arc>>, + index_map: Arc>, /// Map an index name with an index uuid currently available on disk. pub(crate) index_mapping: Database, /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, - index_size: usize, + /// The map size an index is opened with on the first time. + index_base_map_size: usize, + /// The quantity by which the map size of an index is incremented upon reopening, in bytes. + index_growth_amount: usize, pub indexer_config: Arc, } @@ -44,10 +66,12 @@ pub struct IndexMapper { #[allow(clippy::large_enum_variant)] #[derive(Clone)] pub enum IndexStatus { + /// Not currently in the index map. + Missing, /// Do not insert it back in the index map as it is currently being deleted. BeingDeleted, - /// Temporarily do not insert the index in the index map as it is currently being resized. - BeingResized(Arc), + /// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map. + Closing(index_map::ClosingIndex), /// You can use the index without worrying about anything. Available(Index), } @@ -56,37 +80,21 @@ impl IndexMapper { pub fn new( env: &Env, base_path: PathBuf, - index_size: usize, + index_base_map_size: usize, + index_growth_amount: usize, + index_count: usize, indexer_config: IndexerConfig, ) -> Result { Ok(Self { - index_map: Arc::default(), + index_map: Arc::new(RwLock::new(IndexMap::new(index_count))), index_mapping: env.create_database(Some(INDEX_MAPPING))?, base_path, - index_size, + index_base_map_size, + index_growth_amount, indexer_config: Arc::new(indexer_config), }) } - /// Create or open an index in the specified path. - /// The path *must* exists or an error will be thrown. - fn create_or_open_index( - &self, - path: &Path, - date: Option<(OffsetDateTime, OffsetDateTime)>, - map_size: usize, - ) -> Result { - let mut options = EnvOpenOptions::new(); - options.map_size(clamp_to_page_size(map_size)); - options.max_readers(1024); - - if let Some((created, updated)) = date { - Ok(Index::new_with_creation_dates(options, path, created, updated)?) - } else { - Ok(Index::new(options, path)?) - } - } - /// Get or create the index. pub fn create_index( &self, @@ -106,16 +114,17 @@ impl IndexMapper { let index_path = self.base_path.join(uuid.to_string()); fs::create_dir_all(&index_path)?; - let index = self.create_or_open_index(&index_path, date, self.index_size)?; - - wtxn.commit()?; // Error if the UUIDv4 somehow already exists in the map, since it should be fresh. // This is very unlikely to happen in practice. // TODO: it would be better to lazily create the index. But we need an Index::open function for milli. - if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some() - { - panic!("Uuid v4 conflict: index with UUID {uuid} already exists."); - } + let index = self.index_map.write().unwrap().create( + &uuid, + &index_path, + date, + self.index_base_map_size, + )?; + + wtxn.commit()?; Ok(index) } @@ -135,23 +144,42 @@ impl IndexMapper { assert!(self.index_mapping.delete(&mut wtxn, name)?); wtxn.commit()?; - // We remove the index from the in-memory index map. + + let mut tries = 0; + // Attempts to remove the index from the in-memory index map in a loop. + // + // If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent + // loop iteration. + // + // We make 100 attempts before giving up. + // This could happen in the following situations: + // + // 1. There is a bug preventing the index from being correctly closed, or us from detecting this. + // 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a pathological search. + // This can not be caused by indexation because deleting an index happens in the scheduler itself, so cannot be concurrent with indexation. + // + // In these situations, reporting the error through a panic is in order. let closing_event = loop { let mut lock = self.index_map.write().unwrap(); - let resize_operation = match lock.insert(uuid, BeingDeleted) { - Some(Available(index)) => break Some(index.prepare_for_closing()), - // The target index is in the middle of a resize operation. - // Wait for this operation to complete, then try again. - Some(BeingResized(resize_operation)) => resize_operation.clone(), - // The index is already being deleted or doesn't exist. - // It's OK to remove it from the map again. - _ => break None, - }; - - // Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which - // will involve operations on the very map we're locking. - drop(lock); - resize_operation.wait(); + match lock.start_deletion(&uuid) { + Ok(env_closing) => break env_closing, + Err(Some(reopen)) => { + // drop the lock here so that we don't synchronously wait for the index to close. + drop(lock); + tries += 1; + if tries >= 100 { + panic!("Too many attempts to close index {name} prior to deletion.") + } + let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) { + reopen + } else { + continue; + }; + reopen.close(&mut self.index_map.write().unwrap()); + continue; + } + Err(None) => return Ok(()), + } }; let index_map = self.index_map.clone(); @@ -161,7 +189,7 @@ impl IndexMapper { .name(String::from("index_deleter")) .spawn(move || { // We first wait to be sure that the previously opened index is effectively closed. - // This can take a lot of time, this is why we do that in a seperate thread. + // This can take a lot of time, this is why we do that in a separate thread. if let Some(closing_event) = closing_event { closing_event.wait(); } @@ -175,7 +203,7 @@ impl IndexMapper { } // Finally we remove the entry from the index map. - assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted))); + index_map.write().unwrap().end_deletion(&uuid); }) .unwrap(); @@ -195,76 +223,15 @@ impl IndexMapper { /// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the /// in memory hash map. pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> { - // fixme: factor to a function? let uuid = self .index_mapping .get(rtxn, name)? .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; // We remove the index from the in-memory index map. - let mut lock = self.index_map.write().unwrap(); - // signal that will be sent when the resize operation completes - let resize_operation = Arc::new(SignalEvent::manual(false)); - let index = match lock.insert(uuid, BeingResized(resize_operation)) { - Some(Available(index)) => index, - Some(previous_status) => { - lock.insert(uuid, previous_status); - panic!( - "Attempting to resize index {name} that is already being resized or deleted." - ) - } - None => { - panic!("Could not find the status of index {name} in the in-memory index mapper.") - } - }; + self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount); - drop(lock); - - let resize_succeeded = (move || { - let current_size = index.map_size()?; - let new_size = current_size * 2; - let closing_event = index.prepare_for_closing(); - - log::debug!("Waiting for index {name} to close"); - - if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) { - // fail after 10 minutes waiting - panic!("Could not resize index {name} (unable to close it)"); - } - - log::info!("Resized index {name} from {current_size} to {new_size} bytes"); - let index_path = self.base_path.join(uuid.to_string()); - let index = self.create_or_open_index(&index_path, None, new_size)?; - Ok(index) - })(); - - // Put the map back to a consistent state. - // Even if there was an error we don't want to leave the map in an inconsistent state as it would cause - // deadlocks. - let mut lock = self.index_map.write().unwrap(); - let (resize_operation, resize_succeeded) = match resize_succeeded { - Ok(index) => { - // insert the resized index - let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else { - panic!("Index state for index {name} was modified while it was being resized") - }; - - (resize_operation, Ok(())) - } - Err(error) => { - // there was an error, not much we can do... delete the index from the in-memory map to prevent future errors - let Some(BeingResized(resize_operation)) = lock.remove(&uuid) else { - panic!("Index state for index {name} was modified while it was being resized") - }; - (resize_operation, Err(error)) - } - }; - - // drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up. - drop(lock); - resize_operation.signal(); - - resize_succeeded + Ok(()) } /// Return an index, may open it if it wasn't already opened. @@ -274,47 +241,68 @@ impl IndexMapper { .get(rtxn, name)? .ok_or_else(|| Error::IndexNotFound(name.to_string()))?; - // we clone here to drop the lock before entering the match + let mut tries = 0; + // attempts to open the index in a loop. + // + // If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent + // loop iteration. + // + // We make 100 attempts before giving up. + // This could happen in the following situations: + // + // 1. There is a bug preventing the index from being correctly closed, or us from detecting it was. + // 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a long indexation, + // a pathological search, and so on. + // + // In these situations, reporting the error through a panic is in order. let index = loop { - let index = self.index_map.read().unwrap().get(&uuid).cloned(); + tries += 1; + if tries > 100 { + panic!("Too many spurious wake ups while trying to open the index {name}"); + } + + // we get the index here to drop the lock before entering the match + let index = self.index_map.read().unwrap().get(&uuid); match index { - Some(Available(index)) => break index, - Some(BeingResized(ref resize_operation)) => { + Available(index) => break index, + Closing(reopen) => { // Avoiding deadlocks: no lock taken while doing this operation. - resize_operation.wait(); + let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) { + reopen + } else { + continue; + }; + let index_path = self.base_path.join(uuid.to_string()); + // take the lock to reopen the environment. + reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?; continue; } - Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), // since we're lazy, it's possible that the index has not been opened yet. - None => { + Missing => { let mut index_map = self.index_map.write().unwrap(); // between the read lock and the write lock it's not impossible - // that someone already opened the index (eg if two search happens + // that someone already opened the index (eg if two searches happen // at the same time), thus before opening it we check a second time // if it's not already there. - // Since there is a good chance it's not already there we can use - // the entry method. - match index_map.entry(uuid) { - Entry::Vacant(entry) => { + match index_map.get(&uuid) { + Missing => { let index_path = self.base_path.join(uuid.to_string()); - let index = - self.create_or_open_index(&index_path, None, self.index_size)?; - entry.insert(Available(index.clone())); - break index; + break index_map.create( + &uuid, + &index_path, + None, + self.index_base_map_size, + )?; } - Entry::Occupied(entry) => match entry.get() { - Available(index) => break index.clone(), - BeingResized(resize_operation) => { - // Avoiding the deadlock: we drop the lock before waiting - let resize_operation = resize_operation.clone(); - drop(index_map); - resize_operation.wait(); - continue; - } - BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), - }, + Available(index) => break index, + Closing(_) => { + // the reopening will be handled in the next loop operation + continue; + } + BeingDeleted => return Err(Error::IndexNotFound(name.to_string())), } } } @@ -323,18 +311,38 @@ impl IndexMapper { Ok(index) } - /// Return all indexes, may open them if they weren't already opened. - pub fn indexes(&self, rtxn: &RoTxn) -> Result> { + /// Attempts `f` for each index that exists in the index mapper. + /// + /// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened, + /// which is unsupported in general. + /// + /// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of + /// all the indexes, but this function makes it harder and so less likely to do accidentally. + pub fn try_for_each_index( + &self, + rtxn: &RoTxn, + mut f: impl FnMut(&str, &Index) -> Result, + ) -> Result + where + V: FromIterator, + { self.index_mapping .iter(rtxn)? - .map(|ret| { - ret.map_err(Error::from).and_then(|(name, _)| { - self.index(rtxn, name).map(|index| (name.to_string(), index)) - }) + .map(|res| { + res.map_err(Error::from) + .and_then(|(name, _)| self.index(rtxn, name).and_then(|index| f(name, &index))) }) .collect() } + /// Return the name of all indexes without opening them. + pub fn index_names(&self, rtxn: &RoTxn) -> Result> { + self.index_mapping + .iter(rtxn)? + .map(|res| res.map_err(Error::from).map(|(name, _)| name.to_string())) + .collect() + } + /// Swap two index names. pub fn swap(&self, wtxn: &mut RwTxn, lhs: &str, rhs: &str) -> Result<()> { let lhs_uuid = self diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs index e8d07ee63..dcc348c98 100644 --- a/index-scheduler/src/insta_snapshot.rs +++ b/index-scheduler/src/insta_snapshot.rs @@ -254,6 +254,6 @@ pub fn snapshot_canceled_by( snap } pub fn snapshot_index_mapper(rtxn: &RoTxn, mapper: &IndexMapper) -> String { - let names = mapper.indexes(rtxn).unwrap().into_iter().map(|(n, _)| n).collect::>(); + let names = mapper.index_names(rtxn).unwrap(); format!("{names:?}") } diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 8c050a34f..65e9f63ff 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -24,6 +24,7 @@ pub mod error; mod index_mapper; #[cfg(test)] mod insta_snapshot; +mod lru; mod utils; mod uuid_codec; @@ -31,7 +32,7 @@ pub type Result = std::result::Result; pub type TaskId = u32; use std::ops::{Bound, RangeBounds}; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering::Relaxed; use std::sync::{Arc, RwLock}; @@ -229,8 +230,12 @@ pub struct IndexSchedulerOptions { pub dumps_path: PathBuf, /// The maximum size, in bytes, of the task index. pub task_db_size: usize, - /// The maximum size, in bytes, of each meilisearch index. - pub index_size: usize, + /// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index. + pub index_base_map_size: usize, + /// The size, in bytes, by which the map size of an index is increased when it resized due to being full. + pub index_growth_amount: usize, + /// The number of indexes that can be concurrently opened in memory. + pub index_count: usize, /// Configuration used during indexing for each meilisearch index. pub indexer_config: IndexerConfig, /// Set to `true` iff the index scheduler is allowed to automatically @@ -360,9 +365,25 @@ impl IndexScheduler { std::fs::create_dir_all(&options.indexes_path)?; std::fs::create_dir_all(&options.dumps_path)?; + let task_db_size = clamp_to_page_size(options.task_db_size); + let budget = if options.indexer_config.skip_index_budget { + IndexBudget { + map_size: options.index_base_map_size, + index_count: options.index_count, + task_db_size, + } + } else { + Self::index_budget( + &options.tasks_path, + options.index_base_map_size, + task_db_size, + options.index_count, + ) + }; + let env = heed::EnvOpenOptions::new() .max_dbs(10) - .map_size(clamp_to_page_size(options.task_db_size)) + .map_size(budget.task_db_size) .open(options.tasks_path)?; let file_store = FileStore::new(&options.update_file_path)?; @@ -382,7 +403,9 @@ impl IndexScheduler { index_mapper: IndexMapper::new( &env, options.indexes_path, - options.index_size, + budget.map_size, + options.index_growth_amount, + budget.index_count, options.indexer_config, )?, env, @@ -406,6 +429,65 @@ impl IndexScheduler { Ok(this) } + fn index_budget( + tasks_path: &Path, + base_map_size: usize, + mut task_db_size: usize, + max_index_count: usize, + ) -> IndexBudget { + let budget = utils::dichotomic_search(base_map_size, |map_size| { + Self::is_good_heed(tasks_path, map_size) + }); + + log::debug!("memmap budget: {budget}B"); + let mut budget = budget / 2; + if task_db_size > (budget / 2) { + task_db_size = clamp_to_page_size(budget * 2 / 5); + log::debug!( + "Decreasing max size of task DB to {task_db_size}B due to constrained memory space" + ); + } + budget -= task_db_size; + + // won't be mutated again + let budget = budget; + let task_db_size = task_db_size; + + log::debug!("index budget: {budget}B"); + let mut index_count = budget / base_map_size; + if index_count < 2 { + // take a bit less than half than the budget to make sure we can always afford to open an index + let map_size = (budget * 2) / 5; + // single index of max budget + log::debug!("1 index of {map_size}B can be opened simultaneously."); + return IndexBudget { map_size, index_count: 1, task_db_size }; + } + // give us some space for an additional index when the cache is already full + // decrement is OK because index_count >= 2. + index_count -= 1; + if index_count > max_index_count { + index_count = max_index_count; + } + log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously."); + IndexBudget { map_size: base_map_size, index_count, task_db_size } + } + + fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool { + if let Ok(env) = + heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) + { + env.prepare_for_closing().wait(); + true + } else { + // We're treating all errors equally here, not only allocation errors. + // This means there's a possiblity for the budget to lower due to errors different from allocation errors. + // For persistent errors, this is OK as long as the task db is then reopened normally without ignoring the error this time. + // For transient errors, this could lead to an instance with too low a budget. + // However transient errors are: 1) less likely than persistent errors 2) likely to cause other issues down the line anyway. + false + } + } + pub fn read_txn(&self) -> Result { self.env.read_txn().map_err(|e| e.into()) } @@ -459,15 +541,42 @@ impl IndexScheduler { /// /// * If the index wasn't opened before, the index will be opened. /// * If the index doesn't exist on disk, the `IndexNotFoundError` is thrown. + /// + /// ### Note + /// + /// As an `Index` requires a large swath of the virtual memory address space, correct usage of an `Index` does not + /// keep its handle for too long. + /// + /// Some configurations also can't reasonably open multiple indexes at once. + /// If you need to fetch information from or perform an action on all indexes, + /// see the `try_for_each_index` function. pub fn index(&self, name: &str) -> Result { let rtxn = self.env.read_txn()?; self.index_mapper.index(&rtxn, name) } - /// Return and open all the indexes. - pub fn indexes(&self) -> Result> { + /// Return the name of all indexes without opening them. + pub fn index_names(self) -> Result> { let rtxn = self.env.read_txn()?; - self.index_mapper.indexes(&rtxn) + self.index_mapper.index_names(&rtxn) + } + + /// Attempts `f` for each index that exists known to the index scheduler. + /// + /// It is preferable to use this function rather than a loop that opens all indexes, as a way to avoid having all indexes opened, + /// which is unsupported in general. + /// + /// Since `f` is allowed to return a result, and `Index` is cloneable, it is still possible to wrongly build e.g. a vector of + /// all the indexes, but this function makes it harder and so less likely to do accidentally. + /// + /// If many indexes exist, this operation can take time to complete (in the order of seconds for a 1000 of indexes) as it needs to open + /// all the indexes. + pub fn try_for_each_index(&self, f: impl FnMut(&str, &Index) -> Result) -> Result + where + V: FromIterator, + { + let rtxn = self.env.read_txn()?; + self.index_mapper.try_for_each_index(&rtxn, f) } /// Return the task ids matched by the given query from the index scheduler's point of view. @@ -1109,6 +1218,16 @@ pub enum TickOutcome { WaitForSignal, } +/// How many indexes we can afford to have open simultaneously. +struct IndexBudget { + /// Map size of an index. + map_size: usize, + /// Maximum number of simultaneously opened indexes. + index_count: usize, + /// For very constrained systems we might need to reduce the base task_db_size so we can accept at least one index. + task_db_size: usize, +} + #[cfg(test)] mod tests { use std::io::{BufWriter, Seek, Write}; @@ -1154,6 +1273,8 @@ mod tests { let tempdir = TempDir::new().unwrap(); let (sender, receiver) = crossbeam::channel::bounded(0); + let indexer_config = IndexerConfig { skip_index_budget: true, ..Default::default() }; + let options = IndexSchedulerOptions { version_file_path: tempdir.path().join(VERSION_FILE_NAME), auth_path: tempdir.path().join("auth"), @@ -1163,8 +1284,10 @@ mod tests { snapshots_path: tempdir.path().join("snapshots"), dumps_path: tempdir.path().join("dumps"), task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. - index_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. - indexer_config: IndexerConfig::default(), + index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose. + index_growth_amount: 1000 * 1000, // 1 MB + index_count: 5, + indexer_config, autobatching_enabled, }; diff --git a/index-scheduler/src/lru.rs b/index-scheduler/src/lru.rs new file mode 100644 index 000000000..370ff5fe1 --- /dev/null +++ b/index-scheduler/src/lru.rs @@ -0,0 +1,203 @@ +//! Thread-safe `Vec`-backend LRU cache using [`std::sync::atomic::AtomicU64`] for synchronization. + +use std::sync::atomic::{AtomicU64, Ordering}; + +/// Thread-safe `Vec`-backend LRU cache +#[derive(Debug)] +pub struct Lru { + data: Vec<(AtomicU64, T)>, + generation: AtomicU64, + cap: usize, +} + +impl Lru { + /// Creates a new LRU cache with the specified capacity. + /// + /// The capacity is allocated up-front, and will never change through a [`Self::put`] operation. + /// + /// # Panics + /// + /// - If the capacity is 0. + /// - If the capacity exceeds `isize::MAX` bytes. + pub fn new(cap: usize) -> Self { + assert_ne!(cap, 0, "The capacity of a cache cannot be 0"); + Self { + // Note: since the element of the vector contains an AtomicU64, it is definitely not zero-sized so cap will never be usize::MAX. + data: Vec::with_capacity(cap), + generation: AtomicU64::new(0), + cap, + } + } + + /// The capacity of this LRU cache, that is the maximum number of elements it can hold before evicting elements from the cache. + /// + /// The cache will contain at most this number of elements at any given time. + pub fn capacity(&self) -> usize { + self.cap + } + + fn next_generation(&self) -> u64 { + // Acquire so this "happens-before" any potential store to a data cell (with Release ordering) + let generation = self.generation.fetch_add(1, Ordering::Acquire); + generation + 1 + } + + fn next_generation_mut(&mut self) -> u64 { + let generation = self.generation.get_mut(); + *generation += 1; + *generation + } + + /// Add a value in the cache, evicting an older value if necessary. + /// + /// If a value was evicted from the cache, it is returned. + /// + /// # Complexity + /// + /// - If the cache is full, then linear in the capacity. + /// - Otherwise constant. + pub fn put(&mut self, value: T) -> Option { + // no need for a memory fence: we assume that whichever mechanism provides us synchronization + // (very probably, a RwLock) takes care of fencing for us. + + let next_generation = self.next_generation_mut(); + let evicted = if self.is_full() { self.pop() } else { None }; + self.data.push((AtomicU64::new(next_generation), value)); + evicted + } + + /// Evict the oldest value from the cache. + /// + /// If the cache is empty, `None` will be returned. + /// + /// # Complexity + /// + /// - Linear in the capacity of the cache. + pub fn pop(&mut self) -> Option { + // Don't use `Iterator::min_by_key` that provides shared references to its elements, + // so that we can get an exclusive one. + // This allows to handles the `AtomicU64`s as normal integers without using atomic instructions. + let mut min_generation_index = None; + for (index, (generation, _)) in self.data.iter_mut().enumerate() { + let generation = *generation.get_mut(); + if let Some((_, min_generation)) = min_generation_index { + if min_generation > generation { + min_generation_index = Some((index, generation)); + } + } else { + min_generation_index = Some((index, generation)) + } + } + min_generation_index.map(|(min_index, _)| self.data.swap_remove(min_index).1) + } + + /// The current number of elements in the cache. + /// + /// This value is guaranteed to be less than or equal to [`Self::capacity`]. + pub fn len(&self) -> usize { + self.data.len() + } + + /// Returns `true` if putting any additional element in the cache would cause the eviction of an element. + pub fn is_full(&self) -> bool { + self.len() == self.capacity() + } +} + +pub struct LruMap(Lru<(K, V)>); + +impl LruMap +where + K: Eq, +{ + /// Creates a new LRU cache map with the specified capacity. + /// + /// The capacity is allocated up-front, and will never change through a [`Self::insert`] operation. + /// + /// # Panics + /// + /// - If the capacity is 0. + /// - If the capacity exceeds `isize::MAX` bytes. + pub fn new(cap: usize) -> Self { + Self(Lru::new(cap)) + } + + /// Gets a value in the cache map by its key. + /// + /// If no value matches, `None` will be returned. + /// + /// # Complexity + /// + /// - Linear in the capacity of the cache. + pub fn get(&self, key: &K) -> Option<&V> { + for (generation, (candidate, value)) in self.0.data.iter() { + if key == candidate { + generation.store(self.0.next_generation(), Ordering::Release); + return Some(value); + } + } + None + } + + /// Gets a value in the cache map by its key. + /// + /// If no value matches, `None` will be returned. + /// + /// # Complexity + /// + /// - Linear in the capacity of the cache. + pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { + let next_generation = self.0.next_generation_mut(); + for (generation, (candidate, value)) in self.0.data.iter_mut() { + if key == candidate { + *generation.get_mut() = next_generation; + return Some(value); + } + } + None + } + + /// Inserts a value in the cache map by its key, replacing any existing value and returning any evicted value. + /// + /// # Complexity + /// + /// - Linear in the capacity of the cache. + pub fn insert(&mut self, key: K, mut value: V) -> InsertionOutcome { + match self.get_mut(&key) { + Some(old_value) => { + std::mem::swap(old_value, &mut value); + InsertionOutcome::Replaced(value) + } + None => match self.0.put((key, value)) { + Some((key, value)) => InsertionOutcome::Evicted(key, value), + None => InsertionOutcome::InsertedNew, + }, + } + } + + /// Removes an element from the cache map by its key, returning its value. + /// + /// Returns `None` if there was no element with this key in the cache. + /// + /// # Complexity + /// + /// - Linear in the capacity of the cache. + pub fn remove(&mut self, key: &K) -> Option { + for (index, (_, (candidate, _))) in self.0.data.iter_mut().enumerate() { + if key == candidate { + return Some(self.0.data.swap_remove(index).1 .1); + } + } + None + } +} + +/// The result of an insertion in a LRU map. +pub enum InsertionOutcome { + /// The key was not in the cache, the key-value pair has been inserted. + InsertedNew, + /// The key was not in the cache and an old key-value pair was evicted from the cache to make room for its insertions. + Evicted(K, V), + /// The key was already in the cache map, its value has been updated. + Replaced(V), +} diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs index acb520513..7718e1af0 100644 --- a/index-scheduler/src/utils.rs +++ b/index-scheduler/src/utils.rs @@ -538,3 +538,37 @@ impl IndexScheduler { } } } + +pub fn dichotomic_search(start_point: usize, mut is_good: impl FnMut(usize) -> bool) -> usize { + let mut biggest_good = None; + let mut smallest_bad = None; + let mut current = start_point; + loop { + let is_good = is_good(current); + + (biggest_good, smallest_bad, current) = match (biggest_good, smallest_bad, is_good) { + (None, None, false) => (None, Some(current), current / 2), + (None, None, true) => (Some(current), None, current * 2), + (None, Some(smallest_bad), true) => { + (Some(current), Some(smallest_bad), (current + smallest_bad) / 2) + } + (None, Some(_), false) => (None, Some(current), current / 2), + (Some(_), None, true) => (Some(current), None, current * 2), + (Some(biggest_good), None, false) => { + (Some(biggest_good), Some(current), (biggest_good + current) / 2) + } + (Some(_), Some(smallest_bad), true) => { + (Some(current), Some(smallest_bad), (smallest_bad + current) / 2) + } + (Some(biggest_good), Some(_), false) => { + (Some(biggest_good), Some(current), (biggest_good + current) / 2) + } + }; + if current == 0 { + return current; + } + if smallest_bad.is_some() && biggest_good.is_some() && biggest_good >= Some(current) { + return current; + } + } +} diff --git a/meilisearch/src/analytics/segment_analytics.rs b/meilisearch/src/analytics/segment_analytics.rs index 92ac4b1d5..472001004 100644 --- a/meilisearch/src/analytics/segment_analytics.rs +++ b/meilisearch/src/analytics/segment_analytics.rs @@ -292,7 +292,8 @@ impl From for Infos { ScheduleSnapshot::Enabled(interval) => Some(interval), }; - let IndexerOpts { max_indexing_memory, max_indexing_threads } = indexer_options; + let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } = + indexer_options; // We're going to override every sensible information. // We consider information sensible if it contains a path, an address, or a key. diff --git a/meilisearch/src/lib.rs b/meilisearch/src/lib.rs index 71e7486f1..3352aa4f5 100644 --- a/meilisearch/src/lib.rs +++ b/meilisearch/src/lib.rs @@ -45,6 +45,34 @@ use option::ScheduleSnapshot; use crate::error::MeilisearchHttpError; +/// Default number of simultaneously opened indexes. +/// +/// This value is used when dynamic computation of how many indexes can be opened at once was skipped (e.g., in tests). +/// +/// Lower for Windows that dedicates a smaller virtual address space to processes. +/// +/// The value was chosen this way: +/// +/// - Windows provides a small virtual address space of about 10TiB to processes. +/// - The chosen value allows for indexes to use the default map size of 2TiB safely. +#[cfg(windows)] +const DEFAULT_INDEX_COUNT: usize = 4; + +/// Default number of simultaneously opened indexes. +/// +/// This value is used when dynamic computation of how many indexes can be opened at once was skipped (e.g., in tests). +/// +/// The higher, the better for avoiding reopening indexes. +/// +/// The value was chosen this way: +/// +/// - Opening an index consumes a file descriptor. +/// - The default on many unices is about 256 file descriptors for a process. +/// - 100 is a little bit less than half this value. +/// - The chosen value allows for indexes to use the default map size of 2TiB safely. +#[cfg(not(windows))] +const DEFAULT_INDEX_COUNT: usize = 20; + /// Check if a db is empty. It does not provide any information on the /// validity of the data in it. /// We consider a database as non empty when it's a non empty directory. @@ -206,9 +234,11 @@ fn open_or_create_database_unchecked( snapshots_path: opt.snapshot_dir.clone(), dumps_path: opt.dump_dir.clone(), task_db_size: opt.max_task_db_size.get_bytes() as usize, - index_size: opt.max_index_size.get_bytes() as usize, + index_base_map_size: opt.max_index_size.get_bytes() as usize, indexer_config: (&opt.indexer_options).try_into()?, autobatching_enabled: true, + index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize, + index_count: DEFAULT_INDEX_COUNT, })?) }; diff --git a/meilisearch/src/option.rs b/meilisearch/src/option.rs index 2a652d7c8..544827387 100644 --- a/meilisearch/src/option.rs +++ b/meilisearch/src/option.rs @@ -65,11 +65,11 @@ const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS"; const DEFAULT_LOG_EVERY_N: usize = 100_000; // Each environment (index and task-db) is taking space in the virtual address space. -// -// The size of the virtual address space is limited by the OS. About 100TB for Linux and about 10TB for Windows. -// This means that the number of indexes is limited to about 200 for Linux and about 20 for Windows. -pub const INDEX_SIZE: u64 = 536_870_912_000; // 500 GiB -pub const TASK_DB_SIZE: u64 = 10_737_418_240; // 10 GiB +// Ideally, indexes can occupy 2TiB each to avoid having to manually resize them. +// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be +// opened simultaneously. +pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB +pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] @@ -494,12 +494,21 @@ pub struct IndexerOpts { #[clap(long, env = MEILI_MAX_INDEXING_THREADS, default_value_t)] #[serde(default)] pub max_indexing_threads: MaxThreads, + + /// Whether or not we want to determine the budget of virtual memory address space we have available dynamically + /// (the default), or statically. + /// + /// Determining the budget of virtual memory address space dynamically takes some time on some systems (such as macOS) + /// and may make tests non-deterministic, so we want to skip it in tests. + #[clap(skip)] + #[serde(skip)] + pub skip_index_budget: bool, } impl IndexerOpts { /// Exports the values to their corresponding env vars if they are not set. pub fn export_to_env(self) { - let IndexerOpts { max_indexing_memory, max_indexing_threads } = self; + let IndexerOpts { max_indexing_memory, max_indexing_threads, skip_index_budget: _ } = self; if let Some(max_indexing_memory) = max_indexing_memory.0 { export_to_env_if_not_present( MEILI_MAX_INDEXING_MEMORY, @@ -527,6 +536,7 @@ impl TryFrom<&IndexerOpts> for IndexerConfig { max_memory: other.max_indexing_memory.map(|b| b.get_bytes() as usize), thread_pool: Some(thread_pool), max_positions_per_attributes: None, + skip_index_budget: other.skip_index_budget, ..Default::default() }) } diff --git a/meilisearch/src/routes/indexes/mod.rs b/meilisearch/src/routes/indexes/mod.rs index da24a92ad..c5c168786 100644 --- a/meilisearch/src/routes/indexes/mod.rs +++ b/meilisearch/src/routes/indexes/mod.rs @@ -61,6 +61,8 @@ pub struct IndexView { impl IndexView { fn new(uid: String, index: &Index) -> Result { + // It is important that this function does not keep the Index handle or a clone of it, because + // `list_indexes` relies on this property to avoid opening all indexes at once. let rtxn = index.read_txn()?; Ok(IndexView { uid, @@ -90,13 +92,15 @@ pub async fn list_indexes( paginate: AwebQueryParameter, ) -> Result { let filters = index_scheduler.filters(); - let indexes: Vec<_> = index_scheduler.indexes()?; - let indexes = indexes - .into_iter() - .filter(|(name, _)| filters.is_index_authorized(name)) - .map(|(name, index)| IndexView::new(name, &index)) - .collect::, _>>()?; - + let indexes: Vec> = + index_scheduler.try_for_each_index(|uid, index| -> Result, _> { + if !filters.is_index_authorized(uid) { + return Ok(None); + } + Ok(Some(IndexView::new(uid.to_string(), index)?)) + })?; + // Won't cause to open all indexes because IndexView doesn't keep the `Index` opened. + let indexes: Vec = indexes.into_iter().flatten().collect(); let ret = paginate.as_pagination().auto_paginate_sized(indexes.into_iter()); debug!("returns: {:?}", ret); diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs index 622e26c75..bd3dd0649 100644 --- a/meilisearch/src/routes/mod.rs +++ b/meilisearch/src/routes/mod.rs @@ -261,9 +261,9 @@ pub fn create_all_stats( )?; // accumulate the size of each indexes let processing_index = processing_task.first().and_then(|task| task.index_uid()); - for (name, index) in index_scheduler.indexes()? { - if !filters.is_index_authorized(&name) { - continue; + index_scheduler.try_for_each_index(|name, index| { + if !filters.is_index_authorized(name) { + return Ok(()); } database_size += index.on_disk_size()?; @@ -278,8 +278,9 @@ pub fn create_all_stats( let updated_at = index.updated_at(&rtxn)?; last_task = last_task.map_or(Some(updated_at), |last| Some(last.max(updated_at))); - indexes.insert(name, stats); - } + indexes.insert(name.to_string(), stats); + Ok(()) + })?; database_size += index_scheduler.size()?; database_size += auth_controller.size()?; diff --git a/meilisearch/tests/common/server.rs b/meilisearch/tests/common/server.rs index 8152edbd0..268b5c4b7 100644 --- a/meilisearch/tests/common/server.rs +++ b/meilisearch/tests/common/server.rs @@ -205,6 +205,7 @@ pub fn default_settings(dir: impl AsRef) -> Opt { indexer_options: IndexerOpts { // memory has to be unlimited because several meilisearch are running in test context. max_indexing_memory: MaxMemory::unlimited(), + skip_index_budget: true, ..Parser::parse_from(None as Option<&str>) }, #[cfg(feature = "metrics")] diff --git a/milli/src/update/indexer_config.rs b/milli/src/update/indexer_config.rs index af7211f90..ff7942fdb 100644 --- a/milli/src/update/indexer_config.rs +++ b/milli/src/update/indexer_config.rs @@ -11,6 +11,7 @@ pub struct IndexerConfig { pub chunk_compression_level: Option, pub thread_pool: Option, pub max_positions_per_attributes: Option, + pub skip_index_budget: bool, } impl Default for IndexerConfig { @@ -24,6 +25,7 @@ impl Default for IndexerConfig { chunk_compression_level: None, thread_pool: None, max_positions_per_attributes: None, + skip_index_budget: false, } } }