Introduce the fetch_unfinished_tasks function to fetch tasks

This commit is contained in:
Kerollmops 2022-06-01 10:20:33 +02:00
parent 004c8b6be3
commit 461b91fd13
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 56 additions and 13 deletions

View File

@ -342,14 +342,8 @@ impl Scheduler {
}
async fn fetch_pending_tasks(&mut self) -> Result<()> {
// We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file.
//
// TODO(marin): This may create some latency when the first batch lazy loads the pending updates.
let mut filter = TaskFilter::default();
filter.filter_fn(|task| !task.is_finished());
self.store
.list_tasks(Some(self.next_fetched_task_id), Some(filter), None)
.fetch_unfinished_tasks(Some(self.next_fetched_task_id))
.await?
.into_iter()
// The tasks arrive in reverse order, and we need to insert them in order.

View File

@ -186,6 +186,17 @@ impl TaskStore {
Ok(tasks)
}
pub async fn fetch_unfinished_tasks(&self, offset: Option<TaskId>) -> Result<Vec<Task>> {
let store = self.store.clone();
tokio::task::spawn_blocking(move || {
let txn = store.rtxn()?;
let tasks = store.fetch_unfinished_tasks(&txn, offset)?;
Ok(tasks)
})
.await?
}
pub async fn list_tasks(
&self,
offset: Option<TaskId>,
@ -325,6 +336,13 @@ pub mod test {
}
}
pub async fn fetch_unfinished_tasks(&self, from: Option<TaskId>) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.fetch_unfinished_tasks(from).await,
Self::Mock(m) => unsafe { m.get("fetch_unfinished_tasks").call(from) },
}
}
pub async fn list_tasks(
&self,
from: Option<TaskId>,

View File

@ -121,9 +121,27 @@ impl Store {
Ok(task)
}
pub fn list_tasks<'a>(
/// Returns the unfinished tasks starting from the given taskId in ascending order.
pub fn fetch_unfinished_tasks(&self, txn: &RoTxn, from: Option<TaskId>) -> Result<Vec<Task>> {
// We must NEVER re-enqueue an already processed task! It's content uuid would point to an unexisting file.
//
// TODO(marin): This may create some latency when the first batch lazy loads the pending updates.
let from = from.unwrap_or_default();
let result: StdResult<Vec<_>, milli::heed::Error> = self
.tasks
.range(txn, &(BEU64::new(from)..))?
.map(|r| r.map(|(_, t)| t))
.filter(|result| result.as_ref().map_or(true, |t| !t.is_finished()))
.collect();
result.map_err(Into::into)
}
/// Returns all the tasks starting from the given taskId and going in descending order.
pub fn list_tasks(
&self,
txn: &'a RoTxn,
txn: &RoTxn,
from: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
@ -132,6 +150,7 @@ impl Store {
let range = from..limit
.map(|limit| (limit as u64).saturating_add(from))
.unwrap_or(u64::MAX);
let iter: Box<dyn Iterator<Item = StdResult<_, milli::heed::Error>>> = match filter {
Some(
ref filter @ TaskFilter {
@ -152,7 +171,7 @@ impl Store {
),
};
let apply_fitler = |task: &StdResult<_, milli::heed::Error>| match task {
let apply_filter = |task: &StdResult<_, milli::heed::Error>| match task {
Ok(ref t) => filter
.as_ref()
.and_then(|filter| filter.filter_fn.as_ref())
@ -160,9 +179,10 @@ impl Store {
.unwrap_or(true),
Err(_) => true,
};
// Collect 'limit' task if it exists or all of them.
let tasks = iter
.filter(apply_fitler)
.filter(apply_filter)
.take(limit.unwrap_or(usize::MAX))
.try_fold::<_, _, StdResult<_, milli::heed::Error>>(Vec::new(), |mut v, task| {
v.push(task?);
@ -305,9 +325,20 @@ pub mod test {
}
}
pub fn list_tasks<'a>(
pub fn fetch_unfinished_tasks(
&self,
txn: &'a RoTxn,
txn: &RoTxn,
from: Option<TaskId>,
) -> Result<Vec<Task>> {
match self {
MockStore::Real(index) => index.fetch_unfinished_tasks(txn, from),
MockStore::Fake(_) => todo!(),
}
}
pub fn list_tasks(
&self,
txn: &RoTxn,
from: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,