Prepare for processing documents edition

This commit is contained in:
Clément Renault 2024-05-08 15:26:21 +02:00
parent 2099b4f0dd
commit 1702b5cf44
No known key found for this signature in database
GPG Key ID: F250A4C4E3AE5F5F
7 changed files with 107 additions and 4 deletions

View File

@ -166,6 +166,7 @@ impl From<KindWithContent> for KindDump {
documents_count,
allow_index_creation,
},
KindWithContent::DocumentEdition { .. } => todo!(),
KindWithContent::DocumentDeletion { documents_ids, .. } => {
KindDump::DocumentDeletion { documents_ids }
}

View File

@ -24,6 +24,7 @@ enum AutobatchKind {
allow_index_creation: bool,
primary_key: Option<String>,
},
DocumentEdition,
DocumentDeletion,
DocumentDeletionByFilter,
DocumentClear,
@ -63,6 +64,7 @@ impl From<KindWithContent> for AutobatchKind {
primary_key,
..
} => AutobatchKind::DocumentImport { method, allow_index_creation, primary_key },
KindWithContent::DocumentEdition { .. } => AutobatchKind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => AutobatchKind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => AutobatchKind::DocumentClear,
KindWithContent::DocumentDeletionByFilter { .. } => {
@ -98,6 +100,9 @@ pub enum BatchKind {
primary_key: Option<String>,
operation_ids: Vec<TaskId>,
},
DocumentEdition {
id: TaskId,
},
DocumentDeletion {
deletion_ids: Vec<TaskId>,
},
@ -199,6 +204,7 @@ impl BatchKind {
}),
allow_index_creation,
),
K::DocumentEdition => (Break(BatchKind::DocumentEdition { id: task_id }), false),
K::DocumentDeletion => {
(Continue(BatchKind::DocumentDeletion { deletion_ids: vec![task_id] }), false)
}
@ -222,7 +228,7 @@ impl BatchKind {
match (self, kind) {
// We don't batch any of these operations
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentDeletionByFilter) => Break(this),
(this, K::IndexCreation | K::IndexUpdate | K::IndexSwap | K::DocumentEdition | K::DocumentDeletionByFilter) => Break(this),
// We must not batch tasks that don't have the same index creation rights if the index doesn't already exists.
(this, kind) if !index_already_exists && this.allow_index_creation() == Some(false) && kind.allow_index_creation() == Some(true) => {
Break(this)
@ -519,6 +525,7 @@ impl BatchKind {
| BatchKind::IndexDeletion { .. }
| BatchKind::IndexUpdate { .. }
| BatchKind::IndexSwap { .. }
| BatchKind::DocumentEdition { .. }
| BatchKind::DocumentDeletionByFilter { .. },
_,
) => {

View File

@ -106,6 +106,10 @@ pub(crate) enum IndexOperation {
operations: Vec<DocumentOperation>,
tasks: Vec<Task>,
},
DocumentEdition {
index_uid: String,
task: Task,
},
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
@ -164,7 +168,8 @@ impl Batch {
| IndexOperation::DocumentClear { tasks, .. } => {
RoaringBitmap::from_iter(tasks.iter().map(|task| task.uid))
}
IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
IndexOperation::DocumentEdition { task, .. }
| IndexOperation::IndexDocumentDeletionByFilter { task, .. } => {
RoaringBitmap::from_sorted_iter(std::iter::once(task.uid)).unwrap()
}
IndexOperation::SettingsAndDocumentOperation {
@ -228,6 +233,7 @@ impl IndexOperation {
pub fn index_uid(&self) -> &str {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
| IndexOperation::DocumentEdition { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
@ -243,6 +249,9 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentOperation { .. } => {
f.write_str("IndexOperation::DocumentOperation")
}
IndexOperation::DocumentEdition { .. } => {
f.write_str("IndexOperation::DocumentEdition")
}
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
}
@ -295,6 +304,21 @@ impl IndexScheduler {
_ => unreachable!(),
}
}
BatchKind::DocumentEdition { id } => {
let task = self.get_task(rtxn, id)?.ok_or(Error::CorruptedTaskQueue)?;
match &task.kind {
KindWithContent::DocumentEdition { index_uid, .. } => {
Ok(Some(Batch::IndexOperation {
op: IndexOperation::DocumentEdition {
index_uid: index_uid.clone(),
task,
},
must_create_index: false,
}))
}
_ => unreachable!(),
}
}
BatchKind::DocumentOperation { method, operation_ids, .. } => {
let tasks = self.get_existing_tasks(rtxn, operation_ids)?;
let primary_key = tasks
@ -1386,6 +1410,9 @@ impl IndexScheduler {
Ok(tasks)
}
IndexOperation::DocumentEdition { .. } => {
todo!()
}
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =

View File

@ -177,6 +177,12 @@ fn snapshot_details(d: &Details) -> String {
} => {
format!("{{ received_documents: {received_documents}, indexed_documents: {indexed_documents:?} }}")
}
Details::DocumentEdition {
edited_documents,
edition_code,
} => {
format!("{{ edited_documents: {edited_documents:?}, edition_code: {edition_code:?} }}")
}
Details::SettingsUpdate { settings } => {
format!("{{ settings: {settings:?} }}")
}

View File

@ -238,6 +238,7 @@ pub fn swap_index_uid_in_task(task: &mut Task, swap: (&str, &str)) {
let mut index_uids = vec![];
match &mut task.kind {
K::DocumentAdditionOrUpdate { index_uid, .. } => index_uids.push(index_uid),
K::DocumentEdition { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletion { index_uid, .. } => index_uids.push(index_uid),
K::DocumentDeletionByFilter { index_uid, .. } => index_uids.push(index_uid),
K::DocumentClear { index_uid } => index_uids.push(index_uid),
@ -408,7 +409,26 @@ impl IndexScheduler {
match status {
Status::Succeeded => assert!(indexed_documents <= received_documents),
Status::Failed | Status::Canceled => assert_eq!(indexed_documents, 0),
status => panic!("DocumentAddition can't have an indexed_document set if it's {}", status),
status => panic!("DocumentAddition can't have an indexed_documents set if it's {}", status),
}
}
None => {
assert!(matches!(status, Status::Enqueued | Status::Processing))
}
}
}
Details::DocumentEdition { edited_documents, .. } => {
assert_eq!(kind.as_kind(), Kind::DocumentEdition);
match edited_documents {
Some(edited_documents) => {
assert!(matches!(
status,
Status::Succeeded | Status::Failed | Status::Canceled
));
match status {
Status::Succeeded => (),
Status::Failed | Status::Canceled => assert_eq!(edited_documents, 0),
status => panic!("DocumentEdition can't have an edited_documents set if it's {}", status),
}
}
None => {

View File

@ -54,6 +54,8 @@ pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")]
pub indexed_documents: Option<Option<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edited_documents: Option<Option<u64>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub primary_key: Option<Option<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub provided_ids: Option<usize>,
@ -70,6 +72,8 @@ pub struct DetailsView {
#[serde(skip_serializing_if = "Option::is_none")]
pub dump_uid: Option<Option<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub edition_code: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(flatten)]
pub settings: Option<Box<Settings<Unchecked>>>,
#[serde(skip_serializing_if = "Option::is_none")]
@ -86,6 +90,11 @@ impl From<Details> for DetailsView {
..DetailsView::default()
}
}
Details::DocumentEdition { edited_documents, edition_code } => DetailsView {
edited_documents: Some(edited_documents),
edition_code: Some(edition_code),
..DetailsView::default()
},
Details::SettingsUpdate { mut settings } => {
settings.hide_secrets();
DetailsView { settings: Some(settings), ..DetailsView::default() }

View File

@ -48,6 +48,7 @@ impl Task {
| TaskDeletion { .. }
| IndexSwap { .. } => None,
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentDeletionByFilter { index_uid, .. }
| DocumentClear { index_uid }
@ -67,7 +68,8 @@ impl Task {
pub fn content_uuid(&self) -> Option<Uuid> {
match self.kind {
KindWithContent::DocumentAdditionOrUpdate { content_file, .. } => Some(content_file),
KindWithContent::DocumentDeletion { .. }
KindWithContent::DocumentEdition { .. }
| KindWithContent::DocumentDeletion { .. }
| KindWithContent::DocumentDeletionByFilter { .. }
| KindWithContent::DocumentClear { .. }
| KindWithContent::SettingsUpdate { .. }
@ -94,6 +96,10 @@ pub enum KindWithContent {
documents_count: u64,
allow_index_creation: bool,
},
DocumentEdition {
index_uid: String,
edition_code: String,
},
DocumentDeletion {
index_uid: String,
documents_ids: Vec<String>,
@ -150,6 +156,7 @@ impl KindWithContent {
pub fn as_kind(&self) -> Kind {
match self {
KindWithContent::DocumentAdditionOrUpdate { .. } => Kind::DocumentAdditionOrUpdate,
KindWithContent::DocumentEdition { .. } => Kind::DocumentEdition,
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
KindWithContent::DocumentDeletionByFilter { .. } => Kind::DocumentDeletion,
KindWithContent::DocumentClear { .. } => Kind::DocumentDeletion,
@ -174,6 +181,7 @@ impl KindWithContent {
| TaskCancelation { .. }
| TaskDeletion { .. } => vec![],
DocumentAdditionOrUpdate { index_uid, .. }
| DocumentEdition { index_uid, .. }
| DocumentDeletion { index_uid, .. }
| DocumentDeletionByFilter { index_uid, .. }
| DocumentClear { index_uid }
@ -202,6 +210,12 @@ impl KindWithContent {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
Some(Details::DocumentEdition {
edited_documents: None,
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
@ -250,6 +264,12 @@ impl KindWithContent {
indexed_documents: Some(0),
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
Some(Details::DocumentEdition {
edited_documents: Some(0),
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentDeletion { index_uid: _, documents_ids } => {
Some(Details::DocumentDeletion {
provided_ids: documents_ids.len(),
@ -301,6 +321,12 @@ impl From<&KindWithContent> for Option<Details> {
indexed_documents: None,
})
}
KindWithContent::DocumentEdition { edition_code, .. } => {
Some(Details::DocumentEdition {
edited_documents: None,
edition_code: edition_code.clone(),
})
}
KindWithContent::DocumentDeletion { .. } => None,
KindWithContent::DocumentDeletionByFilter { .. } => None,
KindWithContent::DocumentClear { .. } => None,
@ -394,6 +420,7 @@ impl std::error::Error for ParseTaskStatusError {}
#[serde(rename_all = "camelCase")]
pub enum Kind {
DocumentAdditionOrUpdate,
DocumentEdition,
DocumentDeletion,
SettingsUpdate,
IndexCreation,
@ -410,6 +437,7 @@ impl Kind {
pub fn related_to_one_index(&self) -> bool {
match self {
Kind::DocumentAdditionOrUpdate
| Kind::DocumentEdition
| Kind::DocumentDeletion
| Kind::SettingsUpdate
| Kind::IndexCreation
@ -427,6 +455,7 @@ impl Display for Kind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Kind::DocumentAdditionOrUpdate => write!(f, "documentAdditionOrUpdate"),
Kind::DocumentEdition => write!(f, "documentEdition"),
Kind::DocumentDeletion => write!(f, "documentDeletion"),
Kind::SettingsUpdate => write!(f, "settingsUpdate"),
Kind::IndexCreation => write!(f, "indexCreation"),
@ -454,6 +483,8 @@ impl FromStr for Kind {
Ok(Kind::IndexDeletion)
} else if kind.eq_ignore_ascii_case("documentAdditionOrUpdate") {
Ok(Kind::DocumentAdditionOrUpdate)
} else if kind.eq_ignore_ascii_case("documentEdition") {
Ok(Kind::DocumentEdition)
} else if kind.eq_ignore_ascii_case("documentDeletion") {
Ok(Kind::DocumentDeletion)
} else if kind.eq_ignore_ascii_case("settingsUpdate") {
@ -496,6 +527,7 @@ impl std::error::Error for ParseTaskKindError {}
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub enum Details {
DocumentAdditionOrUpdate { received_documents: u64, indexed_documents: Option<u64> },
DocumentEdition { edited_documents: Option<u64>, edition_code: String },
SettingsUpdate { settings: Box<Settings<Unchecked>> },
IndexInfo { primary_key: Option<String> },
DocumentDeletion { provided_ids: usize, deleted_documents: Option<u64> },
@ -514,6 +546,7 @@ impl Details {
Self::DocumentAdditionOrUpdate { indexed_documents, .. } => {
*indexed_documents = Some(0)
}
Self::DocumentEdition { edited_documents, .. } => *edited_documents = Some(0),
Self::DocumentDeletion { deleted_documents, .. } => *deleted_documents = Some(0),
Self::DocumentDeletionByFilter { deleted_documents, .. } => {
*deleted_documents = Some(0)