WIP: Still need to introduce a Env::copy_to_path method

This commit is contained in:
Kerollmops 2025-03-10 16:33:35 +01:00
parent 21bbbdec76
commit 3bc62f0549
No known key found for this signature in database
GPG key ID: F250A4C4E3AE5F5F
17 changed files with 93 additions and 72 deletions

View file

@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RwTxn};
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
use crate::error::FeatureNotEnabledError;
use crate::Result;
@ -139,7 +139,7 @@ impl FeatureData {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
instance_features: InstanceTogglableFeatures,
) -> Result<Self> {

View file

@ -304,7 +304,8 @@ fn create_or_open_index(
map_size: usize,
creation: bool,
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
let options = EnvOpenOptions::new();
let mut options = options.read_txn_without_tls();
options.map_size(clamp_to_page_size(map_size));
// You can find more details about this experimental
@ -333,7 +334,7 @@ fn create_or_open_index(
#[cfg(test)]
mod tests {
use meilisearch_types::heed::Env;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::Index;
use uuid::Uuid;
@ -343,7 +344,7 @@ mod tests {
use crate::IndexScheduler;
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
fn test() -> (Self, Env<WithoutTls>, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
}

View file

@ -4,7 +4,7 @@ use std::time::Duration;
use std::{fs, thread};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::milli::database_stats::DatabaseStats;
use meilisearch_types::milli::update::IndexerConfig;
@ -164,7 +164,7 @@ impl IndexMapper {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
budget: IndexBudget,

View file

@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch;
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::I128;
use meilisearch_types::heed::{self, Env, RoTxn};
use meilisearch_types::heed::{self, Env, RoTxn, WithoutTls};
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -131,7 +131,7 @@ pub struct IndexSchedulerOptions {
/// to be performed on them.
pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
pub(crate) env: Env<WithoutTls>,
/// The list of tasks currently processing
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
@ -240,10 +240,9 @@ impl IndexScheduler {
};
let env = unsafe {
heed::EnvOpenOptions::new()
.max_dbs(Self::nb_db())
.map_size(budget.task_db_size)
.open(&options.tasks_path)
let options = heed::EnvOpenOptions::new();
let mut options = options.read_txn_without_tls();
options.max_dbs(Self::nb_db()).map_size(budget.task_db_size).open(&options.tasks_path)
}?;
// We **must** starts by upgrading the version because it'll also upgrade the required database before we can open them
@ -358,7 +357,7 @@ impl IndexScheduler {
}
}
pub fn read_txn(&self) -> Result<RoTxn> {
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into())
}
@ -427,12 +426,14 @@ impl IndexScheduler {
/// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> {
self.index_mapper.index(&self.env.read_txn()?, name)
let rtxn = self.env.read_txn()?;
self.index_mapper.index(&rtxn, name)
}
/// Return the boolean referring if index exists.
pub fn index_exists(&self, name: &str) -> Result<bool> {
self.index_mapper.index_exists(&self.env.read_txn()?, name)
let rtxn = self.env.read_txn()?;
self.index_mapper.index_exists(&rtxn, name)
}
/// Return the name of all indexes without opening them.
@ -507,7 +508,8 @@ impl IndexScheduler {
/// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example.
/// 3. The number of times the properties appeared.
pub fn get_stats(&self) -> Result<BTreeMap<String, BTreeMap<String, u64>>> {
self.queue.get_stats(&self.read_txn()?, &self.processing_tasks.read().unwrap())
let rtxn = self.read_txn()?;
self.queue.get_stats(&rtxn, &self.processing_tasks.read().unwrap())
}
// Return true if there is at least one task that is processing.

View file

@ -3,7 +3,7 @@ use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status};
use roaring::{MultiOps, RoaringBitmap};
@ -66,7 +66,7 @@ impl BatchQueue {
NUMBER_OF_DATABASES
}
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,

View file

@ -13,7 +13,7 @@ use std::time::Duration;
use file_store::FileStore;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
@ -157,7 +157,7 @@ impl Queue {
/// Create an index scheduler and start its run loop.
pub(crate) fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
) -> Result<Self> {

View file

@ -1,7 +1,7 @@
use std::ops::{Bound, RangeBounds};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status, Task};
use roaring::{MultiOps, RoaringBitmap};
@ -68,7 +68,7 @@ impl TaskQueue {
NUMBER_OF_DATABASES
}
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
status: env.create_database(wtxn, Some(db_name::STATUS))?,

View file

@ -28,7 +28,7 @@ impl IndexScheduler {
// 2. Snapshot the index-scheduler LMDB env
//
// When we call copy_to_file, LMDB opens a read transaction by itself,
// When we call copy_to_path, LMDB opens a read transaction by itself,
// we can't provide our own. It is an issue as we would like to know
// the update files to copy but new ones can be enqueued between the copy
// of the env and the new transaction we open to retrieve the enqueued tasks.
@ -42,7 +42,7 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@ -81,7 +81,7 @@ impl IndexScheduler {
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
index
.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
}
@ -98,7 +98,7 @@ impl IndexScheduler {
.max_dbs(2)
.open(&self.scheduler.auth_path)
}?;
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
auth.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);

View file

@ -1,5 +1,5 @@
use anyhow::bail;
use meilisearch_types::heed::{Env, RwTxn};
use meilisearch_types::heed::{Env, RwTxn, WithTls, WithoutTls};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use time::OffsetDateTime;
@ -9,13 +9,17 @@ use crate::queue::TaskQueue;
use crate::versioning::Versioning;
trait UpgradeIndexScheduler {
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
-> anyhow::Result<()>;
fn upgrade(
&self,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
original: (u32, u32, u32),
) -> anyhow::Result<()>;
fn target_version(&self) -> (u32, u32, u32);
}
pub fn upgrade_index_scheduler(
env: &Env,
env: &Env<WithoutTls>,
versioning: &Versioning,
from: (u32, u32, u32),
to: (u32, u32, u32),
@ -91,7 +95,7 @@ struct ToCurrentNoOp {}
impl UpgradeIndexScheduler for ToCurrentNoOp {
fn upgrade(
&self,
_env: &Env,
_env: &Env<WithoutTls>,
_wtxn: &mut RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {

View file

@ -1,5 +1,5 @@
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::heed_codec::version::VersionCodec;
use meilisearch_types::versioning;
@ -46,12 +46,12 @@ impl Versioning {
}
/// Return `Self` without checking anything about the version
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
pub fn raw_new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
Ok(Self { version })
}
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let this = Self::raw_new(env, &mut wtxn)?;
let from = match this.get_version(&wtxn)? {