first mostly working version

This commit is contained in:
Tamo 2022-10-16 01:39:01 +02:00 committed by Clément Renault
parent 6fae317277
commit cac924b663
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
18 changed files with 403 additions and 57 deletions

View file

@ -736,7 +736,7 @@ impl IndexScheduler {
let user_result = match user_result {
Ok(count) => Ok(DocumentAdditionResult {
indexed_documents: count,
number_of_documents: count,
number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details.
}),
Err(e) => Err(milli::Error::from(e)),
};

View file

@ -13,6 +13,8 @@ pub enum Error {
IndexAlreadyExists(String),
#[error("Corrupted task queue.")]
CorruptedTaskQueue,
#[error("Corrupted dump.")]
CorruptedDump,
#[error("Task `{0}` not found")]
TaskNotFound(TaskId),
// TODO: Lo: proper error message for this
@ -49,14 +51,15 @@ impl ErrorCode for Error {
Error::InvalidStatus(_) => Code::BadRequest,
Error::InvalidKind(_) => Code::BadRequest,
// TODO: TAMO: are all these errors really internal?
Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(),
// TODO: TAMO: are all these errors really internal?
Error::Heed(_) => Code::Internal,
Error::FileStore(_) => Code::Internal,
Error::IoError(_) => Code::Internal,
Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal,
Error::CorruptedDump => Code::Internal,
}
}
}

View file

@ -28,7 +28,7 @@ pub struct IndexMapper {
base_path: PathBuf,
index_size: usize,
indexer_config: Arc<IndexerConfig>,
pub indexer_config: Arc<IndexerConfig>,
}
/// Weither the index must not be inserted back

View file

@ -9,13 +9,17 @@ mod utils;
pub type Result<T> = std::result::Result<T, Error>;
pub type TaskId = u32;
use dump::{KindDump, TaskDump, UpdateFile};
pub use error::Error;
use meilisearch_types::keys::Key;
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use meilisearch_types::InstanceUid;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use file_store::{File, FileStore};
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use roaring::RoaringBitmap;
use serde::{Deserialize, Serialize};
@ -220,10 +224,6 @@ impl IndexScheduler {
Ok(this)
}
pub fn import_dump(&self, dump_path: PathBuf) -> Result<()> {
todo!()
}
/// This function will execute in a different thread and must be called only once.
fn run(&self) {
let run = Self {
@ -254,6 +254,10 @@ impl IndexScheduler {
});
}
pub fn indexer_config(&self) -> &IndexerConfig {
&self.index_mapper.indexer_config
}
/// Return the index corresponding to the name. If it wasn't opened before
/// it'll be opened. But if it doesn't exist on disk it'll throw an
/// `IndexNotFound` error.
@ -390,11 +394,138 @@ impl IndexScheduler {
Ok(task)
}
pub fn create_update_file(&self) -> Result<(Uuid, File)> {
/// Register a new task comming from a dump in the scheduler.
/// By takinig a mutable ref we're pretty sure no one will ever import a dump while actix is running.
pub fn register_dumpped_task(
&mut self,
task: TaskDump,
content_file: Option<Box<UpdateFile>>,
keys: &[Key],
instance_uid: Option<InstanceUid>,
) -> Result<Task> {
// Currently we don't need to access the tasks queue while loading a dump thus I can block everything.
let mut wtxn = self.env.write_txn()?;
let content_uuid = if let Some(content_file) = content_file {
let (uuid, mut file) = self.create_update_file()?;
let mut builder = DocumentsBatchBuilder::new(file.as_file_mut());
for doc in content_file {
builder.append_json_object(&doc?)?;
}
builder.into_inner()?;
file.persist()?;
Some(uuid)
} else {
None
};
let task = Task {
uid: task.uid,
enqueued_at: task.enqueued_at,
started_at: task.started_at,
finished_at: task.finished_at,
error: task.error,
details: task.details,
status: task.status,
kind: match task.kind {
KindDump::DocumentImport {
primary_key,
method,
documents_count,
allow_index_creation,
} => KindWithContent::DocumentImport {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
method,
content_file: content_uuid.ok_or(Error::CorruptedDump)?,
documents_count,
allow_index_creation,
},
KindDump::DocumentDeletion { documents_ids } => KindWithContent::DocumentDeletion {
documents_ids,
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::DocumentClear => KindWithContent::DocumentClear {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::Settings {
settings,
is_deletion,
allow_index_creation,
} => KindWithContent::Settings {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
new_settings: settings,
is_deletion,
allow_index_creation,
},
KindDump::IndexDeletion => KindWithContent::IndexDeletion {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
},
KindDump::IndexCreation { primary_key } => KindWithContent::IndexCreation {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexUpdate { primary_key } => KindWithContent::IndexUpdate {
index_uid: task.index_uid.ok_or(Error::CorruptedDump)?,
primary_key,
},
KindDump::IndexSwap { lhs, rhs } => KindWithContent::IndexSwap { lhs, rhs },
KindDump::CancelTask { tasks } => KindWithContent::CancelTask { tasks },
KindDump::DeleteTasks { query, tasks } => {
KindWithContent::DeleteTasks { query, tasks }
}
KindDump::DumpExport { dump_uid } => KindWithContent::DumpExport {
dump_uid,
keys: keys.to_vec(),
instance_uid,
},
KindDump::Snapshot => KindWithContent::Snapshot,
},
};
self.all_tasks
.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |bitmap| {
bitmap.insert(task.uid);
})?;
}
}
self.update_status(&mut wtxn, task.status, |bitmap| {
bitmap.insert(task.uid);
})?;
self.update_kind(&mut wtxn, task.kind.as_kind(), |bitmap| {
(bitmap.insert(task.uid));
})?;
match wtxn.commit() {
Ok(()) => (),
_e @ Err(_) => {
todo!("remove the data associated with the task");
// _e?;
}
}
Ok(task)
}
/// Create a new index without any associated task.
pub fn create_raw_index(&self, name: &str) -> Result<Index> {
let mut wtxn = self.env.write_txn()?;
self.index_mapper.create_index(&mut wtxn, name)
}
pub fn create_update_file(&self) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update()?)
}
#[cfg(test)]
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, File)> {
pub fn create_update_file_with_uuid(&self, uuid: u128) -> Result<(Uuid, file_store::File)> {
Ok(self.file_store.new_update_with_uuid(uuid)?)
}