mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-01-25 20:57:35 +01:00
Merge both DocumentAddition/Update into one DocumentImport variant
This commit is contained in:
parent
5174c78f87
commit
f68906f5dc
@ -1,3 +1,4 @@
|
||||
use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments};
|
||||
use std::ops::ControlFlow;
|
||||
|
||||
use crate::{task::Kind, TaskId};
|
||||
@ -7,11 +8,9 @@ pub enum BatchKind {
|
||||
DocumentClear {
|
||||
ids: Vec<TaskId>,
|
||||
},
|
||||
DocumentAddition {
|
||||
addition_ids: Vec<TaskId>,
|
||||
},
|
||||
DocumentUpdate {
|
||||
update_ids: Vec<TaskId>,
|
||||
DocumentImport {
|
||||
method: IndexDocumentsMethod,
|
||||
import_ids: Vec<TaskId>,
|
||||
},
|
||||
DocumentDeletion {
|
||||
deletion_ids: Vec<TaskId>,
|
||||
@ -20,13 +19,10 @@ pub enum BatchKind {
|
||||
other: Vec<TaskId>,
|
||||
settings_ids: Vec<TaskId>,
|
||||
},
|
||||
SettingsAndDocumentAddition {
|
||||
SettingsAndDocumentImport {
|
||||
settings_ids: Vec<TaskId>,
|
||||
addition_ids: Vec<TaskId>,
|
||||
},
|
||||
SettingsAndDocumentUpdate {
|
||||
settings_ids: Vec<TaskId>,
|
||||
update_ids: Vec<TaskId>,
|
||||
method: IndexDocumentsMethod,
|
||||
import_ids: Vec<TaskId>,
|
||||
},
|
||||
Settings {
|
||||
settings_ids: Vec<TaskId>,
|
||||
@ -59,14 +55,16 @@ impl BatchKind {
|
||||
Kind::IndexSwap => (BatchKind::IndexSwap { id: task_id }, true),
|
||||
Kind::DocumentClear => (BatchKind::DocumentClear { ids: vec![task_id] }, false),
|
||||
Kind::DocumentAddition => (
|
||||
BatchKind::DocumentAddition {
|
||||
addition_ids: vec![task_id],
|
||||
BatchKind::DocumentImport {
|
||||
method: ReplaceDocuments,
|
||||
import_ids: vec![task_id],
|
||||
},
|
||||
false,
|
||||
),
|
||||
Kind::DocumentUpdate => (
|
||||
BatchKind::DocumentUpdate {
|
||||
update_ids: vec![task_id],
|
||||
BatchKind::DocumentImport {
|
||||
method: UpdateDocuments,
|
||||
import_ids: vec![task_id],
|
||||
},
|
||||
false,
|
||||
),
|
||||
@ -98,11 +96,9 @@ impl BatchKind {
|
||||
// The index deletion can batch with everything but must stop after
|
||||
(
|
||||
BatchKind::DocumentClear { mut ids }
|
||||
| BatchKind::DocumentAddition {
|
||||
addition_ids: mut ids,
|
||||
}
|
||||
| BatchKind::DocumentUpdate {
|
||||
update_ids: mut ids,
|
||||
| BatchKind::DocumentImport {
|
||||
method: _,
|
||||
import_ids: mut ids,
|
||||
}
|
||||
| BatchKind::DocumentDeletion {
|
||||
deletion_ids: mut ids,
|
||||
@ -120,12 +116,9 @@ impl BatchKind {
|
||||
settings_ids: mut ids,
|
||||
mut other,
|
||||
}
|
||||
| BatchKind::SettingsAndDocumentAddition {
|
||||
addition_ids: mut ids,
|
||||
settings_ids: mut other,
|
||||
}
|
||||
| BatchKind::SettingsAndDocumentUpdate {
|
||||
update_ids: mut ids,
|
||||
| BatchKind::SettingsAndDocumentImport {
|
||||
import_ids: mut ids,
|
||||
method: _,
|
||||
settings_ids: mut other,
|
||||
},
|
||||
Kind::IndexDeletion,
|
||||
@ -147,11 +140,9 @@ impl BatchKind {
|
||||
Kind::DocumentAddition | Kind::DocumentUpdate | Kind::Settings,
|
||||
) => ControlFlow::Break(this),
|
||||
(
|
||||
BatchKind::DocumentAddition {
|
||||
addition_ids: mut ids,
|
||||
}
|
||||
| BatchKind::DocumentUpdate {
|
||||
update_ids: mut ids,
|
||||
BatchKind::DocumentImport {
|
||||
method: _,
|
||||
import_ids: mut ids,
|
||||
},
|
||||
Kind::DocumentClear,
|
||||
) => {
|
||||
@ -160,30 +151,43 @@ impl BatchKind {
|
||||
}
|
||||
|
||||
// we can autobatch the same kind of document additions / updates
|
||||
(BatchKind::DocumentAddition { mut addition_ids }, Kind::DocumentAddition) => {
|
||||
addition_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::DocumentAddition { addition_ids })
|
||||
(
|
||||
BatchKind::DocumentImport {
|
||||
method: ReplaceDocuments,
|
||||
mut import_ids,
|
||||
},
|
||||
Kind::DocumentAddition,
|
||||
) => {
|
||||
import_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::DocumentImport {
|
||||
method: ReplaceDocuments,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
(BatchKind::DocumentUpdate { mut update_ids }, Kind::DocumentUpdate) => {
|
||||
update_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::DocumentUpdate { update_ids })
|
||||
(
|
||||
BatchKind::DocumentImport {
|
||||
method: UpdateDocuments,
|
||||
mut import_ids,
|
||||
},
|
||||
Kind::DocumentUpdate,
|
||||
) => {
|
||||
import_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::DocumentImport {
|
||||
method: UpdateDocuments,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
// but we can't autobatch documents if it's not the same kind
|
||||
// this match branch MUST be AFTER the previous one
|
||||
(
|
||||
this @ BatchKind::DocumentAddition { .. } | this @ BatchKind::DocumentUpdate { .. },
|
||||
this @ BatchKind::DocumentImport { .. },
|
||||
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||
) => ControlFlow::Break(this),
|
||||
(BatchKind::DocumentAddition { addition_ids }, Kind::Settings) => {
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition {
|
||||
(BatchKind::DocumentImport { method, import_ids }, Kind::Settings) => {
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids: vec![id],
|
||||
addition_ids,
|
||||
})
|
||||
}
|
||||
(BatchKind::DocumentUpdate { update_ids }, Kind::Settings) => {
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate {
|
||||
settings_ids: vec![id],
|
||||
update_ids,
|
||||
method,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
|
||||
@ -260,18 +264,14 @@ impl BatchKind {
|
||||
})
|
||||
}
|
||||
(
|
||||
BatchKind::SettingsAndDocumentAddition {
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
addition_ids: mut other,
|
||||
}
|
||||
| BatchKind::SettingsAndDocumentUpdate {
|
||||
settings_ids,
|
||||
update_ids: mut other,
|
||||
method: _,
|
||||
import_ids: mut other,
|
||||
},
|
||||
Kind::DocumentClear,
|
||||
) => {
|
||||
other.push(id);
|
||||
|
||||
ControlFlow::Continue(BatchKind::ClearAndSettings {
|
||||
settings_ids,
|
||||
other,
|
||||
@ -280,62 +280,54 @@ impl BatchKind {
|
||||
|
||||
// we can batch the settings with a kind of document operation with the same kind of document operation
|
||||
(
|
||||
BatchKind::SettingsAndDocumentAddition {
|
||||
mut addition_ids,
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
method: ReplaceDocuments,
|
||||
mut import_ids,
|
||||
},
|
||||
Kind::DocumentAddition,
|
||||
) => {
|
||||
addition_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition {
|
||||
addition_ids,
|
||||
import_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
method: ReplaceDocuments,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
(
|
||||
BatchKind::SettingsAndDocumentUpdate {
|
||||
mut update_ids,
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
method: UpdateDocuments,
|
||||
mut import_ids,
|
||||
},
|
||||
Kind::DocumentUpdate,
|
||||
) => {
|
||||
update_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate {
|
||||
update_ids,
|
||||
import_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
method: UpdateDocuments,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
// But we can't batch a settings and a doc op with another doc op
|
||||
// this MUST be AFTER the two previous branch
|
||||
(
|
||||
this @ BatchKind::SettingsAndDocumentAddition { .. }
|
||||
| this @ BatchKind::SettingsAndDocumentUpdate { .. },
|
||||
this @ BatchKind::SettingsAndDocumentImport { .. },
|
||||
Kind::DocumentDeletion | Kind::DocumentAddition | Kind::DocumentUpdate,
|
||||
) => ControlFlow::Break(this),
|
||||
(
|
||||
BatchKind::SettingsAndDocumentAddition {
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
mut settings_ids,
|
||||
addition_ids,
|
||||
method,
|
||||
import_ids,
|
||||
},
|
||||
Kind::Settings,
|
||||
) => {
|
||||
settings_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentAddition {
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
addition_ids,
|
||||
})
|
||||
}
|
||||
(
|
||||
BatchKind::SettingsAndDocumentUpdate {
|
||||
mut settings_ids,
|
||||
update_ids,
|
||||
},
|
||||
Kind::Settings,
|
||||
) => {
|
||||
settings_ids.push(id);
|
||||
ControlFlow::Continue(BatchKind::SettingsAndDocumentUpdate {
|
||||
settings_ids,
|
||||
update_ids,
|
||||
method,
|
||||
import_ids,
|
||||
})
|
||||
}
|
||||
(_, Kind::CancelTask | Kind::DumpExport | Kind::Snapshot) => unreachable!(),
|
||||
@ -391,11 +383,11 @@ mod tests {
|
||||
#[test]
|
||||
fn autobatch_simple_operation_together() {
|
||||
// we can autobatch one or multiple DocumentAddition together
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, DocumentAddition]), @"Some(DocumentAddition { addition_ids: [0, 1, 2] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, DocumentAddition]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0, 1, 2] })");
|
||||
// we can autobatch one or multiple DocumentUpdate together
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentUpdate, DocumentUpdate]), @"Some(DocumentUpdate { update_ids: [0, 1, 2] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentUpdate, DocumentUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0, 1, 2] })");
|
||||
// we can autobatch one or multiple DocumentDeletion together
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentDeletion, DocumentDeletion]), @"Some(DocumentDeletion { deletion_ids: [0, 1, 2] })");
|
||||
@ -407,57 +399,57 @@ mod tests {
|
||||
#[test]
|
||||
fn simple_document_operation_dont_autobatch_with_other() {
|
||||
// addition, updates and deletion can't batch together
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentUpdate]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentDeletion]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentAddition]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentDeletion]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentUpdate]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentDeletion]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentAddition]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, DocumentDeletion]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentAddition]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, DocumentUpdate]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexCreation]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexCreation]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexCreation]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexCreation]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexCreation]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexUpdate]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexUpdate]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexUpdate]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexUpdate]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexUpdate]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexRename]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexRename]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexRename]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexRename]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexRename]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexSwap]), @"Some(DocumentAddition { addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexSwap]), @"Some(DocumentUpdate { update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, IndexSwap]), @"Some(DocumentImport { method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, IndexSwap]), @"Some(DocumentImport { method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentDeletion, IndexSwap]), @"Some(DocumentDeletion { deletion_ids: [0] })");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn document_addition_batch_with_settings() {
|
||||
// simple case
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
|
||||
// multiple settings and doc addition
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [2, 3], addition_ids: [0, 1] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [2, 3], addition_ids: [0, 1] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, import_ids: [0, 1] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, DocumentAddition, Settings, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [2, 3], method: ReplaceDocuments, import_ids: [0, 1] })");
|
||||
|
||||
// addition and setting unordered
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentAddition, Settings]), @"Some(SettingsAndDocumentAddition { settings_ids: [1, 3], addition_ids: [0, 2] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentUpdate, Settings]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1, 3], update_ids: [0, 2] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentAddition, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: ReplaceDocuments, import_ids: [0, 2] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentUpdate, Settings]), @"Some(SettingsAndDocumentImport { settings_ids: [1, 3], method: UpdateDocuments, import_ids: [0, 2] })");
|
||||
|
||||
// We ensure this kind of batch doesn't batch with forbidden operations
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentUpdate]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentAddition]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexCreation]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexCreation]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexUpdate]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexUpdate]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexRename]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexRename]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexSwap]), @"Some(SettingsAndDocumentAddition { settings_ids: [1], addition_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexSwap]), @"Some(SettingsAndDocumentUpdate { settings_ids: [1], update_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentAddition]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, DocumentDeletion]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexCreation]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexUpdate]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexRename]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexRename]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentAddition, Settings, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: ReplaceDocuments, import_ids: [0] })");
|
||||
assert_smol_debug_snapshot!(autobatch_from([DocumentUpdate, Settings, IndexSwap]), @"Some(SettingsAndDocumentImport { settings_ids: [1], method: UpdateDocuments, import_ids: [0] })");
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -12,15 +12,10 @@ pub(crate) enum Batch {
|
||||
Cancel(Task),
|
||||
Snapshot(Vec<Task>),
|
||||
Dump(Vec<Task>),
|
||||
DocumentAddition {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
content_files: Vec<Uuid>,
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
DocumentUpdate {
|
||||
DocumentImport {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
method: IndexDocumentsMethod,
|
||||
content_files: Vec<Uuid>,
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
@ -47,23 +42,13 @@ pub(crate) enum Batch {
|
||||
settings: Vec<(bool, Settings<Unchecked>)>,
|
||||
settings_tasks: Vec<Task>,
|
||||
},
|
||||
SettingsAndDocumentAddition {
|
||||
SettingsAndDocumentImport {
|
||||
index_uid: String,
|
||||
|
||||
primary_key: Option<String>,
|
||||
method: IndexDocumentsMethod,
|
||||
content_files: Vec<Uuid>,
|
||||
document_addition_tasks: Vec<Task>,
|
||||
|
||||
// TODO what's that boolean, does it mean that it removes things or what?
|
||||
settings: Vec<(bool, Settings<Unchecked>)>,
|
||||
settings_tasks: Vec<Task>,
|
||||
},
|
||||
SettingsAndDocumentUpdate {
|
||||
index_uid: String,
|
||||
|
||||
primary_key: Option<String>,
|
||||
content_files: Vec<Uuid>,
|
||||
document_update_tasks: Vec<Task>,
|
||||
document_import_tasks: Vec<Task>,
|
||||
|
||||
// TODO what's that boolean, does it mean that it removes things or what?
|
||||
settings: Vec<(bool, Settings<Unchecked>)>,
|
||||
@ -93,14 +78,13 @@ impl Batch {
|
||||
| Batch::IndexUpdate { task, .. } => vec![task.uid],
|
||||
Batch::Snapshot(tasks)
|
||||
| Batch::Dump(tasks)
|
||||
| Batch::DocumentAddition { tasks, .. }
|
||||
| Batch::DocumentUpdate { tasks, .. }
|
||||
| Batch::DocumentImport { tasks, .. }
|
||||
| Batch::DocumentDeletion { tasks, .. }
|
||||
| Batch::Settings { tasks, .. }
|
||||
| Batch::DocumentClear { tasks, .. }
|
||||
| Batch::IndexDeletion { tasks, .. } => tasks.iter().map(|task| task.uid).collect(),
|
||||
Batch::SettingsAndDocumentAddition {
|
||||
document_addition_tasks: tasks,
|
||||
Batch::SettingsAndDocumentImport {
|
||||
document_import_tasks: tasks,
|
||||
settings_tasks: other,
|
||||
..
|
||||
}
|
||||
@ -108,11 +92,6 @@ impl Batch {
|
||||
cleared_tasks: tasks,
|
||||
settings_tasks: other,
|
||||
..
|
||||
}
|
||||
| Batch::SettingsAndDocumentUpdate {
|
||||
document_update_tasks: tasks,
|
||||
settings_tasks: other,
|
||||
..
|
||||
} => tasks.iter().chain(other).map(|task| task.uid).collect(),
|
||||
}
|
||||
}
|
||||
@ -130,44 +109,24 @@ impl IndexScheduler {
|
||||
tasks: self.get_existing_tasks(rtxn, ids)?,
|
||||
index_uid,
|
||||
})),
|
||||
BatchKind::DocumentAddition { addition_ids } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, addition_ids)?;
|
||||
BatchKind::DocumentImport { method, import_ids } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, import_ids)?;
|
||||
let primary_key = match &tasks[0].kind {
|
||||
KindWithContent::DocumentAddition { primary_key, .. } => primary_key.clone(),
|
||||
KindWithContent::DocumentImport { primary_key, .. } => primary_key.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let content_files = tasks
|
||||
.iter()
|
||||
.map(|task| match task.kind {
|
||||
KindWithContent::DocumentAddition { content_file, .. } => content_file,
|
||||
KindWithContent::DocumentImport { content_file, .. } => content_file,
|
||||
_ => unreachable!(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Some(Batch::DocumentAddition {
|
||||
index_uid,
|
||||
primary_key,
|
||||
content_files,
|
||||
tasks,
|
||||
}))
|
||||
}
|
||||
BatchKind::DocumentUpdate { update_ids } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, update_ids)?;
|
||||
let primary_key = match &tasks[0].kind {
|
||||
KindWithContent::DocumentUpdate { primary_key, .. } => primary_key.clone(),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let content_files = tasks
|
||||
.iter()
|
||||
.map(|task| match task.kind {
|
||||
KindWithContent::DocumentUpdate { content_file, .. } => content_file,
|
||||
_ => unreachable!(),
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok(Some(Batch::DocumentUpdate {
|
||||
Ok(Some(Batch::DocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
content_files,
|
||||
tasks,
|
||||
}))
|
||||
@ -246,51 +205,10 @@ impl IndexScheduler {
|
||||
settings_tasks,
|
||||
}))
|
||||
}
|
||||
BatchKind::SettingsAndDocumentAddition {
|
||||
addition_ids,
|
||||
settings_ids,
|
||||
} => {
|
||||
let (index_uid, settings, settings_tasks) = match self
|
||||
.create_next_batch_index(rtxn, index_uid, BatchKind::Settings { settings_ids })?
|
||||
.unwrap()
|
||||
{
|
||||
Batch::Settings {
|
||||
index_uid,
|
||||
settings,
|
||||
tasks,
|
||||
} => (index_uid, settings, tasks),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
let (index_uid, primary_key, content_files, document_addition_tasks) = match self
|
||||
.create_next_batch_index(
|
||||
rtxn,
|
||||
index_uid,
|
||||
BatchKind::DocumentAddition { addition_ids },
|
||||
)?
|
||||
.unwrap()
|
||||
{
|
||||
Batch::DocumentAddition {
|
||||
index_uid,
|
||||
primary_key,
|
||||
content_files,
|
||||
tasks,
|
||||
} => (index_uid, primary_key, content_files, tasks),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
Ok(Some(Batch::SettingsAndDocumentAddition {
|
||||
index_uid,
|
||||
primary_key,
|
||||
content_files,
|
||||
document_addition_tasks,
|
||||
settings,
|
||||
settings_tasks,
|
||||
}))
|
||||
}
|
||||
BatchKind::SettingsAndDocumentUpdate {
|
||||
update_ids,
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
method,
|
||||
import_ids,
|
||||
} => {
|
||||
let settings = self.create_next_batch_index(
|
||||
rtxn,
|
||||
@ -298,18 +216,18 @@ impl IndexScheduler {
|
||||
BatchKind::Settings { settings_ids },
|
||||
)?;
|
||||
|
||||
let document_update = self.create_next_batch_index(
|
||||
let document_import = self.create_next_batch_index(
|
||||
rtxn,
|
||||
index_uid.clone(),
|
||||
BatchKind::DocumentUpdate { update_ids },
|
||||
BatchKind::DocumentImport { method, import_ids },
|
||||
)?;
|
||||
|
||||
match (document_update, settings) {
|
||||
match (document_import, settings) {
|
||||
(
|
||||
Some(Batch::DocumentUpdate {
|
||||
Some(Batch::DocumentImport {
|
||||
primary_key,
|
||||
content_files,
|
||||
tasks: document_update_tasks,
|
||||
tasks: document_import_tasks,
|
||||
..
|
||||
}),
|
||||
Some(Batch::Settings {
|
||||
@ -317,11 +235,12 @@ impl IndexScheduler {
|
||||
tasks: settings_tasks,
|
||||
..
|
||||
}),
|
||||
) => Ok(Some(Batch::SettingsAndDocumentUpdate {
|
||||
) => Ok(Some(Batch::SettingsAndDocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
content_files,
|
||||
document_update_tasks,
|
||||
document_import_tasks,
|
||||
settings,
|
||||
settings_tasks,
|
||||
})),
|
||||
@ -453,10 +372,10 @@ impl IndexScheduler {
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
// TODO we should merge both document import with a method field
|
||||
Batch::DocumentAddition {
|
||||
Batch::DocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
content_files,
|
||||
mut tasks,
|
||||
} => {
|
||||
@ -467,7 +386,7 @@ impl IndexScheduler {
|
||||
wtxn.commit()?;
|
||||
|
||||
let ret = index.update_documents(
|
||||
IndexDocumentsMethod::ReplaceDocuments,
|
||||
method,
|
||||
primary_key,
|
||||
self.file_store.clone(),
|
||||
content_files,
|
||||
@ -490,53 +409,17 @@ impl IndexScheduler {
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
Batch::SettingsAndDocumentAddition {
|
||||
Batch::SettingsAndDocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
content_files,
|
||||
document_addition_tasks,
|
||||
document_import_tasks,
|
||||
settings: _,
|
||||
settings_tasks: _,
|
||||
} => {
|
||||
todo!();
|
||||
}
|
||||
// TODO we should merge both document import with a method field
|
||||
Batch::DocumentUpdate {
|
||||
index_uid,
|
||||
primary_key,
|
||||
content_files,
|
||||
mut tasks,
|
||||
} => {
|
||||
// we NEED a write transaction for the index creation.
|
||||
// To avoid blocking the whole process we're going to commit asap.
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let index = self.index_mapper.create_index(&mut wtxn, &index_uid)?;
|
||||
wtxn.commit()?;
|
||||
|
||||
let ret = index.update_documents(
|
||||
IndexDocumentsMethod::UpdateDocuments,
|
||||
primary_key,
|
||||
self.file_store.clone(),
|
||||
content_files,
|
||||
)?;
|
||||
|
||||
for (task, ret) in tasks.iter_mut().zip(ret) {
|
||||
match ret {
|
||||
Ok(DocumentAdditionResult {
|
||||
indexed_documents,
|
||||
number_of_documents,
|
||||
}) => {
|
||||
task.details = Some(Details::DocumentAddition {
|
||||
received_documents: number_of_documents,
|
||||
indexed_documents,
|
||||
});
|
||||
}
|
||||
Err(error) => task.error = Some(error.into()),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
Batch::DocumentDeletion {
|
||||
index_uid,
|
||||
documents,
|
||||
@ -628,14 +511,6 @@ impl IndexScheduler {
|
||||
tasks.append(&mut settings_tasks);
|
||||
Ok(tasks)
|
||||
}
|
||||
Batch::SettingsAndDocumentUpdate {
|
||||
index_uid,
|
||||
primary_key,
|
||||
content_files,
|
||||
document_update_tasks,
|
||||
settings,
|
||||
settings_tasks,
|
||||
} => todo!(),
|
||||
Batch::IndexCreation {
|
||||
index_uid,
|
||||
primary_key,
|
||||
|
@ -84,10 +84,7 @@ impl Query {
|
||||
}
|
||||
|
||||
pub fn with_limit(self, limit: u32) -> Self {
|
||||
Self {
|
||||
limit,
|
||||
..self
|
||||
}
|
||||
Self { limit, ..self }
|
||||
}
|
||||
}
|
||||
|
||||
@ -435,6 +432,7 @@ impl IndexScheduler {
|
||||
mod tests {
|
||||
use big_s::S;
|
||||
use insta::*;
|
||||
use milli::update::IndexDocumentsMethod::{self, ReplaceDocuments, UpdateDocuments};
|
||||
use tempfile::TempDir;
|
||||
use uuid::Uuid;
|
||||
|
||||
@ -501,24 +499,27 @@ mod tests {
|
||||
index_uid: S("catto"),
|
||||
primary_key: Some(S("mouse")),
|
||||
},
|
||||
KindWithContent::DocumentAddition {
|
||||
KindWithContent::DocumentImport {
|
||||
index_uid: S("catto"),
|
||||
primary_key: None,
|
||||
method: ReplaceDocuments,
|
||||
content_file: Uuid::new_v4(),
|
||||
documents_count: 12,
|
||||
allow_index_creation: true,
|
||||
},
|
||||
KindWithContent::CancelTask { tasks: vec![0, 1] },
|
||||
KindWithContent::DocumentAddition {
|
||||
KindWithContent::DocumentImport {
|
||||
index_uid: S("catto"),
|
||||
primary_key: None,
|
||||
method: ReplaceDocuments,
|
||||
content_file: Uuid::new_v4(),
|
||||
documents_count: 50,
|
||||
allow_index_creation: true,
|
||||
},
|
||||
KindWithContent::DocumentAddition {
|
||||
KindWithContent::DocumentImport {
|
||||
index_uid: S("doggo"),
|
||||
primary_key: Some(S("bone")),
|
||||
method: ReplaceDocuments,
|
||||
content_file: Uuid::new_v4(),
|
||||
documents_count: 5000,
|
||||
allow_index_creation: true,
|
||||
@ -603,9 +604,10 @@ mod tests {
|
||||
let documents_count =
|
||||
document_formats::read_json(content.as_bytes(), file.as_file_mut()).unwrap();
|
||||
index_scheduler
|
||||
.register(KindWithContent::DocumentAddition {
|
||||
.register(KindWithContent::DocumentImport {
|
||||
index_uid: S("doggos"),
|
||||
primary_key: Some(S("id")),
|
||||
method: ReplaceDocuments,
|
||||
content_file: uuid,
|
||||
documents_count,
|
||||
allow_index_creation: true,
|
||||
@ -633,7 +635,7 @@ mod tests {
|
||||
|
||||
// Once the task has started being batched it should be marked as processing
|
||||
let task = index_scheduler.get_tasks(Query::default()).unwrap();
|
||||
assert_json_snapshot!(task,
|
||||
assert_json_snapshot!(task,
|
||||
{ "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" }
|
||||
,@r###"
|
||||
[
|
||||
@ -650,7 +652,7 @@ mod tests {
|
||||
handle.wait_till(Breakpoint::AfterProcessing);
|
||||
|
||||
let task = index_scheduler.get_tasks(Query::default()).unwrap();
|
||||
assert_json_snapshot!(task,
|
||||
assert_json_snapshot!(task,
|
||||
{ "[].enqueuedAt" => "date", "[].startedAt" => "date", "[].finishedAt" => "date", "[].duration" => "duration" }
|
||||
,@r###"
|
||||
[
|
||||
|
@ -1,6 +1,7 @@
|
||||
use anyhow::Result;
|
||||
use index::{Settings, Unchecked};
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use milli::update::IndexDocumentsMethod;
|
||||
|
||||
use serde::{Deserialize, Serialize, Serializer};
|
||||
use std::{fmt::Write, path::PathBuf, str::FromStr};
|
||||
@ -125,16 +126,10 @@ impl FromStr for Status {
|
||||
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub enum KindWithContent {
|
||||
DocumentAddition {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
content_file: Uuid,
|
||||
documents_count: usize,
|
||||
allow_index_creation: bool,
|
||||
},
|
||||
DocumentUpdate {
|
||||
DocumentImport {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
method: IndexDocumentsMethod,
|
||||
content_file: Uuid,
|
||||
documents_count: usize,
|
||||
allow_index_creation: bool,
|
||||
@ -183,8 +178,15 @@ pub enum KindWithContent {
|
||||
impl KindWithContent {
|
||||
pub fn as_kind(&self) -> Kind {
|
||||
match self {
|
||||
KindWithContent::DocumentAddition { .. } => Kind::DocumentAddition,
|
||||
KindWithContent::DocumentUpdate { .. } => Kind::DocumentUpdate,
|
||||
KindWithContent::DocumentImport {
|
||||
method: IndexDocumentsMethod::ReplaceDocuments,
|
||||
..
|
||||
} => Kind::DocumentAddition,
|
||||
KindWithContent::DocumentImport {
|
||||
method: IndexDocumentsMethod::UpdateDocuments,
|
||||
..
|
||||
} => Kind::DocumentUpdate,
|
||||
KindWithContent::DocumentImport { .. } => unreachable!(),
|
||||
KindWithContent::DocumentDeletion { .. } => Kind::DocumentDeletion,
|
||||
KindWithContent::DocumentClear { .. } => Kind::DocumentClear,
|
||||
KindWithContent::Settings { .. } => Kind::Settings,
|
||||
@ -203,7 +205,7 @@ impl KindWithContent {
|
||||
use KindWithContent::*;
|
||||
|
||||
match self {
|
||||
DocumentAddition { .. } | DocumentUpdate { .. } => {
|
||||
DocumentImport { .. } => {
|
||||
// TODO: TAMO: persist the file
|
||||
// content_file.persist();
|
||||
Ok(())
|
||||
@ -226,7 +228,7 @@ impl KindWithContent {
|
||||
use KindWithContent::*;
|
||||
|
||||
match self {
|
||||
DocumentAddition { .. } | DocumentUpdate { .. } => {
|
||||
DocumentImport { .. } => {
|
||||
// TODO: TAMO: delete the file
|
||||
// content_file.delete();
|
||||
Ok(())
|
||||
@ -250,8 +252,7 @@ impl KindWithContent {
|
||||
|
||||
match self {
|
||||
DumpExport { .. } | Snapshot | CancelTask { .. } => None,
|
||||
DocumentAddition { index_uid, .. }
|
||||
| DocumentUpdate { index_uid, .. }
|
||||
DocumentImport { index_uid, .. }
|
||||
| DocumentDeletion { index_uid, .. }
|
||||
| DocumentClear { index_uid }
|
||||
| Settings { index_uid, .. }
|
||||
|
Loading…
x
Reference in New Issue
Block a user