From eec43ec953d4038443d7066d8ccaf54feba47161 Mon Sep 17 00:00:00 2001 From: Kerollmops Date: Tue, 25 Oct 2022 14:09:01 +0200 Subject: [PATCH] Implement a first version of the snapshots --- file-store/src/lib.rs | 7 ++- index-scheduler/src/batch.rs | 86 +++++++++++++++++++++++++++-- index-scheduler/src/index_mapper.rs | 2 +- index-scheduler/src/lib.rs | 2 +- meilisearch-types/src/tasks.rs | 6 +- 5 files changed, 92 insertions(+), 11 deletions(-) diff --git a/file-store/src/lib.rs b/file-store/src/lib.rs index 0e30661ec..e05694c92 100644 --- a/file-store/src/lib.rs +++ b/file-store/src/lib.rs @@ -74,11 +74,16 @@ impl FileStore { /// Returns the file corresponding to the requested uuid. pub fn get_update(&self, uuid: Uuid) -> Result { - let path = self.path.join(uuid.to_string()); + let path = self.get_update_path(uuid); let file = StdFile::open(path)?; Ok(file) } + /// Returns the path that correspond to this uuid, the path could not exists. + pub fn get_update_path(&self, uuid: Uuid) -> PathBuf { + self.path.join(uuid.to_string()) + } + /// Copies the content of the update file pointed to by `uuid` to the `dst` directory. pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef) -> Result<()> { let src = self.path.join(uuid.to_string()); diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs index e7f8e5861..739b342a7 100644 --- a/index-scheduler/src/batch.rs +++ b/index-scheduler/src/batch.rs @@ -18,13 +18,14 @@ one indexing operation. */ use std::collections::HashSet; -use std::fs::File; +use std::fs::{self, File}; use std::io::BufWriter; use dump::IndexMetadata; use log::{debug, error, info}; use meilisearch_types::heed::{RoTxn, RwTxn}; use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader}; +use meilisearch_types::milli::heed::CompactionOption; use meilisearch_types::milli::update::{ DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings, @@ -552,7 +553,84 @@ impl IndexScheduler { wtxn.commit()?; Ok(vec![task]) } - Batch::SnapshotCreation(_) => todo!(), + Batch::SnapshotCreation(mut tasks) => { + fs::create_dir_all(&self.snapshots_path)?; + let temp_snapshot_dir = tempfile::tempdir()?; + + // 1. Snapshot the version file. + // TODO where can I find the path of this file and do we create it anyway? + // let dst = temp_snapshot_dir.path().join(VERSION_FILE_NAME); + // let src = self.src_path.join(VERSION_FILE_NAME); + // fs::copy(src, dst)?; + + // TODO what is a meta-env in the previous version of the scheduler? + + // 2. Snapshot the index-scheduler LMDB env + // + // When we call copy_to_path, LMDB opens a read transaction by itself, + // we can't provide our own. It is an issue as we would like to know + // the update files to copy but new ones can be enqueued between the copy + // of the env and the new transaction we open to retrieve the enqueued tasks. + // So we prefer opening a new transaction after copying the env and copy more + // update files than not enough. + // + // Note that there cannot be any update files deleted between those + // two read operations as the task processing is synchronous. + + // 2.1 First copy the LMDB env and reorganize pages to reduce its size. + let dst = temp_snapshot_dir.path().join("data.mdb"); + self.env.copy_to_path(dst, CompactionOption::Enabled)?; + + // 2.2 Create a read transaction on the index-scheduler + let rtxn = self.env.read_txn()?; + + // 2.3 Create the update files directory + let update_files_dir = temp_snapshot_dir.path().join("update_files"); + fs::create_dir_all(&update_files_dir)?; + + // 2.4 Only copy the update files of the enqueued tasks + for task_id in self.get_status(&rtxn, Status::Enqueued)? { + let task = self.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?; + if let Some(content_uuid) = task.content_uuid() { + let src = self.file_store.get_update_path(content_uuid); + let dst = update_files_dir.join(content_uuid.to_string()); + fs::copy(src, dst)?; + } + } + + // 3. Snapshot every indexes + // TODO we are opening all of the indexes it can be too much we should unload all + // of the indexes we are trying to open. It would be even better to only unload + // the one that were opened by us. Or maybe use a LRU in the index mapper. + for result in self.index_mapper.index_mapping.iter(&rtxn)? { + let (name, uuid) = result?; + let index = self.index_mapper.index(&rtxn, name)?; + let dst = temp_snapshot_dir + .path() + .join("indexes") + .join(uuid.to_string()) + .join("data.mdb"); + index.copy_to_path(dst, CompactionOption::Enabled)?; + } + + drop(rtxn); + + // 4. Snapshot the auth LMDB env + let dst = temp_snapshot_dir.path().join("auth").join("data.mdb"); + fs::create_dir_all(&dst)?; + // TODO find a better way to get the auth database path + let auth_path = self.env.path().join("..").join("auth"); + let auth = milli::heed::EnvOpenOptions::new().open(auth_path)?; + auth.copy_to_path(dst, CompactionOption::Enabled)?; + + todo!("tar-gz and append .snapshot at the end of the file"); + + for task in &mut tasks { + task.status = Status::Succeeded; + } + + Ok(tasks) + } Batch::Dump(mut task) => { let started_at = OffsetDateTime::now_utc(); let (keys, instance_uid, dump_uid) = @@ -579,7 +657,7 @@ impl IndexScheduler { for ret in self.all_tasks.iter(&rtxn)? { let (_, mut t) = ret?; let status = t.status; - let content_file = t.content_uuid().copied(); + let content_file = t.content_uuid(); // In the case we're dumping ourselves we want to be marked as finished // to not loop over ourselves indefinitely. @@ -1106,7 +1184,7 @@ impl IndexScheduler { let mut content_files_to_delete = Vec::new(); for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? { if let Some(uuid) = task.content_uuid() { - content_files_to_delete.push(*uuid); + content_files_to_delete.push(uuid); } task.status = Status::Canceled; task.canceled_by = Some(cancel_task_id); diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 9b8ba5676..3fc6f9281 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs @@ -30,7 +30,7 @@ pub struct IndexMapper { // TODO create a UUID Codec that uses the 16 bytes representation /// Map an index name with an index uuid currently available on disk. - index_mapping: Database>, + pub(crate) index_mapping: Database>, /// Path to the folder where the LMDB environments of each index are. base_path: PathBuf, diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 010aeb238..15e57aacb 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs @@ -899,7 +899,7 @@ impl IndexScheduler { pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> { match task.content_uuid() { - Some(content_file) => self.delete_update_file(*content_file), + Some(content_file) => self.delete_update_file(content_file), None => Ok(()), } } diff --git a/meilisearch-types/src/tasks.rs b/meilisearch-types/src/tasks.rs index 62f4f573e..2e070caa5 100644 --- a/meilisearch-types/src/tasks.rs +++ b/meilisearch-types/src/tasks.rs @@ -62,11 +62,9 @@ impl Task { } /// Return the content-uuid if there is one - pub fn content_uuid(&self) -> Option<&Uuid> { + pub fn content_uuid(&self) -> Option { match self.kind { - KindWithContent::DocumentAdditionOrUpdate { ref content_file, .. } => { - Some(content_file) - } + KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file), KindWithContent::DocumentDeletion { .. } | KindWithContent::DocumentClear { .. } | KindWithContent::SettingsUpdate { .. }