Implement a first version of the snapshots

This commit is contained in:
Kerollmops 2022-10-25 14:09:01 +02:00 committed by Clément Renault
parent c063f154fb
commit eec43ec953
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
5 changed files with 92 additions and 11 deletions

View File

@@ -74,11 +74,16 @@ impl FileStore {
/// Returns the file corresponding to the requested uuid.
pub fn get_update(&self, uuid: Uuid) -> Result<StdFile> {
let path = self.path.join(uuid.to_string());
let path = self.get_update_path(uuid);
let file = StdFile::open(path)?;
Ok(file)
}
/// Returns the path that corresponds to this uuid; note that the path may not exist.
pub fn get_update_path(&self, uuid: Uuid) -> PathBuf {
self.path.join(uuid.to_string())
}
/// Copies the content of the update file pointed to by `uuid` to the `dst` directory.
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
let src = self.path.join(uuid.to_string());
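Because the path returned by get_update_path is not guaranteed to exist, a caller outside of the scheduler's synchronous processing might want to probe for the file before copying it. A minimal usage sketch, assuming FileStore and uuid::Uuid are in scope; the copy_update_file_if_any helper is hypothetical and not part of this commit:

use std::path::Path;

// Hypothetical caller: copy the update file for `uuid` into `dst_dir`, but only
// if it is still present on disk, since `get_update_path` does not check existence.
fn copy_update_file_if_any(store: &FileStore, uuid: Uuid, dst_dir: &Path) -> std::io::Result<()> {
    let src = store.get_update_path(uuid);
    if src.exists() {
        std::fs::copy(&src, dst_dir.join(uuid.to_string()))?;
    }
    Ok(())
}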

View File

@@ -18,13 +18,14 @@ one indexing operation.
*/
use std::collections::HashSet;
use std::fs::File;
use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
use log::{debug, error, info};
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Settings as MilliSettings,
@@ -552,7 +553,84 @@ impl IndexScheduler {
wtxn.commit()?;
Ok(vec![task])
}
Batch::SnapshotCreation(_) => todo!(),
Batch::SnapshotCreation(mut tasks) => {
fs::create_dir_all(&self.snapshots_path)?;
let temp_snapshot_dir = tempfile::tempdir()?;
// 1. Snapshot the version file.
// TODO where can I find the path of this file and do we create it anyway?
// let dst = temp_snapshot_dir.path().join(VERSION_FILE_NAME);
// let src = self.src_path.join(VERSION_FILE_NAME);
// fs::copy(src, dst)?;
// TODO what is a meta-env in the previous version of the scheduler?
// 2. Snapshot the index-scheduler LMDB env
//
// When we call copy_to_path, LMDB opens a read transaction by itself;
// we can't provide our own. This is an issue because we would like to know
// which update files to copy, but new tasks can be enqueued between the copy
// of the env and the transaction we open afterwards to retrieve the enqueued tasks.
// So we prefer opening a new transaction after copying the env, and copying
// too many update files rather than not enough.
//
// Note that no update file can be deleted between those two read
// operations, as task processing is synchronous.
// 2.1 First copy the LMDB env and reorganize pages to reduce its size.
let dst = temp_snapshot_dir.path().join("data.mdb");
self.env.copy_to_path(dst, CompactionOption::Enabled)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
// 2.3 Create the update files directory
let update_files_dir = temp_snapshot_dir.path().join("update_files");
fs::create_dir_all(&update_files_dir)?;
// 2.4 Only copy the update files of the enqueued tasks
for task_id in self.get_status(&rtxn, Status::Enqueued)? {
let task = self.get_task(&rtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
if let Some(content_uuid) = task.content_uuid() {
let src = self.file_store.get_update_path(content_uuid);
let dst = update_files_dir.join(content_uuid.to_string());
fs::copy(src, dst)?;
}
}
// 3. Snapshot every index
// TODO we are opening all of the indexes, which can be too much; we should
// unload the indexes we open here once we are done with them. It would be even
// better to only unload the ones that we opened ourselves, or maybe to use an
// LRU in the index mapper.
for result in self.index_mapper.index_mapping.iter(&rtxn)? {
let (name, uuid) = result?;
let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
index.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
}
drop(rtxn);
// 4. Snapshot the auth LMDB env
let dst = temp_snapshot_dir.path().join("auth").join("data.mdb");
fs::create_dir_all(&dst)?;
// TODO find a better way to get the auth database path
let auth_path = self.env.path().join("..").join("auth");
let auth = milli::heed::EnvOpenOptions::new().open(auth_path)?;
auth.copy_to_path(dst, CompactionOption::Enabled)?;
todo!("tar-gz and append .snapshot at the end of the file");
for task in &mut tasks {
task.status = Status::Succeeded;
}
Ok(tasks)
}
Batch::Dump(mut task) => {
let started_at = OffsetDateTime::now_utc();
let (keys, instance_uid, dump_uid) =
@@ -579,7 +657,7 @@ impl IndexScheduler {
for ret in self.all_tasks.iter(&rtxn)? {
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid().copied();
let content_file = t.content_uuid();
// In the case where we are dumping ourselves, we want to be marked as finished
// so that we do not loop over ourselves indefinitely.
@@ -1106,7 +1184,7 @@ impl IndexScheduler {
let mut content_files_to_delete = Vec::new();
for mut task in self.get_existing_tasks(wtxn, tasks_to_cancel.iter())? {
if let Some(uuid) = task.content_uuid() {
content_files_to_delete.push(*uuid);
content_files_to_delete.push(uuid);
}
task.status = Status::Canceled;
task.canceled_by = Some(cancel_task_id);
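The todo! in step 4 of the SnapshotCreation arm above leaves the final packaging step open: tar-gzipping the temporary directory into self.snapshots_path under a name ending in .snapshot. Below is a minimal sketch of that step, assuming the tar and flate2 crates; the tar_gz_snapshot helper and its snapshot_name parameter are hypothetical and not what this commit ships:

use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::Path;

use flate2::write::GzEncoder;
use flate2::Compression;

// Archives the content of `src` into `<snapshots_path>/<snapshot_name>.snapshot`.
fn tar_gz_snapshot(src: &Path, snapshots_path: &Path, snapshot_name: &str) -> std::io::Result<()> {
    std::fs::create_dir_all(snapshots_path)?;
    let dst = snapshots_path.join(format!("{snapshot_name}.snapshot"));
    let writer = BufWriter::new(File::create(dst)?);
    let gz = GzEncoder::new(writer, Compression::default());
    let mut tar = tar::Builder::new(gz);
    // Put the whole temporary snapshot directory at the root of the archive.
    tar.append_dir_all(".", src)?;
    // Finish the tar stream, then the gzip stream, then flush the buffered file.
    let mut writer = tar.into_inner()?.finish()?;
    writer.flush()?;
    Ok(())
}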

View File

@@ -30,7 +30,7 @@ pub struct IndexMapper {
// TODO create a UUID codec that uses the raw 16-byte representation
/// Maps an index name to the uuid of an index currently available on disk.
index_mapping: Database<Str, SerdeBincode<Uuid>>,
pub(crate) index_mapping: Database<Str, SerdeBincode<Uuid>>,
/// Path to the folder where the LMDB environments of each index are.
base_path: PathBuf,
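The TODO above mentions replacing SerdeBincode<Uuid> with a codec that stores the raw 16 bytes. A hedged sketch of what such a codec could look like, assuming the heed version in use still exposes the Option-based BytesEncode/BytesDecode traits; UuidCodec is a hypothetical name, not part of this commit:

use std::borrow::Cow;

use meilisearch_types::heed::{BytesDecode, BytesEncode};
use uuid::Uuid;

// Hypothetical codec: store a Uuid as its raw 16 bytes instead of a bincode payload.
pub struct UuidCodec;

impl<'a> BytesEncode<'a> for UuidCodec {
    type EItem = Uuid;

    fn bytes_encode(item: &'a Self::EItem) -> Option<Cow<'a, [u8]>> {
        Some(Cow::Borrowed(item.as_bytes()))
    }
}

impl<'a> BytesDecode<'a> for UuidCodec {
    type DItem = Uuid;

    fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
        Uuid::from_slice(bytes).ok()
    }
}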

View File

@@ -899,7 +899,7 @@ impl IndexScheduler {
pub(crate) fn delete_persisted_task_data(&self, task: &Task) -> Result<()> {
match task.content_uuid() {
Some(content_file) => self.delete_update_file(*content_file),
Some(content_file) => self.delete_update_file(content_file),
None => Ok(()),
}
}

View File

@@ -62,11 +62,9 @@ impl Task {
}
/// Returns the content uuid, if there is one
pub fn content_uuid(&self) -> Option<&Uuid> {
pub fn content_uuid(&self) -> Option<Uuid> {
match self.kind {
KindWithContent::DocumentAdditionOrUpdate { ref content_file, .. } => {
Some(content_file)
}
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file),
KindWithContent::DocumentDeletion { .. }
| KindWithContent::DocumentClear { .. }
| KindWithContent::SettingsUpdate { .. }
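This by-value Option<Uuid> signature only works because uuid::Uuid implements Copy: binding content_file by value in a match on self.kind through &self copies the field instead of moving it, which is what lets the call sites shown earlier drop their *uuid and .copied() calls. A standalone sketch of the same pattern, with purely hypothetical types:

// Minimal reproduction: `Id` is Copy, so the `Kind::WithId { id }` binding
// copies the field out of `self.kind` even though `self` is only borrowed.
#[derive(Clone, Copy)]
struct Id(u128);

enum Kind {
    WithId { id: Id },
    Plain,
}

struct MiniTask {
    kind: Kind,
}

impl MiniTask {
    fn content_id(&self) -> Option<Id> {
        match self.kind {
            Kind::WithId { id } => Some(id),
            Kind::Plain => None,
        }
    }
}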