write the dump export

This commit is contained in:
Tamo 2022-10-13 15:02:59 +02:00 committed by Clément Renault
parent 8954b1bd1d
commit b7f9c94f4a
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
25 changed files with 686 additions and 184 deletions

View file

@ -1,7 +1,11 @@
use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
use dump::IndexMetadata;
use meilisearch_types::milli::documents::obkv_to_object;
use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
use log::{debug, info};
@ -25,7 +29,7 @@ pub(crate) enum Batch {
Cancel(Task),
TaskDeletion(Task),
Snapshot(Vec<Task>),
Dump(Vec<Task>),
Dump(Task),
IndexOperation(IndexOperation),
IndexCreation {
index_uid: String,
@ -100,9 +104,10 @@ impl Batch {
match self {
Batch::Cancel(task)
| Batch::TaskDeletion(task)
| Batch::Dump(task)
| Batch::IndexCreation { task, .. }
| Batch::IndexUpdate { task, .. } => vec![task.uid],
Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => {
Batch::Snapshot(tasks) | Batch::IndexDeletion { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
}
Batch::IndexOperation(operation) => match operation {
@ -402,8 +407,11 @@ impl IndexScheduler {
// 4. we batch the dumps.
let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
if !to_dump.is_empty() {
return Ok(Some(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?)));
if let Some(to_dump) = to_dump.min() {
return Ok(Some(Batch::Dump(
self.get_task(rtxn, to_dump)?
.ok_or(Error::CorruptedTaskQueue)?,
)));
}
// 5. We take the next task and try to batch all the tasks associated with this index.
@ -477,7 +485,80 @@ impl IndexScheduler {
Ok(vec![task])
}
Batch::Snapshot(_) => todo!(),
Batch::Dump(_) => todo!(),
Batch::Dump(mut task) => {
let KindWithContent::DumpExport { keys, instance_uid, dump_uid } = &task.kind else {
unreachable!();
};
let dump = dump::DumpWriter::new(instance_uid.clone())?;
let mut d_keys = dump.create_keys()?;
// 1. dump the keys
for key in keys {
d_keys.push_key(key)?;
}
let rtxn = self.env.read_txn()?;
// 2. dump the tasks
let mut tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
let (_, task) = ret?;
let mut dump_content_file = tasks.push_task(&task)?;
// 2.1. Dump the `content_file` associated with the task if there is one.
if let Some(content_file) = task.content_uuid() {
let content_file = self.file_store.get_update(*content_file)?;
let reader = DocumentsBatchReader::from_reader(content_file)
.map_err(milli::Error::from)?;
let (mut cursor, documents_batch_index) =
reader.into_cursor_and_fields_index();
while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
dump_content_file
.push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
}
}
}
// TODO: maybe `self.indexes` could use this rtxn instead of creating its own
drop(rtxn);
// 3. Dump the indexes
for (uid, index) in self.indexes()? {
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
uid: uid.clone(),
primary_key: index.primary_key(&rtxn)?.map(String::from),
created_at: index.created_at(&rtxn)?,
updated_at: index.updated_at(&rtxn)?,
};
let mut index_dumper = dump.create_index(&uid, &metadata)?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
}
// 3.2. Dump the settings
let settings = meilisearch_types::settings::settings(&index, &rtxn)?;
index_dumper.settings(&settings)?;
}
let path = self.dumps_path.join(format!("{}.dump", dump_uid));
let file = File::create(path).unwrap();
dump.persist_to(BufWriter::new(file)).unwrap();
task.status = Status::Succeeded;
Ok(vec![task])
}
Batch::IndexOperation(operation) => {
#[rustfmt::skip]
let index = match operation {
@ -679,14 +760,14 @@ impl IndexScheduler {
task.status = Status::Succeeded;
task.details = Some(Details::DocumentAddition {
received_documents: number_of_documents,
indexed_documents,
indexed_documents: Some(indexed_documents),
});
}
Err(error) => {
task.status = Status::Failed;
task.details = Some(Details::DocumentAddition {
received_documents: count,
indexed_documents: count,
indexed_documents: Some(count),
});
task.error = Some(error.into())
}

View file

@ -24,6 +24,8 @@ pub enum Error {
#[error("`{0}` is not a type. Available types are")]
InvalidKind(String),
#[error(transparent)]
Dump(#[from] dump::Error),
#[error(transparent)]
Heed(#[from] heed::Error),
#[error(transparent)]
@ -48,8 +50,9 @@ impl ErrorCode for Error {
Error::InvalidKind(_) => Code::BadRequest,
// TODO: TAMO: are all these errors really internal?
Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(),
Error::Heed(_) => Code::Internal,
Error::Milli(_) => Code::Internal,
Error::FileStore(_) => Code::Internal,
Error::IoError(_) => Code::Internal,
Error::Anyhow(_) => Code::Internal,

View file

@ -154,6 +154,9 @@ pub struct IndexScheduler {
/// Weither autobatching is enabled or not.
pub(crate) autobatching_enabled: bool,
/// The path used to create the dumps.
pub(crate) dumps_path: PathBuf,
// ================= test
/// The next entry is dedicated to the tests.
/// It provide a way to break in multiple part of the scheduler.
@ -175,6 +178,7 @@ impl IndexScheduler {
tasks_path: PathBuf,
update_file_path: PathBuf,
indexes_path: PathBuf,
dumps_path: PathBuf,
index_size: usize,
indexer_config: IndexerConfig,
autobatching_enabled: bool,
@ -183,6 +187,7 @@ impl IndexScheduler {
std::fs::create_dir_all(&tasks_path)?;
std::fs::create_dir_all(&update_file_path)?;
std::fs::create_dir_all(&indexes_path)?;
std::fs::create_dir_all(&dumps_path)?;
let mut options = heed::EnvOpenOptions::new();
options.max_dbs(6);
@ -205,6 +210,7 @@ impl IndexScheduler {
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
wake_up: Arc::new(SignalEvent::auto(true)),
autobatching_enabled,
dumps_path,
#[cfg(test)]
test_breakpoint_sdr,
@ -227,6 +233,7 @@ impl IndexScheduler {
index_mapper: self.index_mapper.clone(),
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
dumps_path: self.dumps_path.clone(),
#[cfg(test)]
test_breakpoint_sdr: self.test_breakpoint_sdr.clone(),
@ -342,7 +349,7 @@ impl IndexScheduler {
started_at: None,
finished_at: None,
error: None,
details: task.default_details(),
details: (&task).into(),
status: Status::Enqueued,
kind: task,
};
@ -367,9 +374,9 @@ impl IndexScheduler {
match wtxn.commit() {
Ok(()) => (),
e @ Err(_) => {
_e @ Err(_) => {
todo!("remove the data associated with the task");
e?;
// _e?;
}
}
@ -436,6 +443,7 @@ impl IndexScheduler {
// TODO the info field should've been set by the process_batch function
self.update_task(&mut wtxn, &task)?;
}
log::info!("A batch of tasks was successfully completed.");
}
// In case of a failure we must get back and patch all the tasks with the error.
Err(err) => {
@ -453,7 +461,6 @@ impl IndexScheduler {
}
*self.processing_tasks.write().unwrap() = (finished_at, RoaringBitmap::new());
wtxn.commit()?;
log::info!("A batch of tasks was successfully completed.");
#[cfg(test)]
self.test_breakpoint_sdr
@ -542,6 +549,7 @@ mod tests {
tempdir.path().join("db_path"),
tempdir.path().join("file_store"),
tempdir.path().join("indexes"),
tempdir.path().join("dumps"),
1024 * 1024,
IndexerConfig::default(),
autobatching, // enable autobatching