WIP: Still need to introduce an Env::copy_to_path method
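
(Sketch for context, not part of this commit: the missing helper would presumably mirror the Index::copy_to_path wrapper added at the bottom of this diff. The free function below, its name, and the exact Env::copy_to_file signature are assumptions based on how that wrapper is used here.)

use std::fs::File;
use std::io::Seek;
use std::path::Path;

use heed::{CompactionOption, Env, TlsUsage};

// Hypothetical stand-in for a future Env::copy_to_path: create the destination
// file, let LMDB copy the environment into it, then rewind it for the caller.
fn env_copy_to_path<T: TlsUsage, P: AsRef<Path>>(
    env: &Env<T>,
    path: P,
    option: CompactionOption,
) -> heed::Result<File> {
    let mut file =
        File::options().create(true).write(true).truncate(true).read(true).open(path)?;
    // Assumed signature, matching how Index::copy_to_file forwards to the env below.
    env.copy_to_file(&mut file, option)?;
    file.rewind()?;
    Ok(file)
}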

Kerollmops 2025-03-10 16:33:35 +01:00
parent b9da33fcbd
commit c63dae8267
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
17 changed files with 93 additions and 72 deletions

View File

@ -57,7 +57,8 @@ fn main() {
let opt = opt.clone();
let handle = std::thread::spawn(move || {
let mut options = EnvOpenOptions::new();
let options = EnvOpenOptions::new();
let mut options = options.read_txn_without_tls();
options.map_size(1024 * 1024 * 1024 * 1024);
let tempdir = match opt.path {
Some(path) => TempDir::new_in(path).unwrap(),
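
(Context note: throughout this commit, the default builder is converted with read_txn_without_tls(), which types the resulting environment as Env<WithoutTls> so its read transactions are no longer bound to thread-local storage. A minimal, self-contained sketch of the pattern follows; the path, map size, and database count are placeholders.)

use std::path::Path;

use heed::{Env, EnvOpenOptions, WithoutTls};

fn open_env_without_tls(path: &Path) -> heed::Result<Env<WithoutTls>> {
    let options = EnvOpenOptions::new();
    // Consumes the default builder and retypes it so `open` yields an Env<WithoutTls>.
    let mut options = options.read_txn_without_tls();
    options.map_size(100 * 1024 * 1024); // placeholder: 100 MiB
    options.max_dbs(4); // placeholder
    // `open` is unsafe in heed: the same environment must not be opened twice in one process.
    unsafe { options.open(path) }
}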

View File

@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RwTxn};
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
use crate::error::FeatureNotEnabledError;
use crate::Result;
@ -126,7 +126,7 @@ impl FeatureData {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
instance_features: InstanceTogglableFeatures,
) -> Result<Self> {

View File

@ -304,7 +304,8 @@ fn create_or_open_index(
map_size: usize,
creation: bool,
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
let options = EnvOpenOptions::new();
let mut options = options.read_txn_without_tls();
options.map_size(clamp_to_page_size(map_size));
// You can find more details about this experimental
@ -333,7 +334,7 @@ fn create_or_open_index(
#[cfg(test)]
mod tests {
use meilisearch_types::heed::Env;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::Index;
use uuid::Uuid;
@ -343,7 +344,7 @@ mod tests {
use crate::IndexScheduler;
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
fn test() -> (Self, Env<WithoutTls>, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
}

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use std::{fs, thread};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::milli::database_stats::DatabaseStats;
use meilisearch_types::milli::update::IndexerConfig;
@ -164,7 +164,7 @@ impl IndexMapper {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
budget: IndexBudget,

View File

@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch;
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::I128;
use meilisearch_types::heed::{self, Env, RoTxn};
use meilisearch_types::heed::{self, Env, RoTxn, WithoutTls};
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -131,7 +131,7 @@ pub struct IndexSchedulerOptions {
/// to be performed on them.
pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
pub(crate) env: Env<WithoutTls>,
/// The list of tasks currently processing
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
@ -240,10 +240,9 @@ impl IndexScheduler {
};
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(Self::nb_db())
.map_size(budget.task_db_size)
.open(&options.tasks_path)
let env_options = heed::EnvOpenOptions::new();
let mut env_options = env_options.read_txn_without_tls();
env_options.max_dbs(Self::nb_db()).map_size(budget.task_db_size).open(&options.tasks_path)
}?;
// We **must** start by upgrading the version because it'll also upgrade the required databases before we can open them
@ -358,7 +357,7 @@ impl IndexScheduler {
}
}
pub fn read_txn(&self) -> Result<RoTxn> {
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into())
}
@ -427,12 +426,14 @@ impl IndexScheduler {
/// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> {
self.index_mapper.index(&self.env.read_txn()?, name)
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name)
}
/// Returns whether the index exists.
pub fn index_exists(&self, name: &str) -> Result<bool> {
self.index_mapper.index_exists(&self.env.read_txn()?, name)
let rtxn = self.env.read_txn()?;
self.index_mapper.index_exists(&rtxn, name)
}
/// Return the name of all indexes without opening them.
@ -507,7 +508,8 @@ impl IndexScheduler {
/// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example.
/// 3. The number of times the properties appeared.
pub fn get_stats(&self) -> Result<BTreeMap<String, BTreeMap<String, u64>>> {
self.queue.get_stats(&self.read_txn()?, &self.processing_tasks.read().unwrap())
let rtxn = self.read_txn()?;
self.queue.get_stats(&rtxn, &self.processing_tasks.read().unwrap())
}
// Return true if there is at least one task that is processing.

View File

@ -3,7 +3,7 @@ use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status};
use roaring::{MultiOps, RoaringBitmap};
@ -66,7 +66,7 @@ impl BatchQueue {
NUMBER_OF_DATABASES
}
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,

View File

@ -13,7 +13,7 @@ use std::time::Duration;
use file_store::FileStore;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
@ -157,7 +157,7 @@ impl Queue {
/// Create an index scheduler and start its run loop.
pub(crate) fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
) -> Result<Self> {

View File

@ -1,7 +1,7 @@
use std::ops::{Bound, RangeBounds};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status, Task};
use roaring::{MultiOps, RoaringBitmap};
@ -68,7 +68,7 @@ impl TaskQueue {
NUMBER_OF_DATABASES
}
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
status: env.create_database(wtxn, Some(db_name::STATUS))?,

View File

@ -28,7 +28,7 @@ impl IndexScheduler {
// 2. Snapshot the index-scheduler LMDB env
//
// When we call copy_to_file, LMDB opens a read transaction by itself,
// When we call copy_to_path, LMDB opens a read transaction by itself,
// we can't provide our own. It is an issue as we would like to know
// the update files to copy but new ones can be enqueued between the copy
// of the env and the new transaction we open to retrieve the enqueued tasks.
@ -42,7 +42,7 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@ -81,7 +81,7 @@ impl IndexScheduler {
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
index
.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
}
@ -98,7 +98,7 @@ impl IndexScheduler {
.max_dbs(2)
.open(&self.scheduler.auth_path)
}?;
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
auth.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);

View File

@ -1,5 +1,5 @@
use anyhow::bail;
use meilisearch_types::heed::{Env, RwTxn};
use meilisearch_types::heed::{Env, RwTxn, WithTls, WithoutTls};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use time::OffsetDateTime;
@ -9,13 +9,17 @@ use crate::queue::TaskQueue;
use crate::versioning::Versioning;
trait UpgradeIndexScheduler {
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
-> anyhow::Result<()>;
fn upgrade(
&self,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
original: (u32, u32, u32),
) -> anyhow::Result<()>;
fn target_version(&self) -> (u32, u32, u32);
}
pub fn upgrade_index_scheduler(
env: &Env,
env: &Env<WithoutTls>,
versioning: &Versioning,
from: (u32, u32, u32),
to: (u32, u32, u32),
@ -91,7 +95,7 @@ struct ToCurrentNoOp {}
impl UpgradeIndexScheduler for ToCurrentNoOp {
fn upgrade(
&self,
_env: &Env,
_env: &Env<WithoutTls>,
_wtxn: &mut RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {

View File

@ -1,5 +1,5 @@
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::heed_codec::version::VersionCodec;
use meilisearch_types::versioning;
@ -46,12 +46,12 @@ impl Versioning {
}
/// Return `Self` without checking anything about the version
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
pub fn raw_new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
Ok(Self { version })
}
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let this = Self::raw_new(env, &mut wtxn)?;
let from = match this.get_version(&wtxn)? {

View File

@ -9,7 +9,7 @@ use std::str::FromStr;
use std::sync::Arc;
use hmac::{Hmac, Mac};
use meilisearch_types::heed::BoxedError;
use meilisearch_types::heed::{BoxedError, WithTls};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::KeyId;
use meilisearch_types::milli;
@ -31,7 +31,7 @@ const KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME: &str = "keyid-action-index-expirat
#[derive(Clone)]
pub struct HeedAuthStore {
env: Arc<Env>,
env: Arc<Env<WithTls>>,
keys: Database<Bytes, SerdeJson<Key>>,
action_keyid_index_expiration: Database<KeyIdActionCodec, SerdeJson<Option<OffsetDateTime>>>,
should_close_on_drop: bool,
@ -45,7 +45,7 @@ impl Drop for HeedAuthStore {
}
}
pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env> {
pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env<WithTls>> {
let mut options = EnvOpenOptions::new();
options.map_size(AUTH_STORE_SIZE); // 1GB
options.max_dbs(2);

View File

@ -405,7 +405,7 @@ impl ErrorCode for milli::Error {
match error {
// TODO: wait for spec for new error codes.
UserError::SerdeJson(_)
| UserError::InvalidLmdbOpenOptions
| UserError::EnvAlreadyOpened
| UserError::DocumentLimitReached
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
UserError::InvalidStoreFile => Code::InvalidStoreFile,
@ -501,8 +501,7 @@ impl ErrorCode for HeedError {
HeedError::Mdb(_)
| HeedError::Encoding(_)
| HeedError::Decoding(_)
| HeedError::DatabaseClosing
| HeedError::BadOpenOptions { .. } => Code::Internal,
| HeedError::EnvAlreadyOpened => Code::Internal,
}
}
}

View File

@ -11,7 +11,7 @@ use meilisearch_auth::AuthController;
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{Bytes, SerdeJson, Str};
use meilisearch_types::heed::{
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, TlsUsage, Unspecified,
};
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -224,8 +224,8 @@ fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
Ok(())
}
fn try_opening_database<KC: 'static, DC: 'static>(
env: &Env,
fn try_opening_database<KC: 'static, DC: 'static, T: TlsUsage>(
env: &Env<T>,
rtxn: &RoTxn,
db_name: &str,
) -> anyhow::Result<Database<KC, DC>> {
@ -234,8 +234,8 @@ fn try_opening_database<KC: 'static, DC: 'static>(
.with_context(|| format!("Missing the {db_name:?} database"))
}
fn try_opening_poly_database(
env: &Env,
fn try_opening_poly_database<T: TlsUsage>(
env: &Env<T>,
rtxn: &RoTxn,
db_name: &str,
) -> anyhow::Result<Database<Unspecified, Unspecified>> {
@ -386,9 +386,10 @@ fn export_a_dump(
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
@ -456,9 +457,10 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
}
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
eprintln!("Awaiting for a mutable transaction...");
let _wtxn = index.write_txn().context("While awaiting for a write transaction")?;
@ -470,7 +472,7 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
eprintln!("Compacting the index...");
let before_compaction = Instant::now();
let new_file = index
.copy_to_file(&compacted_index_file_path, CompactionOption::Enabled)
.copy_to_path(&compacted_index_file_path, CompactionOption::Enabled)
.with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?;
let after_size = new_file.metadata()?.len();
@ -526,9 +528,10 @@ fn export_documents(
if uid == index_name {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index =
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
@ -630,9 +633,10 @@ fn hair_dryer(
if index_names.iter().any(|i| i == uid) {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index =
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
eprintln!("Trying to get a read transaction on the {uid} index...");

View File

@ -2,7 +2,7 @@ use std::path::Path;
use anyhow::{bail, Context};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, TlsUsage, Unspecified};
use meilisearch_types::milli::index::{db_name, main_key};
use super::v1_9;
@ -90,9 +90,9 @@ fn update_index_stats(
Ok(())
}
fn update_date_format(
fn update_date_format<T: TlsUsage>(
index_uid: &str,
index_env: &Env,
index_env: &Env<T>,
index_wtxn: &mut RwTxn,
) -> anyhow::Result<()> {
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
@ -104,9 +104,9 @@ fn update_date_format(
Ok(())
}
fn find_rest_embedders(
fn find_rest_embedders<T: TlsUsage>(
index_uid: &str,
index_env: &Env,
index_env: &Env<T>,
index_txn: &RoTxn,
) -> anyhow::Result<Vec<String>> {
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)

View File

@ -173,11 +173,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
println!("\t- Rebuilding field distribution");
let index =
meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path, false)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?;
let index = meilisearch_types::milli::Index::new(
EnvOpenOptions::new().read_txn_without_tls(),
&index_path,
false,
)
.with_context(|| format!("while opening index {uid} at '{}'", index_path.display()))?;
let mut index_txn = index.write_txn()?;

View File

@ -2,6 +2,7 @@ use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::convert::TryInto;
use std::fs::File;
use std::io::Seek;
use std::path::Path;
use heed::{types::*, WithoutTls};
@ -339,8 +340,16 @@ impl Index {
self.env.info().map_size
}
pub fn copy_to_file<P: AsRef<Path>>(&self, path: P, option: CompactionOption) -> Result<File> {
self.env.copy_to_file(path, option).map_err(Into::into)
pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
self.env.copy_to_file(file, option).map_err(Into::into)
}
pub fn copy_to_path<P: AsRef<Path>>(&self, path: P, option: CompactionOption) -> Result<File> {
let mut file =
File::options().create(true).write(true).truncate(true).read(true).open(path)?;
self.copy_to_file(&mut file, option)?;
file.rewind()?;
Ok(file)
}
/// Returns an `EnvClosingEvent` that can be used to wait for the closing event,