process_task updates task events

This commit is contained in:
ad hoc 2022-06-02 15:04:33 +02:00
parent b594d49def
commit 64e3096790
No known key found for this signature in database
GPG Key ID: 4F00A782990CC643
4 changed files with 16 additions and 115 deletions

View File

@ -265,7 +265,7 @@ mod real {
Ok(())
}
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
async fn process_task_inner(&self, task: &Task) -> Result<TaskResult> {
match &task.content {
TaskContent::DocumentAddition { .. } => {
panic!("updates should be handled by batch")
@ -354,6 +354,13 @@ mod real {
}
}
pub async fn process_task(&self, task: &mut Task) {
match self.process_task_inner(&task).await {
Ok(res) => task.events.push(TaskEvent::succeeded(res)),
Err(e) => task.events.push(TaskEvent::failed(e)),
}
}
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
for (_, index) in self.list().await? {
index.dump(&path)?;
@ -511,110 +518,7 @@ mod test {
}
}
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
match self {
IndexResolver::Real(r) => r.process_task(task).await,
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },
}
}
pub async fn dump(&self, path: impl AsRef<Path>) -> Result<()> {
match self {
IndexResolver::Real(r) => r.dump(path).await,
IndexResolver::Mock(_) => todo!(),
}
}
/// Get or create an index with name `uid`.
pub async fn get_or_create_index(&self, uid: IndexUid, task_id: TaskId) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.get_or_create_index(uid, task_id).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn list(&self) -> Result<Vec<(String, Index)>> {
match self {
IndexResolver::Real(r) => r.list().await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn delete_index(&self, uid: String) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.delete_index(uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn get_index(&self, uid: String) -> Result<Index> {
match self {
IndexResolver::Real(r) => r.get_index(uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn get_index_creation_task_id(&self, index_uid: String) -> Result<TaskId> {
match self {
IndexResolver::Real(r) => r.get_index_creation_task_id(index_uid).await,
IndexResolver::Mock(_) => todo!(),
}
}
pub async fn delete_content_file(&self, content_uuid: Uuid) -> Result<()> {
match self {
IndexResolver::Real(r) => r.delete_content_file(content_uuid).await,
IndexResolver::Mock(m) => unsafe {
m.get("delete_content_file").call(content_uuid)
},
}
}
}
pub enum MockIndexResolver<U, I> {
Real(super::real::IndexResolver<U, I>),
Mock(Mocker),
}
impl MockIndexResolver<HeedMetaStore, MapIndexStore> {
pub fn load_dump(
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
index_db_size: usize,
env: Arc<Env>,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<()> {
super::real::IndexResolver::load_dump(src, dst, index_db_size, env, indexer_opts)
}
}
impl<U, I> MockIndexResolver<U, I>
where
U: IndexMetaStore,
I: IndexStore,
{
pub fn new(index_uuid_store: U, index_store: I, file_store: UpdateFileStore) -> Self {
Self::Real(super::real::IndexResolver {
index_uuid_store,
index_store,
file_store,
})
}
pub fn mock(mocker: Mocker) -> Self {
Self::Mock(mocker)
}
pub async fn process_document_addition_batch(&self, tasks: Vec<Task>) -> Vec<Task> {
match self {
IndexResolver::Real(r) => r.process_document_addition_batch(tasks).await,
IndexResolver::Mock(m) => unsafe {
m.get("process_document_addition_batch").call(tasks)
},
}
}
pub async fn process_task(&self, task: &Task) -> Result<TaskResult> {
pub async fn process_task(&self, task: &mut Task) {
match self {
IndexResolver::Real(r) => r.process_task(task).await,
IndexResolver::Mock(m) => unsafe { m.get("process_task").call(task) },

View File

@ -27,7 +27,7 @@ where
.content
.push_event(TaskEvent::succeeded(TaskResult::Other));
}
Err(e) => batch.content.push_event(TaskEvent::failed(e.into())),
Err(e) => batch.content.push_event(TaskEvent::failed(e)),
}
batch
}

View File

@ -1,7 +1,6 @@
use crate::index_resolver::IndexResolver;
use crate::index_resolver::{index_store::IndexStore, meta_store::IndexMetaStore};
use crate::tasks::batch::{Batch, BatchContent};
use crate::tasks::task::TaskEvent;
use crate::tasks::BatchHandler;
#[async_trait::async_trait]
@ -24,10 +23,9 @@ where
.process_document_addition_batch(std::mem::take(tasks))
.await;
}
BatchContent::IndexUpdate(ref mut task) => match self.process_task(task).await {
Ok(success) => task.events.push(TaskEvent::succeeded(success)),
Err(err) => task.events.push(TaskEvent::failed(err.into())),
},
BatchContent::IndexUpdate(ref mut task) => {
self.process_task(task).await;
}
_ => unreachable!(),
}
@ -54,7 +52,6 @@ mod test {
use crate::index_resolver::{
error::Result as IndexResult, index_store::MockIndexStore, meta_store::MockIndexMetaStore,
};
use crate::tasks::task::TaskResult;
use crate::tasks::{
handlers::test::task_to_batch,
task::{Task, TaskContent},
@ -181,7 +178,7 @@ mod test {
}
TaskContent::Dump { .. } => (),
_ => {
mocker.when::<&Task, IndexResult<TaskResult>>("process_task").then(|_| Ok(TaskResult::Other));
mocker.when::<&mut Task, ()>("process_task").then(|_| ());
}
}
let index_resolver: IndexResolver<HeedMetaStore, MapIndexStore> = IndexResolver::mock(mocker);

View File

@ -68,9 +68,9 @@ impl TaskEvent {
}
}
pub fn failed(error: ResponseError) -> Self {
pub fn failed(error: impl Into<ResponseError>) -> Self {
Self::Failed {
error,
error: error.into(),
timestamp: OffsetDateTime::now_utc(),
}
}