add the task to the index db in the register task

This commit is contained in:
Tamo 2022-09-07 00:22:58 +02:00 committed by Clément Renault
parent ed745591e1
commit 705af94fd7
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
3 changed files with 84 additions and 33 deletions

View File

@ -110,6 +110,15 @@ impl IndexScheduler {
self.all_tasks self.all_tasks
.append(&mut wtxn, &BEU32::new(task_id), &task)?; .append(&mut wtxn, &BEU32::new(task_id), &task)?;
if let Some(indexes) = task.indexes() {
for index in indexes {
self.update_index(&mut wtxn, index, |mut bitmap| {
bitmap.insert(task_id);
bitmap
})?;
}
}
self.update_status(&mut wtxn, Status::Enqueued, |mut bitmap| { self.update_status(&mut wtxn, Status::Enqueued, |mut bitmap| {
bitmap.insert(task_id); bitmap.insert(task_id);
bitmap bitmap

View File

@ -29,13 +29,20 @@ pub struct Task {
} }
impl Task { impl Task {
/// Persist all the temp files associated with the task.
pub fn persist(&self) -> Result<()> { pub fn persist(&self) -> Result<()> {
self.kind.persist() self.kind.persist()
} }
/// Delete all the files associated with the task.
pub fn remove_data(&self) -> Result<()> { pub fn remove_data(&self) -> Result<()> {
self.kind.remove_data() self.kind.remove_data()
} }
/// Return the list of indexes updated by this tasks.
pub fn indexes(&self) -> Option<Vec<&str>> {
self.kind.indexes()
}
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -98,8 +105,10 @@ impl KindWithContent {
} }
pub fn persist(&self) -> Result<()> { pub fn persist(&self) -> Result<()> {
use KindWithContent::*;
match self { match self {
KindWithContent::DocumentAddition { DocumentAddition {
index_name: _, index_name: _,
content_file: _, content_file: _,
} => { } => {
@ -108,21 +117,23 @@ impl KindWithContent {
Ok(()) Ok(())
} }
// There is nothing to persist for all these tasks // There is nothing to persist for all these tasks
KindWithContent::DumpExport { .. } DumpExport { .. }
| KindWithContent::DocumentDeletion { .. } | DocumentDeletion { .. }
| KindWithContent::ClearAllDocuments { .. } | ClearAllDocuments { .. }
| KindWithContent::RenameIndex { .. } | RenameIndex { .. }
| KindWithContent::CreateIndex { .. } | CreateIndex { .. }
| KindWithContent::DeleteIndex { .. } | DeleteIndex { .. }
| KindWithContent::SwapIndex { .. } | SwapIndex { .. }
| KindWithContent::CancelTask { .. } | CancelTask { .. }
| KindWithContent::Snapshot => Ok(()), | Snapshot => Ok(()),
} }
} }
pub fn remove_data(&self) -> Result<()> { pub fn remove_data(&self) -> Result<()> {
use KindWithContent::*;
match self { match self {
KindWithContent::DocumentAddition { DocumentAddition {
index_name: _, index_name: _,
content_file: _, content_file: _,
} => { } => {
@ -131,15 +142,33 @@ impl KindWithContent {
Ok(()) Ok(())
} }
// There is no data associated with all these tasks // There is no data associated with all these tasks
KindWithContent::DumpExport { .. } DumpExport { .. }
| KindWithContent::DocumentDeletion { .. } | DocumentDeletion { .. }
| KindWithContent::ClearAllDocuments { .. } | ClearAllDocuments { .. }
| KindWithContent::RenameIndex { .. } | RenameIndex { .. }
| KindWithContent::CreateIndex { .. } | CreateIndex { .. }
| KindWithContent::DeleteIndex { .. } | DeleteIndex { .. }
| KindWithContent::SwapIndex { .. } | SwapIndex { .. }
| KindWithContent::CancelTask { .. } | CancelTask { .. }
| KindWithContent::Snapshot => Ok(()), | Snapshot => Ok(()),
}
}
pub fn indexes(&self) -> Option<Vec<&str>> {
use KindWithContent::*;
match self {
DumpExport { .. } | Snapshot | CancelTask { .. } => None,
DocumentAddition { index_name, .. }
| DocumentDeletion { index_name, .. }
| ClearAllDocuments { index_name }
| CreateIndex { index_name, .. }
| DeleteIndex { index_name } => Some(vec![index_name]),
RenameIndex {
index_name: lhs,
new_name: rhs,
}
| SwapIndex { lhs, rhs } => Some(vec![lhs, rhs]),
} }
} }
} }

View File

@ -45,6 +45,19 @@ impl IndexScheduler {
Ok(self.index_tasks.put(wtxn, index, bitmap)?) Ok(self.index_tasks.put(wtxn, index, bitmap)?)
} }
pub(crate) fn update_index(
&self,
wtxn: &mut RwTxn,
index: &str,
f: impl Fn(RoaringBitmap) -> RoaringBitmap,
) -> Result<()> {
let tasks = self.get_index(&wtxn, index)?;
let tasks = f(tasks);
self.put_index(wtxn, index, &tasks)?;
Ok(())
}
pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> { pub(crate) fn get_status(&self, rtxn: &RoTxn, status: Status) -> Result<RoaringBitmap> {
Ok(self.status.get(&rtxn, &status)?.unwrap_or_default()) Ok(self.status.get(&rtxn, &status)?.unwrap_or_default())
} }
@ -58,19 +71,6 @@ impl IndexScheduler {
Ok(self.status.put(wtxn, &status, bitmap)?) Ok(self.status.put(wtxn, &status, bitmap)?)
} }
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
Ok(self.kind.get(&rtxn, &kind)?.unwrap_or_default())
}
pub(crate) fn put_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.kind.put(wtxn, &kind, bitmap)?)
}
pub(crate) fn update_status( pub(crate) fn update_status(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,
@ -84,6 +84,19 @@ impl IndexScheduler {
Ok(()) Ok(())
} }
pub(crate) fn get_kind(&self, rtxn: &RoTxn, kind: Kind) -> Result<RoaringBitmap> {
Ok(self.kind.get(&rtxn, &kind)?.unwrap_or_default())
}
pub(crate) fn put_kind(
&self,
wtxn: &mut RwTxn,
kind: Kind,
bitmap: &RoaringBitmap,
) -> Result<()> {
Ok(self.kind.put(wtxn, &kind, bitmap)?)
}
pub(crate) fn update_kind( pub(crate) fn update_kind(
&self, &self,
wtxn: &mut RwTxn, wtxn: &mut RwTxn,