Compute budget

This commit is contained in:
Louis Dureuil 2023-02-15 12:30:46 +01:00
parent f1119f2dc2
commit a529bf160c
No known key found for this signature in database

View File

@ -32,7 +32,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32; pub type TaskId = u32;
use std::ops::{Bound, RangeBounds}; use std::ops::{Bound, RangeBounds};
use std::path::PathBuf; use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
@ -365,9 +365,16 @@ impl IndexScheduler {
std::fs::create_dir_all(&options.indexes_path)?; std::fs::create_dir_all(&options.indexes_path)?;
std::fs::create_dir_all(&options.dumps_path)?; std::fs::create_dir_all(&options.dumps_path)?;
let task_db_size = clamp_to_page_size(options.task_db_size);
let budget = IndexBudget {
map_size: options.index_base_map_size,
index_count: options.index_count,
task_db_size,
};
let env = heed::EnvOpenOptions::new() let env = heed::EnvOpenOptions::new()
.max_dbs(10) .max_dbs(10)
.map_size(clamp_to_page_size(options.task_db_size)) .map_size(budget.task_db_size)
.open(options.tasks_path)?; .open(options.tasks_path)?;
let file_store = FileStore::new(&options.update_file_path)?; let file_store = FileStore::new(&options.update_file_path)?;
@ -387,9 +394,9 @@ impl IndexScheduler {
index_mapper: IndexMapper::new( index_mapper: IndexMapper::new(
&env, &env,
options.indexes_path, options.indexes_path,
options.index_base_map_size, budget.map_size,
options.index_growth_amount, options.index_growth_amount,
options.index_count, budget.index_count,
options.indexer_config, options.indexer_config,
)?, )?,
env, env,
@ -413,6 +420,65 @@ impl IndexScheduler {
Ok(this) Ok(this)
} }
/// Works out how many indexes can be open at once, with which map size, and how large
/// the task DB may be.
///
/// The starting point is the value returned by `utils::dichotomic_search`, seeded with
/// `base_map_size` and driven by `Self::is_good_heed` (presumably the largest map size
/// that can still be opened at `tasks_path` — confirm against the helper). Only half of
/// that value is used as the working budget. The task DB is carved out of it first, and
/// whatever remains is shared between the indexes.
///
/// * `tasks_path` - path handed to every probe.
/// * `base_map_size` - wished map size of one index; also the seed of the search.
/// * `task_db_size` - wished size of the task DB; replaced by roughly two fifths of the
///   working budget (passed through `clamp_to_page_size`) when it is larger than half of it.
/// * `max_index_count` - upper bound for the returned `index_count`.
///
/// Returns either a single index whose map size is two fifths of the index budget (when
/// fewer than two indexes of `base_map_size` fit), or indexes of `base_map_size` whose
/// count is one less than what fits, capped at `max_index_count`.
///
/// # Panics
///
/// Panics if `base_map_size` is 0, because the index budget is divided by it.
fn index_budget(
    tasks_path: &Path,
    base_map_size: usize,
    mut task_db_size: usize,
    max_index_count: usize,
) -> IndexBudget {
    let memmap_budget = utils::dichotomic_search(base_map_size, |candidate| {
        Self::is_good_heed(tasks_path, candidate)
    });
    log::debug!("memmap budget: {budget}B", budget = memmap_budget);

    // Only half of the probed value is ever handed out.
    let usable = memmap_budget / 2;

    // The task DB may take at most half of the working budget; above that it is shrunk
    // to two fifths of it.
    if task_db_size > usable / 2 {
        task_db_size = clamp_to_page_size(usable * 2 / 5);
        log::debug!(
            "Decreasing max size of task DB to {task_db_size}B due to constrained memory space"
        );
    }

    // What is left once the task DB has been served goes to the indexes.
    let budget = usable - task_db_size;
    log::debug!("index budget: {budget}B");

    match budget / base_map_size {
        0 | 1 => {
            // Not enough room for two indexes of the base size: allow a single index and
            // give it a bit less than half of the budget so that opening it stays affordable.
            let map_size = (budget * 2) / 5;
            log::debug!("1 index of {map_size}B can be opened simultaneously.");
            IndexBudget { map_size, index_count: 1, task_db_size }
        }
        affordable => {
            // Keep one slot free so that an additional index can still be opened when
            // the cache is already full. `affordable >= 2` here, so this cannot underflow.
            let index_count = (affordable - 1).min(max_index_count);
            log::debug!("Up to {index_count} indexes of {base_map_size}B opened simultaneously.");
            IndexBudget { map_size: base_map_size, index_count, task_db_size }
        }
    }
}
/// Probes whether a heed environment can be opened at `tasks_path` with the given `map_size`.
///
/// The size goes through `clamp_to_page_size` before being handed to heed. On success the
/// environment is closed again (waiting for the closing to complete) and `true` is returned;
/// any error yields `false`.
fn is_good_heed(tasks_path: &Path, map_size: usize) -> bool {
    match heed::EnvOpenOptions::new().map_size(clamp_to_page_size(map_size)).open(tasks_path) {
        Ok(env) => {
            // The environment was only opened as a probe: close it before reporting success.
            env.prepare_for_closing().wait();
            true
        }
        // Every error is treated the same way, not only allocation errors, so the budget
        // may be lowered because of an unrelated failure.
        // - Persistent errors: acceptable as long as the task DB is later reopened normally,
        //   this time without ignoring the error.
        // - Transient errors: could leave the instance with too low a budget, but they are
        //   1) less likely than persistent errors and 2) likely to cause other issues down
        //   the line anyway.
        Err(_) => false,
    }
}
pub fn read_txn(&self) -> Result<RoTxn> { pub fn read_txn(&self) -> Result<RoTxn> {
self.env.read_txn().map_err(|e| e.into()) self.env.read_txn().map_err(|e| e.into())
} }
@ -1116,6 +1182,16 @@ pub enum TickOutcome {
WaitForSignal, WaitForSignal,
} }
/// Sizing decision for the indexes: how many may be open at the same time, how large
/// each one is, and how much is left for the task DB.
struct IndexBudget {
    /// Map size given to each index.
    map_size: usize,
    /// Upper bound on the number of indexes opened at the same time.
    index_count: usize,
    /// Size of the task DB. On very constrained systems this may be lower than the
    /// base `task_db_size`, so that at least one index can still be accepted.
    task_db_size: usize,
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::io::{BufWriter, Seek, Write}; use std::io::{BufWriter, Seek, Write};