mirror of
https://github.com/meilisearch/MeiliSearch
synced 2024-11-26 14:54:27 +01:00
Merge #2455
2455: refactor perform task r=curquiza a=MarinPostma Refactor some index resolver functions to make duties more consistent, and code easier to test Co-authored-by: ad hoc <postma.marin@protonmail.com>
This commit is contained in:
commit
20dd259f23
@ -170,7 +170,7 @@ mod real {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_document_addition_batch(&self, mut tasks: Vec<Task>) -> Vec<Task> {
|
pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) {
|
||||||
fn get_content_uuid(task: &Task) -> Uuid {
|
fn get_content_uuid(task: &Task) -> Uuid {
|
||||||
match task {
|
match task {
|
||||||
Task {
|
Task {
|
||||||
@ -218,7 +218,8 @@ mod real {
|
|||||||
timestamp: now,
|
timestamp: now,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
return tasks;
|
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -253,8 +254,6 @@ mod real {
|
|||||||
for task in tasks.iter_mut() {
|
for task in tasks.iter_mut() {
|
||||||
task.events.push(event.clone());
|
task.events.push(event.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
tasks
|
|
||||||
}
|
}
|
||||||
_ => panic!("invalid batch!"),
|
_ => panic!("invalid batch!"),
|
||||||
}
|
}
|
||||||
@ -265,7 +264,7 @@ mod real {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
|
async fn process_task_inner(&self, task: &Task) -> Result<TaskResult> {
|
||||||
match &task.content {
|
match &task.content {
|
||||||
TaskContent::DocumentAddition { .. } => {
|
TaskContent::DocumentAddition { .. } => {
|
||||||
panic!("updates should be handled by batch")
|
panic!("updates should be handled by batch")
|
||||||
@ -354,6 +353,13 @@ mod real {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn process_task(&self, task: &mut Task) {
|
||||||
|
match self.process_task_inner(task).await {
|
||||||
|
Ok(res) => task.events.push(TaskEvent::succeeded(res)),
|
||||||
|
Err(e) => task.events.push(TaskEvent::failed(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
|
||||||
for (_, index) in self.list().await? {
|
for (_, index) in self.list().await? {
|
||||||
index.dump(&path)?;
|
index.dump(&path)?;
|
||||||
@ -502,7 +508,7 @@ mod test {
|
|||||||
Self::Mock(mocker)
|
Self::Mock(mocker)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_document_addition_batch(&self, tasks: Vec<Task>) -> Vec<Task> {
|
pub async fn process_document_addition_batch(&self, tasks: &mut [Task]) {
|
||||||
match self {
|
match self {
|
||||||
IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await,
|
IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await,
|
||||||
IndexResolver::Mock(m) => unsafe {
|
IndexResolver::Mock(m) => unsafe {
|
||||||
@ -511,7 +517,7 @@ mod test {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
|
pub async fn process_task(&self, task: &mut Task) {
|
||||||
match self {
|
match self {
|
||||||
IndexResolver::Real(r) => r.process_task(task).await,
|
IndexResolver::Real(r) => r.process_task(task).await,
|
||||||
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },
|
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },
|
||||||
|
@ -27,7 +27,7 @@ where
|
|||||||
.content
|
.content
|
||||||
.push_event(TaskEvent::succeeded(TaskResult::Other));
|
.push_event(TaskEvent::succeeded(TaskResult::Other));
|
||||||
}
|
}
|
||||||
Err(e) => batch.content.push_event(TaskEvent::failed(e.into())),
|
Err(e) => batch.content.push_event(TaskEvent::failed(e)),
|
||||||
}
|
}
|
||||||
batch
|
batch
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
use crate::index_resolver::IndexResolver;
|
use crate::index_resolver::IndexResolver;
|
||||||
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
|
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
|
||||||
use crate::tasks::batch::{Batch, BatchContent};
|
use crate::tasks::batch::{Batch, BatchContent};
|
||||||
use crate::tasks::task::TaskEvent;
|
|
||||||
use crate::tasks::BatchHandler;
|
use crate::tasks::BatchHandler;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
@ -20,14 +19,11 @@ where
|
|||||||
async fn process_batch(&self, mut batch: Batch) -> Batch {
|
async fn process_batch(&self, mut batch: Batch) -> Batch {
|
||||||
match batch.content {
|
match batch.content {
|
||||||
BatchContent::DocumentsAdditionBatch(ref mut tasks) => {
|
BatchContent::DocumentsAdditionBatch(ref mut tasks) => {
|
||||||
*tasks = self
|
self.process_document_addition_batch(tasks).await;
|
||||||
.process_document_addition_batch(std::mem::take(tasks))
|
}
|
||||||
.await;
|
BatchContent::IndexUpdate(ref mut task) => {
|
||||||
|
self.process_task(task).await;
|
||||||
}
|
}
|
||||||
BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await {
|
|
||||||
Ok(success) => task.events.push(TaskEvent::succeeded(success)),
|
|
||||||
Err(err) => task.events.push(TaskEvent::failed(err.into())),
|
|
||||||
},
|
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -54,7 +50,6 @@ mod test {
|
|||||||
use crate::index_resolver::{
|
use crate::index_resolver::{
|
||||||
error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore,
|
error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore,
|
||||||
};
|
};
|
||||||
use crate::tasks::task::TaskResult;
|
|
||||||
use crate::tasks::{
|
use crate::tasks::{
|
||||||
handlers::test::task_to_batch,
|
handlers::test::task_to_batch,
|
||||||
task::{Task, TaskContent},
|
task::{Task, TaskContent},
|
||||||
@ -177,11 +172,11 @@ mod test {
|
|||||||
let mocker = Mocker::default();
|
let mocker = Mocker::default();
|
||||||
match task.content {
|
match task.content {
|
||||||
TaskContent::DocumentAddition { .. } => {
|
TaskContent::DocumentAddition { .. } => {
|
||||||
mocker.when::<Vec<Task>, Vec<Task>>("process_document_addition_batch").then(|tasks| tasks);
|
mocker.when::<&mut [Task], ()>("process_document_addition_batch").then(|_| ());
|
||||||
}
|
}
|
||||||
TaskContent::Dump { .. } => (),
|
TaskContent::Dump { .. } => (),
|
||||||
_ => {
|
_ => {
|
||||||
mocker.when::<&Task, IndexResult<TaskResult>>("process_task").then(|_| Ok(TaskResult::Other));
|
mocker.when::<&mut Task, ()>("process_task").then(|_| ());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let index_resolver: IndexResolver<HeedMetaStore, MapIndexStore> = IndexResolver::mock(mocker);
|
let index_resolver: IndexResolver<HeedMetaStore, MapIndexStore> = IndexResolver::mock(mocker);
|
||||||
|
@ -68,9 +68,9 @@ impl TaskEvent {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn failed(error: ResponseError) -> Self {
|
pub fn failed(error: impl Into<ResponseError>) -> Self {
|
||||||
Self::Failed {
|
Self::Failed {
|
||||||
error,
|
error: error.into(),
|
||||||
timestamp: OffsetDateTime::now_utc(),
|
timestamp: OffsetDateTime::now_utc(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user