mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-24 04:07:30 +01:00
204 lines
7.9 KiB
Rust
204 lines
7.9 KiB
Rust
|
use std::collections::HashMap;
|
||
|
|
||
|
use dump::{KindDump, TaskDump, UpdateFile};
|
||
|
use meilisearch_types::heed::RwTxn;
|
||
|
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
|
||
|
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||
|
use roaring::RoaringBitmap;
|
||
|
use uuid::Uuid;
|
||
|
|
||
|
use crate::{utils, Error, IndexScheduler, Result};
|
||
|
|
||
|
pub struct Dump<'a> {
|
||
|
index_scheduler: &'a IndexScheduler,
|
||
|
wtxn: RwTxn<'a>,
|
||
|
|
||
|
indexes: HashMap<String, RoaringBitmap>,
|
||
|
statuses: HashMap<Status, RoaringBitmap>,
|
||
|
kinds: HashMap<Kind, RoaringBitmap>,
|
||
|
}
|
||
|
|
||
|
impl<'a> Dump<'a> {
|
||
|
pub(crate) fn new(index_scheduler: &'a mut IndexScheduler) -> Result<Self> {
|
||
|
// While loading a dump no one should be able to access the scheduler thus I can block everything.
|
||
|
let wtxn = index_scheduler.env.write_txn()?;
|
||
|
|
||
|
Ok(Dump {
|
||
|
index_scheduler,
|
||
|
wtxn,
|
||
|
indexes: HashMap::new(),
|
||
|
statuses: HashMap::new(),
|
||
|
kinds: HashMap::new(),
|
||
|
})
|
||
|
}
|
||
|
|
||
|
/// Register a new task coming from a dump in the scheduler.
|
||
|
/// By taking a mutable ref we're pretty sure no one will ever import a dump while actix is running.
|
||
|
pub fn register_dumped_task(
|
||
|
&mut self,
|
||
|
task: TaskDump,
|
||
|
content_file: Option<Box<UpdateFile>>,
|
||
|
) -> Result<Task> {
|
||
|
let content_uuid = match content_file {
|
||
|
Some(content_file) if task.status == Status::Enqueued => {
|
||
|
let (uuid, mut file) = self.index_scheduler.queue.create_update_file(false)?;
|
||
|
let mut builder = DocumentsBatchBuilder::new(&mut file);
|
||
|
for doc in content_file {
|
||
|
builder.append_json_object(&doc?)?;
|
||
|
}
|
||
|
builder.into_inner()?;
|
||
|
file.persist()?;
|
||
|
|
||
|
Some(uuid)
|
||
|
}
|
||
|
// If the task isn't `Enqueued` then just generate a recognisable `Uuid`
|
||
|
// in case we try to open it later.
|
||
|
_ if task.status != Status::Enqueued => Some(Uuid::nil()),
|
||
|
_ => None,
|
||
|
};
|
||
|
|
||
|
let task = Task {
|
||
|
uid: task.uid,
|
||
|
batch_uid: task.batch_uid,
|
||
|
enqueued_at: task.enqueued_at,
|
||
|
started_at: task.started_at,
|
||
|
finished_at: task.finished_at,
|
||
|
error: task.error,
|
||
|
canceled_by: task.canceled_by,
|
||
|
details: task.details,
|
||
|
status: task.status,
|
||
|
kind: match task.kind {
|
||
|
KindDump::DocumentImport {
|
||
|
primary_key,
|
||
|
method,
|
||
|
documents_count,
|
||
|
allow_index_creation,
|
||
|
} => KindWithContent::DocumentAdditionOrUpdate {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
primary_key,
|
||
|
method,
|
||
|
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
|
||
|
documents_count,
|
||
|
allow_index_creation,
|
||
|
},
|
||
|
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
|
||
|
documents_ids,
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
},
|
||
|
KindDump::DocumentDeletionByFilter { filter } => {
|
||
|
KindWithContent::DocumentDeletionByFilter {
|
||
|
filter_expr: filter,
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
}
|
||
|
}
|
||
|
KindDump::DocumentEdition { filter, context, function } => {
|
||
|
KindWithContent::DocumentEdition {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
filter_expr: filter,
|
||
|
context,
|
||
|
function,
|
||
|
}
|
||
|
}
|
||
|
KindDump::DocumentClear => KindWithContent::DocumentClear {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
},
|
||
|
KindDump::Settings { settings, is_deletion, allow_index_creation } => {
|
||
|
KindWithContent::SettingsUpdate {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
new_settings: settings,
|
||
|
is_deletion,
|
||
|
allow_index_creation,
|
||
|
}
|
||
|
}
|
||
|
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
},
|
||
|
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
primary_key,
|
||
|
},
|
||
|
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
|
||
|
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
|
||
|
primary_key,
|
||
|
},
|
||
|
KindDump::IndexSwap { swaps } => KindWithContent::IndexSwap { swaps },
|
||
|
KindDump::TaskCancelation { query, tasks } => {
|
||
|
KindWithContent::TaskCancelation { query, tasks }
|
||
|
}
|
||
|
KindDump::TasksDeletion { query, tasks } => {
|
||
|
KindWithContent::TaskDeletion { query, tasks }
|
||
|
}
|
||
|
KindDump::DumpCreation { keys, instance_uid } => {
|
||
|
KindWithContent::DumpCreation { keys, instance_uid }
|
||
|
}
|
||
|
KindDump::SnapshotCreation => KindWithContent::SnapshotCreation,
|
||
|
},
|
||
|
};
|
||
|
|
||
|
self.index_scheduler.queue.tasks.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
|
||
|
|
||
|
for index in task.indexes() {
|
||
|
match self.indexes.get_mut(index) {
|
||
|
Some(bitmap) => {
|
||
|
bitmap.insert(task.uid);
|
||
|
}
|
||
|
None => {
|
||
|
let mut bitmap = RoaringBitmap::new();
|
||
|
bitmap.insert(task.uid);
|
||
|
self.indexes.insert(index.to_string(), bitmap);
|
||
|
}
|
||
|
};
|
||
|
}
|
||
|
|
||
|
utils::insert_task_datetime(
|
||
|
&mut self.wtxn,
|
||
|
self.index_scheduler.queue.tasks.enqueued_at,
|
||
|
task.enqueued_at,
|
||
|
task.uid,
|
||
|
)?;
|
||
|
|
||
|
// we can't override the started_at & finished_at, so we must only set it if the tasks is finished and won't change
|
||
|
if matches!(task.status, Status::Succeeded | Status::Failed | Status::Canceled) {
|
||
|
if let Some(started_at) = task.started_at {
|
||
|
utils::insert_task_datetime(
|
||
|
&mut self.wtxn,
|
||
|
self.index_scheduler.queue.tasks.started_at,
|
||
|
started_at,
|
||
|
task.uid,
|
||
|
)?;
|
||
|
}
|
||
|
if let Some(finished_at) = task.finished_at {
|
||
|
utils::insert_task_datetime(
|
||
|
&mut self.wtxn,
|
||
|
self.index_scheduler.queue.tasks.finished_at,
|
||
|
finished_at,
|
||
|
task.uid,
|
||
|
)?;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
self.statuses.entry(task.status).or_default().insert(task.uid);
|
||
|
self.kinds.entry(task.kind.as_kind()).or_default().insert(task.uid);
|
||
|
|
||
|
Ok(task)
|
||
|
}
|
||
|
|
||
|
/// Commit all the changes and exit the importing dump state
|
||
|
pub fn finish(mut self) -> Result<()> {
|
||
|
for (index, bitmap) in self.indexes {
|
||
|
self.index_scheduler.queue.tasks.index_tasks.put(&mut self.wtxn, &index, &bitmap)?;
|
||
|
}
|
||
|
for (status, bitmap) in self.statuses {
|
||
|
self.index_scheduler.queue.tasks.put_status(&mut self.wtxn, status, &bitmap)?;
|
||
|
}
|
||
|
for (kind, bitmap) in self.kinds {
|
||
|
self.index_scheduler.queue.tasks.put_kind(&mut self.wtxn, kind, &bitmap)?;
|
||
|
}
|
||
|
|
||
|
self.wtxn.commit()?;
|
||
|
self.index_scheduler.scheduler.wake_up.signal();
|
||
|
|
||
|
Ok(())
|
||
|
}
|
||
|
}
|