mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
it probably works but it's also horrendous
This commit is contained in:
parent
22d514645e
commit
3357c439e3
3 changed files with 535 additions and 232 deletions
|
@ -57,7 +57,10 @@ pub(crate) enum Batch {
|
|||
TaskDeletion(Task),
|
||||
Snapshot(Vec<Task>),
|
||||
Dump(Task),
|
||||
IndexOperation(IndexOperation),
|
||||
IndexOperation {
|
||||
op: IndexOperation,
|
||||
must_create_index: bool,
|
||||
},
|
||||
IndexCreation {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
|
@ -142,7 +145,7 @@ impl Batch {
|
|||
Batch::Snapshot(tasks) | Batch::IndexDeletion { tasks, .. } => {
|
||||
tasks.iter().map(|task| task.uid).collect()
|
||||
}
|
||||
Batch::IndexOperation(operation) => match operation {
|
||||
Batch::IndexOperation { op, .. } => match op {
|
||||
IndexOperation::DocumentImport { tasks, .. }
|
||||
| IndexOperation::DocumentDeletion { tasks, .. }
|
||||
| IndexOperation::Settings { tasks, .. }
|
||||
|
@ -165,6 +168,19 @@ impl Batch {
|
|||
}
|
||||
}
|
||||
|
||||
impl IndexOperation {
|
||||
pub fn index_uid(&self) -> &str {
|
||||
match self {
|
||||
IndexOperation::DocumentImport { index_uid, .. }
|
||||
| IndexOperation::DocumentDeletion { index_uid, .. }
|
||||
| IndexOperation::DocumentClear { index_uid, .. }
|
||||
| IndexOperation::Settings { index_uid, .. }
|
||||
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
|
||||
| IndexOperation::SettingsAndDocumentImport { index_uid, .. } => index_uid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexScheduler {
|
||||
/// Convert an [`BatchKind`](crate::autobatcher::BatchKind) into a [`Batch`].
|
||||
///
|
||||
|
@ -177,14 +193,16 @@ impl IndexScheduler {
|
|||
rtxn: &RoTxn,
|
||||
index_uid: String,
|
||||
batch: BatchKind,
|
||||
must_create_index: bool,
|
||||
) -> Result<Option<Batch>> {
|
||||
match batch {
|
||||
BatchKind::DocumentClear { ids } => {
|
||||
Ok(Some(Batch::IndexOperation(IndexOperation::DocumentClear {
|
||||
BatchKind::DocumentClear { ids } => Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentClear {
|
||||
tasks: self.get_existing_tasks(rtxn, ids)?,
|
||||
index_uid,
|
||||
})))
|
||||
}
|
||||
},
|
||||
must_create_index,
|
||||
})),
|
||||
BatchKind::DocumentImport {
|
||||
method,
|
||||
import_ids,
|
||||
|
@ -212,8 +230,8 @@ impl IndexScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
Ok(Some(Batch::IndexOperation(
|
||||
IndexOperation::DocumentImport {
|
||||
Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
|
@ -222,7 +240,8 @@ impl IndexScheduler {
|
|||
content_files,
|
||||
tasks,
|
||||
},
|
||||
)))
|
||||
must_create_index,
|
||||
}))
|
||||
}
|
||||
BatchKind::DocumentDeletion { deletion_ids } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
|
||||
|
@ -237,13 +256,14 @@ impl IndexScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
Ok(Some(Batch::IndexOperation(
|
||||
IndexOperation::DocumentDeletion {
|
||||
Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentDeletion {
|
||||
index_uid,
|
||||
documents,
|
||||
tasks,
|
||||
},
|
||||
)))
|
||||
must_create_index,
|
||||
}))
|
||||
}
|
||||
BatchKind::Settings {
|
||||
settings_ids,
|
||||
|
@ -263,12 +283,15 @@ impl IndexScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
Ok(Some(Batch::IndexOperation(IndexOperation::Settings {
|
||||
index_uid,
|
||||
settings,
|
||||
allow_index_creation,
|
||||
tasks,
|
||||
})))
|
||||
Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::Settings {
|
||||
index_uid,
|
||||
settings,
|
||||
allow_index_creation,
|
||||
tasks,
|
||||
},
|
||||
must_create_index,
|
||||
}))
|
||||
}
|
||||
BatchKind::ClearAndSettings {
|
||||
other,
|
||||
|
@ -283,15 +306,20 @@ impl IndexScheduler {
|
|||
settings_ids,
|
||||
allow_index_creation,
|
||||
},
|
||||
must_create_index,
|
||||
)?
|
||||
.unwrap()
|
||||
{
|
||||
Batch::IndexOperation(IndexOperation::Settings {
|
||||
index_uid,
|
||||
settings,
|
||||
tasks,
|
||||
Batch::IndexOperation {
|
||||
op:
|
||||
IndexOperation::Settings {
|
||||
index_uid,
|
||||
settings,
|
||||
tasks,
|
||||
..
|
||||
},
|
||||
..
|
||||
}) => (index_uid, settings, tasks),
|
||||
} => (index_uid, settings, tasks),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let (index_uid, cleared_tasks) = match self
|
||||
|
@ -299,24 +327,27 @@ impl IndexScheduler {
|
|||
rtxn,
|
||||
index_uid,
|
||||
BatchKind::DocumentClear { ids: other },
|
||||
must_create_index,
|
||||
)?
|
||||
.unwrap()
|
||||
{
|
||||
Batch::IndexOperation(IndexOperation::DocumentClear { index_uid, tasks }) => {
|
||||
(index_uid, tasks)
|
||||
}
|
||||
Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentClear { index_uid, tasks },
|
||||
..
|
||||
} => (index_uid, tasks),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
Ok(Some(Batch::IndexOperation(
|
||||
IndexOperation::DocumentClearAndSetting {
|
||||
Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentClearAndSetting {
|
||||
index_uid,
|
||||
cleared_tasks,
|
||||
allow_index_creation,
|
||||
settings,
|
||||
settings_tasks,
|
||||
},
|
||||
)))
|
||||
must_create_index,
|
||||
}))
|
||||
}
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
settings_ids,
|
||||
|
@ -331,6 +362,7 @@ impl IndexScheduler {
|
|||
settings_ids,
|
||||
allow_index_creation,
|
||||
},
|
||||
must_create_index,
|
||||
)?;
|
||||
|
||||
let document_import = self.create_next_batch_index(
|
||||
|
@ -341,24 +373,33 @@ impl IndexScheduler {
|
|||
allow_index_creation,
|
||||
import_ids,
|
||||
},
|
||||
must_create_index,
|
||||
)?;
|
||||
|
||||
match (document_import, settings) {
|
||||
(
|
||||
Some(Batch::IndexOperation(IndexOperation::DocumentImport {
|
||||
primary_key,
|
||||
documents_counts,
|
||||
content_files,
|
||||
tasks: document_import_tasks,
|
||||
Some(Batch::IndexOperation {
|
||||
op:
|
||||
IndexOperation::DocumentImport {
|
||||
primary_key,
|
||||
documents_counts,
|
||||
content_files,
|
||||
tasks: document_import_tasks,
|
||||
..
|
||||
},
|
||||
..
|
||||
})),
|
||||
Some(Batch::IndexOperation(IndexOperation::Settings {
|
||||
settings,
|
||||
tasks: settings_tasks,
|
||||
}),
|
||||
Some(Batch::IndexOperation {
|
||||
op:
|
||||
IndexOperation::Settings {
|
||||
settings,
|
||||
tasks: settings_tasks,
|
||||
..
|
||||
},
|
||||
..
|
||||
})),
|
||||
) => Ok(Some(Batch::IndexOperation(
|
||||
IndexOperation::SettingsAndDocumentImport {
|
||||
}),
|
||||
) => Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::SettingsAndDocumentImport {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
|
@ -369,7 +410,8 @@ impl IndexScheduler {
|
|||
settings,
|
||||
settings_tasks,
|
||||
},
|
||||
))),
|
||||
must_create_index,
|
||||
})),
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
@ -466,6 +508,7 @@ impl IndexScheduler {
|
|||
// AT LEAST one index. We can use the right or left one it doesn't
|
||||
// matter.
|
||||
let index_name = task.indexes().unwrap()[0];
|
||||
let index_already_exists = self.index_mapper.exists(rtxn, &index_name)?;
|
||||
|
||||
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
|
||||
|
||||
|
@ -486,8 +529,15 @@ impl IndexScheduler {
|
|||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
if let Some(batchkind) = crate::autobatcher::autobatch(enqueued) {
|
||||
return self.create_next_batch_index(rtxn, index_name.to_string(), batchkind);
|
||||
if let Some((batchkind, create_index)) =
|
||||
crate::autobatcher::autobatch(enqueued, index_already_exists)
|
||||
{
|
||||
return self.create_next_batch_index(
|
||||
rtxn,
|
||||
index_name.to_string(),
|
||||
batchkind,
|
||||
create_index,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -679,34 +729,24 @@ impl IndexScheduler {
|
|||
task.status = Status::Succeeded;
|
||||
Ok(vec![task])
|
||||
}
|
||||
Batch::IndexOperation(operation) => {
|
||||
#[rustfmt::skip]
|
||||
let index = match operation {
|
||||
IndexOperation::DocumentDeletion { ref index_uid, .. }
|
||||
| IndexOperation::DocumentClear { ref index_uid, .. } => {
|
||||
// only get the index, don't create it
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, index_uid)?
|
||||
}
|
||||
IndexOperation::DocumentImport { ref index_uid, allow_index_creation, .. }
|
||||
| IndexOperation::Settings { ref index_uid, allow_index_creation, .. }
|
||||
| IndexOperation::DocumentClearAndSetting { ref index_uid, allow_index_creation, .. }
|
||||
| IndexOperation::SettingsAndDocumentImport {ref index_uid, allow_index_creation, .. } => {
|
||||
if allow_index_creation {
|
||||
// create the index if it doesn't already exist
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let index = self.index_mapper.create_index(&mut wtxn, index_uid)?;
|
||||
wtxn.commit()?;
|
||||
index
|
||||
} else {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, index_uid)?
|
||||
}
|
||||
}
|
||||
Batch::IndexOperation {
|
||||
op,
|
||||
must_create_index,
|
||||
} => {
|
||||
let index_uid = op.index_uid();
|
||||
let index = if must_create_index {
|
||||
// create the index if it doesn't already exist
|
||||
let mut wtxn = self.env.write_txn()?;
|
||||
let index = self.index_mapper.create_index(&mut wtxn, index_uid)?;
|
||||
wtxn.commit()?;
|
||||
index
|
||||
} else {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, index_uid)?
|
||||
};
|
||||
|
||||
let mut index_wtxn = index.write_txn()?;
|
||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, operation)?;
|
||||
let tasks = self.apply_index_operation(&mut index_wtxn, &index, op)?;
|
||||
index_wtxn.commit()?;
|
||||
|
||||
Ok(tasks)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue