wip integrating the scheduler in meilisearch-http

This commit is contained in:
Irevoire 2022-09-22 12:14:51 +02:00 committed by Clément Renault
parent acc6d3a82b
commit 60ee1f5e64
No known key found for this signature in database
GPG key ID: 92ADA4E935E71FA4
16 changed files with 251 additions and 192 deletions

View file

@ -30,7 +30,7 @@ pub(crate) enum Batch {
},
DocumentDeletion {
index_uid: String,
documents: Vec<DocumentId>,
documents: Vec<String>,
tasks: Vec<Task>,
},
DocumentClear {

View file

@ -1,3 +1,4 @@
use meilisearch_types::error::{Code, ErrorCode};
use milli::heed;
use thiserror::Error;
@ -13,6 +14,13 @@ pub enum Error {
CorruptedTaskQueue,
#[error("Task `{0}` not found")]
TaskNotFound(TaskId),
// maybe the two next errors are going to move to the frontend
#[error("`{0}` is not a status. Available status are")]
InvalidStatus(String),
#[error("`{0}` is not a type. Available types are")]
InvalidKind(String),
#[error(transparent)]
Heed(#[from] heed::Error),
#[error(transparent)]
@ -27,3 +35,22 @@ pub enum Error {
#[error(transparent)]
Anyhow(#[from] anyhow::Error),
}
impl ErrorCode for Error {
fn error_code(&self) -> Code {
match self {
Error::IndexNotFound(_) => Code::IndexNotFound,
Error::IndexAlreadyExists(_) => Code::IndexAlreadyExists,
Error::TaskNotFound(_) => Code::TaskNotFound,
Error::InvalidStatus(_) => todo!(),
Error::InvalidKind(_) => todo!(),
Error::Heed(_) => todo!(),
Error::Milli(_) => todo!(),
Error::IndexError(_) => todo!(),
Error::FileStore(_) => todo!(),
Error::IoError(_) => todo!(),
Error::Anyhow(_) => Code::Internal,
Error::CorruptedTaskQueue => Code::Internal,
}
}
}

View file

@ -1,11 +1,12 @@
use crate::index_mapper::IndexMapper;
use crate::task::{Kind, KindWithContent, Status, Task, TaskView};
use crate::{Error, Result, TaskId};
use file_store::FileStore;
use file_store::{File, FileStore};
use index::Index;
use milli::update::IndexerConfig;
use synchronoise::SignalEvent;
use time::OffsetDateTime;
use uuid::Uuid;
use std::path::PathBuf;
use std::sync::Arc;
@ -24,12 +25,12 @@ const DEFAULT_LIMIT: fn() -> u32 = || 20;
#[serde(rename_all = "camelCase")]
pub struct Query {
#[serde(default = "DEFAULT_LIMIT")]
limit: u32,
from: Option<u32>,
status: Option<Vec<Status>>,
pub limit: u32,
pub from: Option<u32>,
pub status: Option<Vec<Status>>,
#[serde(rename = "type")]
kind: Option<Vec<Kind>>,
index_uid: Option<Vec<String>>,
pub kind: Option<Vec<Kind>>,
pub index_uid: Option<Vec<String>>,
}
impl Default for Query {
@ -62,6 +63,15 @@ impl Query {
..self
}
}
pub fn with_index(self, index_uid: String) -> Self {
let mut index_vec = self.index_uid.unwrap_or_default();
index_vec.push(index_uid);
Self {
index_uid: Some(index_vec),
..self
}
}
}
pub mod db_name {
@ -193,15 +203,6 @@ impl IndexScheduler {
Ok(tasks.into_iter().map(|task| task.as_task_view()).collect())
}
/// Returns the tasks corresponding to the query.
pub fn task(&self, uid: TaskId) -> Result<TaskView> {
let rtxn = self.env.read_txn()?;
self.get_task(&rtxn, uid).and_then(|opt| {
opt.ok_or(Error::TaskNotFound(uid))
.map(|task| task.as_task_view())
})
}
/// Register a new task in the scheduler. If it fails and data was associated with the task
/// it tries to delete the file.
pub fn register(&self, task: KindWithContent) -> Result<TaskView> {
@ -251,6 +252,10 @@ impl IndexScheduler {
Ok(task.as_task_view())
}
pub fn create_update_file(&self) -> Result<(Uuid, File)> {
Ok(self.file_store.new_update()?)
}
/// This worker function must be run in a different thread and must be run only once.
pub fn run(&self) -> ! {
loop {
@ -422,10 +427,8 @@ mod tests {
"doggo": "bob"
}"#;
let (uuid, mut file) = index_scheduler.file_store.new_update().unwrap();
let (uuid, mut file) = index_scheduler.create_update_file().unwrap();
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAddition {
index_uid: S("doggos"),
@ -435,6 +438,7 @@ mod tests {
allow_index_creation: true,
})
.unwrap();
file.persist().unwrap();
index_scheduler.tick().unwrap();

View file

@ -11,10 +11,7 @@ pub type TaskId = u32;
pub use crate::index_scheduler::{IndexScheduler, Query};
pub use error::Error;
/// from the exterior you don't need to know there is multiple type of `Kind`
pub use task::KindWithContent;
/// from the exterior you don't need to know there is multiple type of `Task`
pub use task::TaskView;
pub use task::{Kind, KindWithContent, Status, TaskView};
#[cfg(test)]
mod tests {

View file

@ -2,22 +2,12 @@ use anyhow::Result;
use index::{Settings, Unchecked};
use meilisearch_types::error::ResponseError;
use milli::DocumentId;
use serde::{Deserialize, Serialize, Serializer};
use std::{fmt::Write, path::PathBuf};
use std::{fmt::Write, path::PathBuf, str::FromStr};
use time::{Duration, OffsetDateTime};
use uuid::Uuid;
use crate::TaskId;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status {
Enqueued,
Processing,
Succeeded,
Failed,
}
use crate::{Error, TaskId};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
@ -98,6 +88,29 @@ impl Task {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum Status {
Enqueued,
Processing,
Succeeded,
Failed,
}
impl FromStr for Status {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"enqueued" => Ok(Status::Enqueued),
"processing" => Ok(Status::Processing),
"succeeded" => Ok(Status::Succeeded),
"failed" => Ok(Status::Failed),
s => Err(Error::InvalidStatus(s.to_string())),
}
}
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum KindWithContent {
@ -117,7 +130,7 @@ pub enum KindWithContent {
},
DocumentDeletion {
index_uid: String,
documents_ids: Vec<DocumentId>,
documents_ids: Vec<String>,
},
DocumentClear {
index_uid: String,
@ -261,6 +274,29 @@ pub enum Kind {
Snapshot,
}
impl FromStr for Kind {
type Err = Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"document_addition" => Ok(Kind::DocumentAddition),
"document_update" => Ok(Kind::DocumentUpdate),
"document_deletion" => Ok(Kind::DocumentDeletion),
"document_clear" => Ok(Kind::DocumentClear),
"settings" => Ok(Kind::Settings),
"index_creation" => Ok(Kind::IndexCreation),
"index_deletion" => Ok(Kind::IndexDeletion),
"index_update" => Ok(Kind::IndexUpdate),
"index_rename" => Ok(Kind::IndexRename),
"index_swap" => Ok(Kind::IndexSwap),
"cancel_task" => Ok(Kind::CancelTask),
"dump_export" => Ok(Kind::DumpExport),
"snapshot" => Ok(Kind::Snapshot),
s => Err(Error::InvalidKind(s.to_string())),
}
}
}
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(untagged)]
#[allow(clippy::large_enum_variant)]