mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-04 20:18:55 +01:00
Use LRU cache
This commit is contained in:
parent
fdf043580c
commit
80b060f920
@ -1,21 +1,20 @@
|
|||||||
use std::collections::hash_map::Entry;
|
use std::path::PathBuf;
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::time::Duration;
|
||||||
use std::{fs, thread};
|
use std::{fs, thread};
|
||||||
|
|
||||||
use log::error;
|
use log::error;
|
||||||
use meilisearch_types::heed::types::Str;
|
use meilisearch_types::heed::types::Str;
|
||||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn};
|
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||||
use meilisearch_types::milli::update::IndexerConfig;
|
use meilisearch_types::milli::update::IndexerConfig;
|
||||||
use meilisearch_types::milli::Index;
|
use meilisearch_types::milli::Index;
|
||||||
use synchronoise::SignalEvent;
|
|
||||||
use time::OffsetDateTime;
|
use time::OffsetDateTime;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use self::IndexStatus::{Available, BeingDeleted, BeingResized};
|
use self::index_map::IndexMap;
|
||||||
|
use self::IndexStatus::{Available, BeingDeleted, Closing, Missing};
|
||||||
use crate::uuid_codec::UuidCodec;
|
use crate::uuid_codec::UuidCodec;
|
||||||
use crate::{clamp_to_page_size, Error, Result};
|
use crate::{Error, Result};
|
||||||
|
|
||||||
const INDEX_MAPPING: &str = "index-mapping";
|
const INDEX_MAPPING: &str = "index-mapping";
|
||||||
|
|
||||||
@ -26,52 +25,324 @@ const INDEX_MAPPING: &str = "index-mapping";
|
|||||||
/// 2. Opening indexes and storing references to these opened indexes
|
/// 2. Opening indexes and storing references to these opened indexes
|
||||||
/// 3. Accessing indexes through their uuid
|
/// 3. Accessing indexes through their uuid
|
||||||
/// 4. Mapping a user-defined name to each index uuid.
|
/// 4. Mapping a user-defined name to each index uuid.
|
||||||
|
///
|
||||||
|
/// # Implementation notes
|
||||||
|
///
|
||||||
|
/// An index exists as 3 bits of data:
|
||||||
|
/// 1. The index data on disk, that can exist in 3 states: Missing, Present, or BeingDeleted.
|
||||||
|
/// 2. The persistent database containing the association between the index' name and its UUID,
|
||||||
|
/// that can exist in 2 states: Missing or Present.
|
||||||
|
/// 3. The state of the index in the in-memory `IndexMap`, that can exist in multiple states:
|
||||||
|
/// - Missing
|
||||||
|
/// - Available
|
||||||
|
/// - Closing (because an index needs resizing or was evicted from the cache)
|
||||||
|
/// - BeingDeleted
|
||||||
|
///
|
||||||
|
/// All of this data should be kept consistent between index operations, which is achieved by the `IndexMapper`
|
||||||
|
/// with the use of the following primitives:
|
||||||
|
/// - A RwLock on the `IndexMap`.
|
||||||
|
/// - Transactions on the association database.
|
||||||
|
/// - ClosingEvent signals emitted when closing an environment.
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct IndexMapper {
|
pub struct IndexMapper {
|
||||||
/// Keep track of the opened indexes. Used mainly by the index resolver.
|
/// Keep track of the opened indexes. Used mainly by the index resolver.
|
||||||
index_map: Arc<RwLock<HashMap<Uuid, IndexStatus>>>,
|
index_map: Arc<RwLock<IndexMap>>,
|
||||||
|
|
||||||
/// Map an index name with an index uuid currently available on disk.
|
/// Map an index name with an index uuid currently available on disk.
|
||||||
pub(crate) index_mapping: Database<Str, UuidCodec>,
|
pub(crate) index_mapping: Database<Str, UuidCodec>,
|
||||||
|
|
||||||
/// Path to the folder where the LMDB environments of each index are.
|
/// Path to the folder where the LMDB environments of each index are.
|
||||||
base_path: PathBuf,
|
base_path: PathBuf,
|
||||||
index_size: usize,
|
/// The map size an index is opened with on the first time.
|
||||||
|
index_base_map_size: usize,
|
||||||
|
/// The quantity by which the map size of an index is incremented upon reopening, in bytes.
|
||||||
|
index_growth_amount: usize,
|
||||||
pub indexer_config: Arc<IndexerConfig>,
|
pub indexer_config: Arc<IndexerConfig>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Whether the index is available for use or is forbidden to be inserted back in the index map
|
mod index_map {
|
||||||
#[allow(clippy::large_enum_variant)]
|
/// the map size to use when we don't succeed in reading it in indexes.
|
||||||
#[derive(Clone)]
|
const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB
|
||||||
pub enum IndexStatus {
|
|
||||||
/// Do not insert it back in the index map as it is currently being deleted.
|
|
||||||
BeingDeleted,
|
|
||||||
/// Temporarily do not insert the index in the index map as it is currently being resized.
|
|
||||||
BeingResized(Arc<SignalEvent>),
|
|
||||||
/// You can use the index without worrying about anything.
|
|
||||||
Available(Index),
|
|
||||||
}
|
|
||||||
|
|
||||||
impl IndexMapper {
|
use std::collections::BTreeMap;
|
||||||
pub fn new(
|
use std::path::Path;
|
||||||
env: &Env,
|
use std::time::Duration;
|
||||||
base_path: PathBuf,
|
|
||||||
index_size: usize,
|
use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
|
||||||
indexer_config: IndexerConfig,
|
use meilisearch_types::milli::Index;
|
||||||
) -> Result<Self> {
|
use time::OffsetDateTime;
|
||||||
Ok(Self {
|
use uuid::Uuid;
|
||||||
index_map: Arc::default(),
|
|
||||||
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
|
use super::IndexStatus::{self, Available, BeingDeleted, Closing, Missing};
|
||||||
base_path,
|
use crate::lru::{InsertionOutcome, LruMap};
|
||||||
index_size,
|
use crate::{clamp_to_page_size, Result};
|
||||||
indexer_config: Arc::new(indexer_config),
|
|
||||||
})
|
/// Keep an internally consistent view of the open indexes in memory.
|
||||||
|
///
|
||||||
|
/// This view is made of an LRU cache that will evict the least frequently used indexes when new indexes are opened.
|
||||||
|
/// Indexes that are being closed (for resizing or due to cache eviction) or deleted cannot be evicted from the cache and
|
||||||
|
/// are stored separately.
|
||||||
|
///
|
||||||
|
/// This view provides operations to change the state of the index as it is known in memory:
|
||||||
|
/// open an index (making it available for queries), close an index (specifying the new size it should be opened with),
|
||||||
|
/// delete an index.
|
||||||
|
///
|
||||||
|
/// External consistency with the other bits of data of an index is provided by the `IndexMapper` parent structure.
|
||||||
|
pub struct IndexMap {
|
||||||
|
/// A LRU map of indexes that are in the open state and available for queries.
|
||||||
|
available: LruMap<Uuid, Index>,
|
||||||
|
/// A map of indexes that are not available for queries, either because they are being deleted
|
||||||
|
/// or because they are being closed.
|
||||||
|
///
|
||||||
|
/// If they are being deleted, the UUID points to `None`.
|
||||||
|
unavailable: BTreeMap<Uuid, Option<ClosingIndex>>,
|
||||||
|
|
||||||
|
/// A monotonically increasing generation number, used to differentiate between multiple successive index closing requests.
|
||||||
|
///
|
||||||
|
/// Because multiple readers could be waiting on an index to close, the following could theoretically happen:
|
||||||
|
///
|
||||||
|
/// 1. Multiple readers wait for the index closing to occur.
|
||||||
|
/// 2. One of them "wins the race", takes the lock and then removes the index that finished closing from the map.
|
||||||
|
/// 3. The index is reopened, but must be closed again (such as being resized again).
|
||||||
|
/// 4. One reader that "lost the race" in (2) wakes up and tries to take the lock and remove the index from the map.
|
||||||
|
///
|
||||||
|
/// In that situation, the index may or may not have finished closing. The `generation` field allows to remember which
|
||||||
|
/// closing request was made, so the reader that "lost the race" has the old generation and will need to wait again for the index
|
||||||
|
/// to close.
|
||||||
|
generation: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct ClosingIndex {
|
||||||
|
uuid: Uuid,
|
||||||
|
closing_event: EnvClosingEvent,
|
||||||
|
map_size: usize,
|
||||||
|
generation: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClosingIndex {
|
||||||
|
/// Waits for the index to be definitely closed.
|
||||||
|
///
|
||||||
|
/// To avoid blocking, users should relinquish their locks to the IndexMap before calling this function.
|
||||||
|
///
|
||||||
|
/// After the index is physically closed, the in memory map must still be updated to take this into account.
|
||||||
|
/// To do so, a `ReopenableIndex` is returned, that can be used to either definitely close or definitely open
|
||||||
|
/// the index without waiting anymore.
|
||||||
|
pub fn wait_timeout(self, timeout: Duration) -> Option<ReopenableIndex> {
|
||||||
|
self.closing_event.wait_timeout(timeout).then_some(ReopenableIndex {
|
||||||
|
uuid: self.uuid,
|
||||||
|
map_size: self.map_size,
|
||||||
|
generation: self.generation,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ReopenableIndex {
|
||||||
|
uuid: Uuid,
|
||||||
|
map_size: usize,
|
||||||
|
generation: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ReopenableIndex {
|
||||||
|
/// Attempts to reopen the index, which can result in the index being reopened again or not
|
||||||
|
/// (e.g. if another thread already opened and closed the index again).
|
||||||
|
///
|
||||||
|
/// Use get again on the IndexMap to get the updated status.
|
||||||
|
///
|
||||||
|
/// Fails if the underlying index creation fails.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status |
|
||||||
|
/// |-----------------|------------|
|
||||||
|
/// | Missing | Missing |
|
||||||
|
/// | BeingDeleted | BeingDeleted |
|
||||||
|
/// | Closing | Available or Closing depending on generation |
|
||||||
|
/// | Available | Available |
|
||||||
|
///
|
||||||
|
pub fn reopen(self, map: &mut IndexMap, path: &Path) -> Result<()> {
|
||||||
|
if let Closing(reopen) = map.get(&self.uuid) {
|
||||||
|
if reopen.generation != self.generation {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
map.unavailable.remove(&self.uuid);
|
||||||
|
map.create(&self.uuid, path, None, self.map_size)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to close the index, which may or may not result in the index being closed
|
||||||
|
/// (e.g. if another thread already reopened the index again).
|
||||||
|
///
|
||||||
|
/// Use get again on the IndexMap to get the updated status.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status |
|
||||||
|
/// |-----------------|------------|
|
||||||
|
/// | Missing | Missing |
|
||||||
|
/// | BeingDeleted | BeingDeleted |
|
||||||
|
/// | Closing | Missing or Closing depending on generation |
|
||||||
|
/// | Available | Available |
|
||||||
|
pub fn close(self, map: &mut IndexMap) {
|
||||||
|
if let Closing(reopen) = map.get(&self.uuid) {
|
||||||
|
if reopen.generation != self.generation {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
map.unavailable.remove(&self.uuid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexMap {
|
||||||
|
pub fn new(cap: usize) -> IndexMap {
|
||||||
|
Self { unavailable: Default::default(), available: LruMap::new(cap), generation: 0 }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Gets the current status of an index in the map.
|
||||||
|
///
|
||||||
|
/// If the index is available it can be accessed from the returned status.
|
||||||
|
pub fn get(&self, uuid: &Uuid) -> IndexStatus {
|
||||||
|
self.available
|
||||||
|
.get(uuid)
|
||||||
|
.map(|index| Available(index.clone()))
|
||||||
|
.unwrap_or_else(|| self.get_unavailable(uuid))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_unavailable(&self, uuid: &Uuid) -> IndexStatus {
|
||||||
|
match self.unavailable.get(uuid) {
|
||||||
|
Some(Some(reopen)) => Closing(reopen.clone()),
|
||||||
|
Some(None) => BeingDeleted,
|
||||||
|
None => Missing,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to create a new index that wasn't existing before.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status |
|
||||||
|
/// |-----------------|------------|
|
||||||
|
/// | Missing | Available |
|
||||||
|
/// | BeingDeleted | panics |
|
||||||
|
/// | Closing | panics |
|
||||||
|
/// | Available | panics |
|
||||||
|
///
|
||||||
|
pub fn create(
|
||||||
|
&mut self,
|
||||||
|
uuid: &Uuid,
|
||||||
|
path: &Path,
|
||||||
|
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||||
|
map_size: usize,
|
||||||
|
) -> Result<Index> {
|
||||||
|
if !matches!(self.get_unavailable(uuid), Missing) {
|
||||||
|
panic!("Attempt to open an index that was unavailable");
|
||||||
|
}
|
||||||
|
let index = create_or_open_index(path, date, map_size)?;
|
||||||
|
match self.available.insert(*uuid, index.clone()) {
|
||||||
|
InsertionOutcome::InsertedNew => (),
|
||||||
|
InsertionOutcome::Evicted(evicted_uuid, evicted_index) => {
|
||||||
|
self.close(evicted_uuid, evicted_index, 0);
|
||||||
|
}
|
||||||
|
InsertionOutcome::Replaced(_) => {
|
||||||
|
panic!("Attempt to open an index that was already opened")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(index)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Increases the current generation. See documentation for this field.
|
||||||
|
///
|
||||||
|
/// In the unlikely event that the 2^64 generations would have been exhausted, we simply wrap-around.
|
||||||
|
///
|
||||||
|
/// For this to cause an issue, one should be able to stop a reader in time after it got a `ReopenableIndex` and before it takes the lock
|
||||||
|
/// to remove it from the unavailable map, and keep the reader in this frozen state for 2^64 closing of other indexes.
|
||||||
|
///
|
||||||
|
/// This seems overwhelmingly impossible to achieve in practice.
|
||||||
|
fn next_generation(&mut self) -> usize {
|
||||||
|
self.generation = self.generation.wrapping_add(1);
|
||||||
|
self.generation
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to close an index.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status |
|
||||||
|
/// |-----------------|------------|
|
||||||
|
/// | Missing | Missing |
|
||||||
|
/// | BeingDeleted | BeingDeleted |
|
||||||
|
/// | Closing | Closing |
|
||||||
|
/// | Available | Closing |
|
||||||
|
///
|
||||||
|
pub fn close_for_resize(&mut self, uuid: &Uuid, map_size_growth: usize) {
|
||||||
|
let Some(index) = self.available.remove(uuid) else { return; };
|
||||||
|
self.close(*uuid, index, map_size_growth);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&mut self, uuid: Uuid, index: Index, map_size_growth: usize) {
|
||||||
|
let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth;
|
||||||
|
let closing_event = index.prepare_for_closing();
|
||||||
|
let generation = self.next_generation();
|
||||||
|
self.unavailable
|
||||||
|
.insert(uuid, Some(ClosingIndex { uuid, closing_event, map_size, generation }));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attempts to delete and index.
|
||||||
|
///
|
||||||
|
/// `end_deletion` must be called just after.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status | Return value |
|
||||||
|
/// |-----------------|------------|--------------|
|
||||||
|
/// | Missing | BeingDeleted | Ok(None) |
|
||||||
|
/// | BeingDeleted | BeingDeleted | Err(None) |
|
||||||
|
/// | Closing | Closing | Err(Some(reopen)) |
|
||||||
|
/// | Available | BeingDeleted | Ok(Some(env_closing_event)) |
|
||||||
|
pub fn start_deletion(
|
||||||
|
&mut self,
|
||||||
|
uuid: &Uuid,
|
||||||
|
) -> std::result::Result<Option<EnvClosingEvent>, Option<ClosingIndex>> {
|
||||||
|
if let Some(index) = self.available.remove(uuid) {
|
||||||
|
self.unavailable.insert(uuid, None);
|
||||||
|
return Ok(Some(index.prepare_for_closing()));
|
||||||
|
}
|
||||||
|
match self.unavailable.remove(uuid) {
|
||||||
|
Some(Some(reopen)) => Err(Some(reopen)),
|
||||||
|
Some(None) => Err(None),
|
||||||
|
None => Ok(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Marks that an index deletion finished.
|
||||||
|
///
|
||||||
|
/// Must be used after calling `start_deletion`.
|
||||||
|
///
|
||||||
|
/// # Status table
|
||||||
|
///
|
||||||
|
/// | Previous Status | New Status |
|
||||||
|
/// |-----------------|------------|
|
||||||
|
/// | Missing | Missing |
|
||||||
|
/// | BeingDeleted | Missing |
|
||||||
|
/// | Closing | panics |
|
||||||
|
/// | Available | panics |
|
||||||
|
pub fn end_deletion(&mut self, uuid: &Uuid) {
|
||||||
|
assert!(
|
||||||
|
self.available.get(uuid).is_none(),
|
||||||
|
"Attempt to finish deletion of an index that was not being deleted"
|
||||||
|
);
|
||||||
|
// Do not panic if the index was Missing or BeingDeleted
|
||||||
|
assert!(
|
||||||
|
!matches!(self.unavailable.remove(uuid), Some(Some(_))),
|
||||||
|
"Attempt to finish deletion of an index that was being closed"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create or open an index in the specified path.
|
/// Create or open an index in the specified path.
|
||||||
/// The path *must* exists or an error will be thrown.
|
/// The path *must* exist or an error will be thrown.
|
||||||
fn create_or_open_index(
|
fn create_or_open_index(
|
||||||
&self,
|
|
||||||
path: &Path,
|
path: &Path,
|
||||||
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
date: Option<(OffsetDateTime, OffsetDateTime)>,
|
||||||
map_size: usize,
|
map_size: usize,
|
||||||
@ -86,6 +357,40 @@ impl IndexMapper {
|
|||||||
Ok(Index::new(options, path)?)
|
Ok(Index::new(options, path)?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Whether the index is available for use or is forbidden to be inserted back in the index map
|
||||||
|
#[allow(clippy::large_enum_variant)]
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum IndexStatus {
|
||||||
|
/// Not currently in the index map.
|
||||||
|
Missing,
|
||||||
|
/// Do not insert it back in the index map as it is currently being deleted.
|
||||||
|
BeingDeleted,
|
||||||
|
/// Temporarily do not insert the index in the index map as it is currently being resized/evicted from the map.
|
||||||
|
Closing(index_map::ClosingIndex),
|
||||||
|
/// You can use the index without worrying about anything.
|
||||||
|
Available(Index),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IndexMapper {
|
||||||
|
pub fn new(
|
||||||
|
env: &Env,
|
||||||
|
base_path: PathBuf,
|
||||||
|
index_base_map_size: usize,
|
||||||
|
index_growth_amount: usize,
|
||||||
|
index_count: usize,
|
||||||
|
indexer_config: IndexerConfig,
|
||||||
|
) -> Result<Self> {
|
||||||
|
Ok(Self {
|
||||||
|
index_map: Arc::new(RwLock::new(IndexMap::new(index_count))),
|
||||||
|
index_mapping: env.create_database(Some(INDEX_MAPPING))?,
|
||||||
|
base_path,
|
||||||
|
index_base_map_size,
|
||||||
|
index_growth_amount,
|
||||||
|
indexer_config: Arc::new(indexer_config),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Get or create the index.
|
/// Get or create the index.
|
||||||
pub fn create_index(
|
pub fn create_index(
|
||||||
@ -106,16 +411,17 @@ impl IndexMapper {
|
|||||||
let index_path = self.base_path.join(uuid.to_string());
|
let index_path = self.base_path.join(uuid.to_string());
|
||||||
fs::create_dir_all(&index_path)?;
|
fs::create_dir_all(&index_path)?;
|
||||||
|
|
||||||
let index = self.create_or_open_index(&index_path, date, self.index_size)?;
|
|
||||||
|
|
||||||
wtxn.commit()?;
|
|
||||||
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
// Error if the UUIDv4 somehow already exists in the map, since it should be fresh.
|
||||||
// This is very unlikely to happen in practice.
|
// This is very unlikely to happen in practice.
|
||||||
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
// TODO: it would be better to lazily create the index. But we need an Index::open function for milli.
|
||||||
if self.index_map.write().unwrap().insert(uuid, Available(index.clone())).is_some()
|
let index = self.index_map.write().unwrap().create(
|
||||||
{
|
&uuid,
|
||||||
panic!("Uuid v4 conflict: index with UUID {uuid} already exists.");
|
&index_path,
|
||||||
}
|
date,
|
||||||
|
self.index_base_map_size,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
wtxn.commit()?;
|
||||||
|
|
||||||
Ok(index)
|
Ok(index)
|
||||||
}
|
}
|
||||||
@ -135,23 +441,42 @@ impl IndexMapper {
|
|||||||
assert!(self.index_mapping.delete(&mut wtxn, name)?);
|
assert!(self.index_mapping.delete(&mut wtxn, name)?);
|
||||||
|
|
||||||
wtxn.commit()?;
|
wtxn.commit()?;
|
||||||
// We remove the index from the in-memory index map.
|
|
||||||
|
let mut tries = 0;
|
||||||
|
// Attempts to remove the index from the in-memory index map in a loop.
|
||||||
|
//
|
||||||
|
// If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent
|
||||||
|
// loop iteration.
|
||||||
|
//
|
||||||
|
// We make 100 attempts before giving up.
|
||||||
|
// This could happen in the following situations:
|
||||||
|
//
|
||||||
|
// 1. There is a bug preventing the index from being correctly closed, or us from detecting this.
|
||||||
|
// 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a pathological search.
|
||||||
|
// This can not be caused by indexation because deleting an index happens in the scheduler itself, so cannot be concurrent with indexation.
|
||||||
|
//
|
||||||
|
// In these situations, reporting the error through a panic is in order.
|
||||||
let closing_event = loop {
|
let closing_event = loop {
|
||||||
let mut lock = self.index_map.write().unwrap();
|
let mut lock = self.index_map.write().unwrap();
|
||||||
let resize_operation = match lock.insert(uuid, BeingDeleted) {
|
match lock.start_deletion(&uuid) {
|
||||||
Some(Available(index)) => break Some(index.prepare_for_closing()),
|
Ok(env_closing) => break env_closing,
|
||||||
// The target index is in the middle of a resize operation.
|
Err(Some(reopen)) => {
|
||||||
// Wait for this operation to complete, then try again.
|
// drop the lock here so that we don't synchronously wait for the index to close.
|
||||||
Some(BeingResized(resize_operation)) => resize_operation.clone(),
|
drop(lock);
|
||||||
// The index is already being deleted or doesn't exist.
|
tries += 1;
|
||||||
// It's OK to remove it from the map again.
|
if tries >= 100 {
|
||||||
_ => break None,
|
panic!("Too many attempts to close index {name} prior to deletion.")
|
||||||
};
|
}
|
||||||
|
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
|
||||||
// Avoiding deadlocks: we need to drop the lock before waiting for the end of the resize, which
|
reopen
|
||||||
// will involve operations on the very map we're locking.
|
} else {
|
||||||
drop(lock);
|
continue;
|
||||||
resize_operation.wait();
|
};
|
||||||
|
reopen.close(&mut self.index_map.write().unwrap());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Err(None) => return Ok(()),
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let index_map = self.index_map.clone();
|
let index_map = self.index_map.clone();
|
||||||
@ -161,7 +486,7 @@ impl IndexMapper {
|
|||||||
.name(String::from("index_deleter"))
|
.name(String::from("index_deleter"))
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
// We first wait to be sure that the previously opened index is effectively closed.
|
// We first wait to be sure that the previously opened index is effectively closed.
|
||||||
// This can take a lot of time, this is why we do that in a seperate thread.
|
// This can take a lot of time, this is why we do that in a separate thread.
|
||||||
if let Some(closing_event) = closing_event {
|
if let Some(closing_event) = closing_event {
|
||||||
closing_event.wait();
|
closing_event.wait();
|
||||||
}
|
}
|
||||||
@ -175,7 +500,7 @@ impl IndexMapper {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Finally we remove the entry from the index map.
|
// Finally we remove the entry from the index map.
|
||||||
assert!(matches!(index_map.write().unwrap().remove(&uuid), Some(BeingDeleted)));
|
index_map.write().unwrap().end_deletion(&uuid);
|
||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
@ -195,76 +520,15 @@ impl IndexMapper {
|
|||||||
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
|
/// - If the Index corresponding to the passed name is concurrently being deleted/resized or cannot be found in the
|
||||||
/// in memory hash map.
|
/// in memory hash map.
|
||||||
pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
|
pub fn resize_index(&self, rtxn: &RoTxn, name: &str) -> Result<()> {
|
||||||
// fixme: factor to a function?
|
|
||||||
let uuid = self
|
let uuid = self
|
||||||
.index_mapping
|
.index_mapping
|
||||||
.get(rtxn, name)?
|
.get(rtxn, name)?
|
||||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||||
|
|
||||||
// We remove the index from the in-memory index map.
|
// We remove the index from the in-memory index map.
|
||||||
let mut lock = self.index_map.write().unwrap();
|
self.index_map.write().unwrap().close_for_resize(&uuid, self.index_growth_amount);
|
||||||
// signal that will be sent when the resize operation completes
|
|
||||||
let resize_operation = Arc::new(SignalEvent::manual(false));
|
|
||||||
let index = match lock.insert(uuid, BeingResized(resize_operation)) {
|
|
||||||
Some(Available(index)) => index,
|
|
||||||
Some(previous_status) => {
|
|
||||||
lock.insert(uuid, previous_status);
|
|
||||||
panic!(
|
|
||||||
"Attempting to resize index {name} that is already being resized or deleted."
|
|
||||||
)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
panic!("Could not find the status of index {name} in the in-memory index mapper.")
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
drop(lock);
|
Ok(())
|
||||||
|
|
||||||
let resize_succeeded = (move || {
|
|
||||||
let current_size = index.map_size()?;
|
|
||||||
let new_size = current_size * 2;
|
|
||||||
let closing_event = index.prepare_for_closing();
|
|
||||||
|
|
||||||
log::debug!("Waiting for index {name} to close");
|
|
||||||
|
|
||||||
if !closing_event.wait_timeout(std::time::Duration::from_secs(600)) {
|
|
||||||
// fail after 10 minutes waiting
|
|
||||||
panic!("Could not resize index {name} (unable to close it)");
|
|
||||||
}
|
|
||||||
|
|
||||||
log::info!("Resized index {name} from {current_size} to {new_size} bytes");
|
|
||||||
let index_path = self.base_path.join(uuid.to_string());
|
|
||||||
let index = self.create_or_open_index(&index_path, None, new_size)?;
|
|
||||||
Ok(index)
|
|
||||||
})();
|
|
||||||
|
|
||||||
// Put the map back to a consistent state.
|
|
||||||
// Even if there was an error we don't want to leave the map in an inconsistent state as it would cause
|
|
||||||
// deadlocks.
|
|
||||||
let mut lock = self.index_map.write().unwrap();
|
|
||||||
let (resize_operation, resize_succeeded) = match resize_succeeded {
|
|
||||||
Ok(index) => {
|
|
||||||
// insert the resized index
|
|
||||||
let Some(BeingResized(resize_operation)) = lock.insert(uuid, Available(index)) else {
|
|
||||||
panic!("Index state for index {name} was modified while it was being resized")
|
|
||||||
};
|
|
||||||
|
|
||||||
(resize_operation, Ok(()))
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
// there was an error, not much we can do... delete the index from the in-memory map to prevent future errors
|
|
||||||
let Some(BeingResized(resize_operation)) = lock.remove(&uuid) else {
|
|
||||||
panic!("Index state for index {name} was modified while it was being resized")
|
|
||||||
};
|
|
||||||
(resize_operation, Err(error))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// drop the lock before signaling completion so that other threads don't immediately await on the lock after waking up.
|
|
||||||
drop(lock);
|
|
||||||
resize_operation.signal();
|
|
||||||
|
|
||||||
resize_succeeded
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an index, may open it if it wasn't already opened.
|
/// Return an index, may open it if it wasn't already opened.
|
||||||
@ -274,47 +538,68 @@ impl IndexMapper {
|
|||||||
.get(rtxn, name)?
|
.get(rtxn, name)?
|
||||||
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
.ok_or_else(|| Error::IndexNotFound(name.to_string()))?;
|
||||||
|
|
||||||
// we clone here to drop the lock before entering the match
|
let mut tries = 0;
|
||||||
|
// attempts to open the index in a loop.
|
||||||
|
//
|
||||||
|
// If the index is currently being closed, we will wait for it to be closed and retry getting it in a subsequent
|
||||||
|
// loop iteration.
|
||||||
|
//
|
||||||
|
// We make 100 attempts before giving up.
|
||||||
|
// This could happen in the following situations:
|
||||||
|
//
|
||||||
|
// 1. There is a bug preventing the index from being correctly closed, or us from detecting it was.
|
||||||
|
// 2. A user of the index is keeping it open for more than 600 seconds. This could happen e.g. during a long indexation,
|
||||||
|
// a pathological search, and so on.
|
||||||
|
//
|
||||||
|
// In these situations, reporting the error through a panic is in order.
|
||||||
let index = loop {
|
let index = loop {
|
||||||
let index = self.index_map.read().unwrap().get(&uuid).cloned();
|
tries += 1;
|
||||||
|
if tries > 100 {
|
||||||
|
panic!("Too many spurious wake ups while trying to open the index {name}");
|
||||||
|
}
|
||||||
|
|
||||||
|
// we get the index here to drop the lock before entering the match
|
||||||
|
let index = self.index_map.read().unwrap().get(&uuid);
|
||||||
|
|
||||||
match index {
|
match index {
|
||||||
Some(Available(index)) => break index,
|
Available(index) => break index,
|
||||||
Some(BeingResized(ref resize_operation)) => {
|
Closing(reopen) => {
|
||||||
// Avoiding deadlocks: no lock taken while doing this operation.
|
// Avoiding deadlocks: no lock taken while doing this operation.
|
||||||
resize_operation.wait();
|
let reopen = if let Some(reopen) = reopen.wait_timeout(Duration::from_secs(6)) {
|
||||||
|
reopen
|
||||||
|
} else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let index_path = self.base_path.join(uuid.to_string());
|
||||||
|
// take the lock to reopen the environment.
|
||||||
|
reopen.reopen(&mut self.index_map.write().unwrap(), &index_path)?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())),
|
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||||
// since we're lazy, it's possible that the index has not been opened yet.
|
// since we're lazy, it's possible that the index has not been opened yet.
|
||||||
None => {
|
Missing => {
|
||||||
let mut index_map = self.index_map.write().unwrap();
|
let mut index_map = self.index_map.write().unwrap();
|
||||||
// between the read lock and the write lock it's not impossible
|
// between the read lock and the write lock it's not impossible
|
||||||
// that someone already opened the index (eg if two search happens
|
// that someone already opened the index (eg if two searches happen
|
||||||
// at the same time), thus before opening it we check a second time
|
// at the same time), thus before opening it we check a second time
|
||||||
// if it's not already there.
|
// if it's not already there.
|
||||||
// Since there is a good chance it's not already there we can use
|
match index_map.get(&uuid) {
|
||||||
// the entry method.
|
Missing => {
|
||||||
match index_map.entry(uuid) {
|
|
||||||
Entry::Vacant(entry) => {
|
|
||||||
let index_path = self.base_path.join(uuid.to_string());
|
let index_path = self.base_path.join(uuid.to_string());
|
||||||
|
|
||||||
let index =
|
break index_map.create(
|
||||||
self.create_or_open_index(&index_path, None, self.index_size)?;
|
&uuid,
|
||||||
entry.insert(Available(index.clone()));
|
&index_path,
|
||||||
break index;
|
None,
|
||||||
|
self.index_base_map_size,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
Entry::Occupied(entry) => match entry.get() {
|
Available(index) => break index,
|
||||||
Available(index) => break index.clone(),
|
Closing(_) => {
|
||||||
BeingResized(resize_operation) => {
|
// the reopening will be handled in the next loop operation
|
||||||
// Avoiding the deadlock: we drop the lock before waiting
|
continue;
|
||||||
let resize_operation = resize_operation.clone();
|
}
|
||||||
drop(index_map);
|
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
||||||
resize_operation.wait();
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
BeingDeleted => return Err(Error::IndexNotFound(name.to_string())),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -230,8 +230,12 @@ pub struct IndexSchedulerOptions {
|
|||||||
pub dumps_path: PathBuf,
|
pub dumps_path: PathBuf,
|
||||||
/// The maximum size, in bytes, of the task index.
|
/// The maximum size, in bytes, of the task index.
|
||||||
pub task_db_size: usize,
|
pub task_db_size: usize,
|
||||||
/// The maximum size, in bytes, of each meilisearch index.
|
/// The size, in bytes, with which a meilisearch index is opened the first time of each meilisearch index.
|
||||||
pub index_size: usize,
|
pub index_base_map_size: usize,
|
||||||
|
/// The size, in bytes, by which the map size of an index is increased when it resized due to being full.
|
||||||
|
pub index_growth_amount: usize,
|
||||||
|
/// The number of indexes that can be concurrently opened in memory.
|
||||||
|
pub index_count: usize,
|
||||||
/// Configuration used during indexing for each meilisearch index.
|
/// Configuration used during indexing for each meilisearch index.
|
||||||
pub indexer_config: IndexerConfig,
|
pub indexer_config: IndexerConfig,
|
||||||
/// Set to `true` iff the index scheduler is allowed to automatically
|
/// Set to `true` iff the index scheduler is allowed to automatically
|
||||||
@ -383,7 +387,9 @@ impl IndexScheduler {
|
|||||||
index_mapper: IndexMapper::new(
|
index_mapper: IndexMapper::new(
|
||||||
&env,
|
&env,
|
||||||
options.indexes_path,
|
options.indexes_path,
|
||||||
options.index_size,
|
options.index_base_map_size,
|
||||||
|
options.index_growth_amount,
|
||||||
|
options.index_count,
|
||||||
options.indexer_config,
|
options.indexer_config,
|
||||||
)?,
|
)?,
|
||||||
env,
|
env,
|
||||||
@ -1164,7 +1170,9 @@ mod tests {
|
|||||||
snapshots_path: tempdir.path().join("snapshots"),
|
snapshots_path: tempdir.path().join("snapshots"),
|
||||||
dumps_path: tempdir.path().join("dumps"),
|
dumps_path: tempdir.path().join("dumps"),
|
||||||
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
task_db_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||||
index_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
index_base_map_size: 1000 * 1000, // 1 MB, we don't use MiB on purpose.
|
||||||
|
index_growth_amount: 1000 * 1000, // 1 MB
|
||||||
|
index_count: 5,
|
||||||
indexer_config: IndexerConfig::default(),
|
indexer_config: IndexerConfig::default(),
|
||||||
autobatching_enabled,
|
autobatching_enabled,
|
||||||
};
|
};
|
||||||
|
@ -45,6 +45,33 @@ use option::ScheduleSnapshot;
|
|||||||
|
|
||||||
use crate::error::MeilisearchHttpError;
|
use crate::error::MeilisearchHttpError;
|
||||||
|
|
||||||
|
/// Default number of simultaneously opened indexes,
|
||||||
|
/// lower for Windows that dedicates a smaller virtual address space to processes.
|
||||||
|
///
|
||||||
|
/// The value was chosen this way:
|
||||||
|
///
|
||||||
|
/// - Windows provides a small virtual address space of about 10TiB to processes.
|
||||||
|
/// - The chosen value allows for indexes to reach a safe size of 1TiB.
|
||||||
|
/// - This can accomodate an unlimited number of indexes as long as they stay below 1TiB size.
|
||||||
|
#[cfg(windows)]
|
||||||
|
const DEFAULT_INDEX_COUNT: usize = 10;
|
||||||
|
/// Default number of simultaneously opened indexes.
|
||||||
|
///
|
||||||
|
/// The higher, the better for avoiding reopening indexes.
|
||||||
|
///
|
||||||
|
/// The value was chosen this way:
|
||||||
|
///
|
||||||
|
/// - Opening an index consumes a file descriptor.
|
||||||
|
/// - The default on many unices is about 256 file descriptors for a process.
|
||||||
|
/// - 100 is a little bit less than half this value.
|
||||||
|
///
|
||||||
|
/// In the future, this value could be computed from the dynamic number of allowed file descriptors for the current process.
|
||||||
|
///
|
||||||
|
/// On Unices, this value is largely irrelevant to virtual address space, because due to index resizing the indexes should take virtual memory in the same ballpark
|
||||||
|
/// as their disk size and it is unlikely for a user to have a sum of index weighing 128TB on a single Meilisearch node.
|
||||||
|
#[cfg(not(windows))]
|
||||||
|
const DEFAULT_INDEX_COUNT: usize = 100;
|
||||||
|
|
||||||
/// Check if a db is empty. It does not provide any information on the
|
/// Check if a db is empty. It does not provide any information on the
|
||||||
/// validity of the data in it.
|
/// validity of the data in it.
|
||||||
/// We consider a database as non empty when it's a non empty directory.
|
/// We consider a database as non empty when it's a non empty directory.
|
||||||
@ -206,9 +233,11 @@ fn open_or_create_database_unchecked(
|
|||||||
snapshots_path: opt.snapshot_dir.clone(),
|
snapshots_path: opt.snapshot_dir.clone(),
|
||||||
dumps_path: opt.dump_dir.clone(),
|
dumps_path: opt.dump_dir.clone(),
|
||||||
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
task_db_size: opt.max_task_db_size.get_bytes() as usize,
|
||||||
index_size: opt.max_index_size.get_bytes() as usize,
|
index_base_map_size: opt.max_index_size.get_bytes() as usize,
|
||||||
indexer_config: (&opt.indexer_options).try_into()?,
|
indexer_config: (&opt.indexer_options).try_into()?,
|
||||||
autobatching_enabled: true,
|
autobatching_enabled: true,
|
||||||
|
index_growth_amount: byte_unit::Byte::from_str("10GiB").unwrap().get_bytes() as usize,
|
||||||
|
index_count: DEFAULT_INDEX_COUNT,
|
||||||
})?)
|
})?)
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -65,11 +65,10 @@ const MEILI_MAX_INDEXING_THREADS: &str = "MEILI_MAX_INDEXING_THREADS";
|
|||||||
const DEFAULT_LOG_EVERY_N: usize = 100_000;
|
const DEFAULT_LOG_EVERY_N: usize = 100_000;
|
||||||
|
|
||||||
// Each environment (index and task-db) is taking space in the virtual address space.
|
// Each environment (index and task-db) is taking space in the virtual address space.
|
||||||
//
|
// When creating a new environment, it starts its life with 10GiB of virtual address space.
|
||||||
// The size of the virtual address space is limited by the OS. About 100TB for Linux and about 10TB for Windows.
|
// It is then later resized if needs be.
|
||||||
// This means that the number of indexes is limited to about 200 for Linux and about 20 for Windows.
|
pub const INDEX_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
|
||||||
pub const INDEX_SIZE: u64 = 536_870_912_000; // 500 GiB
|
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB
|
||||||
pub const TASK_DB_SIZE: u64 = 10_737_418_240; // 10 GiB
|
|
||||||
|
|
||||||
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
|
||||||
#[serde(rename_all = "UPPERCASE")]
|
#[serde(rename_all = "UPPERCASE")]
|
||||||
|
Loading…
Reference in New Issue
Block a user