write the dump export

Tamo 2022-10-13 15:02:59 +02:00 committed by Clément Renault
parent 7ce336306d
commit 9323f9f1c4
25 changed files with 686 additions and 184 deletions
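
Summary: `Batch::Dump` now carries a single `Task` instead of a `Vec<Task>`, the scheduler only ever batches the oldest enqueued `DumpExport` task, and the previously `todo!()` dump arm of `process_batch` is implemented: it dumps the API keys, the whole task queue (including the update files attached to tasks), and each index's documents and settings through the `dump` crate, then persists the result to `<dumps_path>/<dump_uid>.dump`.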

@@ -1,7 +1,11 @@
 use std::collections::HashSet;
+use std::fs::File;
+use std::io::BufWriter;
 
 use crate::{autobatcher::BatchKind, Error, IndexScheduler, Result, TaskId};
+use dump::IndexMetadata;
+use meilisearch_types::milli::documents::obkv_to_object;
 use meilisearch_types::tasks::{Details, Kind, KindWithContent, Status, Task};
 use log::{debug, info};
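
(The new imports serve the dump arm below: `File` and `BufWriter` back the `.dump` file written at the end, `IndexMetadata` describes each dumped index, and `obkv_to_object` converts the obkv documents stored in task update files back into JSON objects.)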
@@ -25,7 +29,7 @@ pub(crate) enum Batch {
     Cancel(Task),
     TaskDeletion(Task),
     Snapshot(Vec<Task>),
-    Dump(Vec<Task>),
+    Dump(Task),
     IndexOperation(IndexOperation),
     IndexCreation {
         index_uid: String,
@@ -100,9 +104,10 @@ impl Batch {
         match self {
             Batch::Cancel(task)
             | Batch::TaskDeletion(task)
+            | Batch::Dump(task)
             | Batch::IndexCreation { task, .. }
             | Batch::IndexUpdate { task, .. } => vec![task.uid],
-            Batch::Snapshot(tasks) | Batch::Dump(tasks) | Batch::IndexDeletion { tasks, .. } => {
+            Batch::Snapshot(tasks) | Batch::IndexDeletion { tasks, .. } => {
                 tasks.iter().map(|task| task.uid).collect()
             }
             Batch::IndexOperation(operation) => match operation {
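
With a dump batch now holding exactly one task, `Dump` moves from the multi-task arms to the single-task arms when collecting a batch's task ids.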
@@ -402,8 +407,11 @@ impl IndexScheduler {
         // 4. we batch the dumps.
         let to_dump = self.get_kind(rtxn, Kind::DumpExport)? & enqueued;
-        if !to_dump.is_empty() {
-            return Ok(Some(Batch::Dump(self.get_existing_tasks(rtxn, to_dump)?)));
+        if let Some(to_dump) = to_dump.min() {
+            return Ok(Some(Batch::Dump(
+                self.get_task(rtxn, to_dump)?
+                    .ok_or(Error::CorruptedTaskQueue)?,
+            )));
         }
 
         // 5. We take the next task and try to batch all the tasks associated with this index.
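
A minimal sketch of the selection rule above, assuming (as elsewhere in the index scheduler) that the task sets are `roaring::RoaringBitmap`s of task ids and that ids grow with enqueue order, so `min()` picks the oldest dump task; `next_dump_task` is a hypothetical helper, not code from this commit:

    use roaring::RoaringBitmap;

    // Intersect the enqueued tasks with the `DumpExport` tasks and return the
    // oldest matching task id, if any. An empty intersection yields `None`,
    // which is why the old `is_empty` check is no longer needed.
    fn next_dump_task(dump_tasks: &RoaringBitmap, enqueued: &RoaringBitmap) -> Option<u32> {
        (dump_tasks & enqueued).min()
    }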
@@ -477,7 +485,80 @@ impl IndexScheduler {
                 Ok(vec![task])
             }
             Batch::Snapshot(_) => todo!(),
-            Batch::Dump(_) => todo!(),
+            Batch::Dump(mut task) => {
+                let KindWithContent::DumpExport { keys, instance_uid, dump_uid } = &task.kind else {
+                    unreachable!();
+                };
+                let dump = dump::DumpWriter::new(instance_uid.clone())?;
+                let mut d_keys = dump.create_keys()?;
+
+                // 1. dump the keys
+                for key in keys {
+                    d_keys.push_key(key)?;
+                }
+
+                let rtxn = self.env.read_txn()?;
+
+                // 2. dump the tasks
+                let mut tasks = dump.create_tasks_queue()?;
+                for ret in self.all_tasks.iter(&rtxn)? {
+                    let (_, task) = ret?;
+                    let mut dump_content_file = tasks.push_task(&task)?;
+
+                    // 2.1. Dump the `content_file` associated with the task if there is one.
+                    if let Some(content_file) = task.content_uuid() {
+                        let content_file = self.file_store.get_update(*content_file)?;
+
+                        let reader = DocumentsBatchReader::from_reader(content_file)
+                            .map_err(milli::Error::from)?;
+                        let (mut cursor, documents_batch_index) =
+                            reader.into_cursor_and_fields_index();
+
+                        while let Some(doc) = cursor.next_document().map_err(milli::Error::from)? {
+                            dump_content_file
+                                .push_document(&obkv_to_object(&doc, &documents_batch_index)?)?;
+                        }
+                    }
+                }
+
+                // TODO: maybe `self.indexes` could use this rtxn instead of creating its own
+                drop(rtxn);
+
+                // 3. Dump the indexes
+                for (uid, index) in self.indexes()? {
+                    let rtxn = index.read_txn()?;
+                    let metadata = IndexMetadata {
+                        uid: uid.clone(),
+                        primary_key: index.primary_key(&rtxn)?.map(String::from),
+                        created_at: index.created_at(&rtxn)?,
+                        updated_at: index.updated_at(&rtxn)?,
+                    };
+                    let mut index_dumper = dump.create_index(&uid, &metadata)?;
+
+                    let fields_ids_map = index.fields_ids_map(&rtxn)?;
+                    let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect();
+
+                    // 3.1. Dump the documents
+                    for ret in index.all_documents(&rtxn)? {
+                        let (_id, doc) = ret?;
+                        let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
+                        index_dumper.push_document(&document)?;
+                    }
+
+                    // 3.2. Dump the settings
+                    let settings = meilisearch_types::settings::settings(&index, &rtxn)?;
+                    index_dumper.settings(&settings)?;
+                }
+
+                let path = self.dumps_path.join(format!("{}.dump", dump_uid));
+                let file = File::create(path).unwrap();
+                dump.persist_to(BufWriter::new(file)).unwrap();
+
+                task.status = Status::Succeeded;
+                Ok(vec![task])
+            }
             Batch::IndexOperation(operation) => {
                 #[rustfmt::skip]
                 let index = match operation {
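
Stripped of the loops and scheduler plumbing, the `dump` crate calls above form one writer flow. A condensed sketch using only methods visible in this diff; the bindings (`instance_uid`, `key`, `task`, `uid`, `metadata`, `document`, `settings`, `path`) stand in for the values the real code derives:

    let dump = dump::DumpWriter::new(instance_uid.clone())?;
    let mut keys = dump.create_keys()?;
    keys.push_key(key)?;                                     // 1. API keys
    let mut tasks = dump.create_tasks_queue()?;
    let mut content_file = tasks.push_task(&task)?;          // 2. tasks…
    content_file.push_document(&document)?;                  //    …and their update files
    let mut index_dumper = dump.create_index(&uid, &metadata)?;
    index_dumper.push_document(&document)?;                  // 3. documents…
    index_dumper.settings(&settings)?;                       //    …and settings, per index
    dump.persist_to(BufWriter::new(File::create(path)?))?;   // finally, write the .dump file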
@@ -679,14 +760,14 @@ impl IndexScheduler {
                         task.status = Status::Succeeded;
                         task.details = Some(Details::DocumentAddition {
                             received_documents: number_of_documents,
-                            indexed_documents,
+                            indexed_documents: Some(indexed_documents),
                         });
                     }
                     Err(error) => {
                         task.status = Status::Failed;
                         task.details = Some(Details::DocumentAddition {
                             received_documents: count,
-                            indexed_documents: count,
+                            indexed_documents: Some(count),
                         });
                         task.error = Some(error.into())
                     }
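
The `Some(...)` wrapping on both branches suggests `Details::DocumentAddition::indexed_documents` became an `Option` in this changeset, presumably so tasks that never reach the indexing step can report `None` rather than a fabricated count.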