mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-03 03:47:02 +02:00
3319: Transparently resize indexes on MaxDatabaseSizeReached errors r=Kerollmops a=dureuill # Pull Request ## Related issue Related to https://github.com/meilisearch/meilisearch/discussions/3280, depends on https://github.com/meilisearch/milli/pull/760 ## What does this PR do? ### User standpoint - Meilisearch no longer fails tasks that encounter the `milli::UserError(MaxDatabaseSizeReached)` error. - Instead, these tasks are retried after increasing the maximum size allocated to the index where the failure occurred. ### Implementation standpoint - Add `Batch::index_uid` to get the `index_uid` of a batch of task if there is one - `IndexMapper::create_or_open_index` now takes an additional `size` argument that allows to (re)open indexes with a size different from the base `IndexScheduler::index_size` field - `IndexScheduler::tick` now returns a `Result<TickOutcome>` instead of a `Result<usize>`. This offers more explicit control over what the behavior should be wrt the next tick. - Add `IndexStatus::BeingResized` that contains a handle that a thread can use to await for the resize operation to complete and the index to be available again. - Add `IndexMapper::resize_index` to increase the size of an index. - In `IndexScheduler::tick`, intercept task batches that failed due to `MaxDatabaseSizeReached` and resize the index that caused the error, then request a new tick that will eventually handle the still enqueued task. ## Testing the PR The following diff can be applied to this branch to make testing the PR easier: <details> ```diff diff --git a/index-scheduler/src/index_mapper.rs b/index-scheduler/src/index_mapper.rs index 553ab45a..022b2f00 100644 --- a/index-scheduler/src/index_mapper.rs +++ b/index-scheduler/src/index_mapper.rs `@@` -228,13 +228,15 `@@` impl IndexMapper { drop(lock); + std:🧵:sleep_ms(2000); + let current_size = index.map_size()?; let closing_event = index.prepare_for_closing(); - log::info!("Resizing index {} from {} to {} bytes", name, current_size, current_size * 2); + log::error!("Resizing index {} from {} to {} bytes", name, current_size, current_size * 2); closing_event.wait(); - log::info!("Resized index {} from {} to {} bytes", name, current_size, current_size * 2); + log::error!("Resized index {} from {} to {} bytes", name, current_size, current_size * 2); let index_path = self.base_path.join(uuid.to_string()); let index = self.create_or_open_index(&index_path, None, 2 * current_size)?; `@@` -268,8 +270,10 `@@` impl IndexMapper { match index { Some(Available(index)) => break index, Some(BeingResized(ref resize_operation)) => { + log::error!("waiting for resize end"); // Deadlock: no lock taken while doing this operation. resize_operation.wait(); + log::error!("trying our luck again!"); continue; } Some(BeingDeleted) => return Err(Error::IndexNotFound(name.to_string())), diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs index 11b17d05..242dc095 100644 --- a/index-scheduler/src/lib.rs +++ b/index-scheduler/src/lib.rs `@@` -908,6 +908,7 `@@` impl IndexScheduler { /// /// Returns the number of processed tasks. fn tick(&self) -> Result<TickOutcome> { + log::error!("ticking!"); #[cfg(test)] { *self.run_loop_iteration.write().unwrap() += 1; diff --git a/meilisearch/src/main.rs b/meilisearch/src/main.rs index 050c825a..63f312f6 100644 --- a/meilisearch/src/main.rs +++ b/meilisearch/src/main.rs `@@` -25,7 +25,7 `@@` fn setup(opt: &Opt) -> anyhow::Result<()> { #[actix_web::main] async fn main() -> anyhow::Result<()> { - let (opt, config_read_from) = Opt::try_build()?; + let (mut opt, config_read_from) = Opt::try_build()?; setup(&opt)?; `@@` -56,6 +56,8 `@@` We generated a secure master key for you (you can safely copy this token): _ => (), } + opt.max_index_size = byte_unit::Byte::from_str("1MB").unwrap(); + let (index_scheduler, auth_controller) = setup_meilisearch(&opt)?; #[cfg(all(not(debug_assertions), feature = "analytics"))] ``` </details> Mainly, these debug changes do the following: - Set the default index size to 1MiB so that index resizes are initially frequent - Turn some logs from info to error so that they can be displayed with `--log-level ERROR` (hiding the other infos) - Add a long sleep between the beginning and the end of the resize so that we can observe the `BeingResized` index status (otherwise it would never come up in my tests) ## Open questions - Is the growth factor of x2 the correct solution? For a `Vec` in memory it makes sense, but here we're manipulating quantities that are potentially in the order of 500GiBs. For bigger indexes it may make more sense to add at most e.g. 100GiB on each resize operation, avoiding big steps like 500GiB -> 1TiB. ## PR checklist Please check if your PR fulfills the following requirements: - [ ] Does this PR fix an existing issue, or have you listed the changes applied in the PR description (and why they are needed)? - [ ] Have you read the contributing guidelines? - [ ] Have you made sure that the title is accurate and descriptive of the changes? Thank you so much for contributing to Meilisearch! 3470: Autobatch addition and deletion r=irevoire a=irevoire This PR adds the capability to meilisearch to batch document addition and deletion together. Fix https://github.com/meilisearch/meilisearch/issues/3440 -------------- Things to check before merging; - [x] What happens if we delete multiple time the same documents -> add a test - [x] If a documentDeletion gets batched with a documentAddition but the index doesn't exist yet? It should not work Co-authored-by: Louis Dureuil <louis@meilisearch.com> Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
commit
b08a49a16e
19 changed files with 1547 additions and 303 deletions
|
@ -28,8 +28,7 @@ use meilisearch_types::heed::{RoTxn, RwTxn};
|
|||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::heed::CompactionOption;
|
||||
use meilisearch_types::milli::update::{
|
||||
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
|
||||
Settings as MilliSettings,
|
||||
DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod, Settings as MilliSettings,
|
||||
};
|
||||
use meilisearch_types::milli::{self, BEU32};
|
||||
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
|
||||
|
@ -86,15 +85,21 @@ pub(crate) enum Batch {
|
|||
},
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum DocumentOperation {
|
||||
Add(Uuid),
|
||||
Delete(Vec<String>),
|
||||
}
|
||||
|
||||
/// A [batch](Batch) that combines multiple tasks operating on an index.
|
||||
#[derive(Debug)]
|
||||
pub(crate) enum IndexOperation {
|
||||
DocumentImport {
|
||||
DocumentOperation {
|
||||
index_uid: String,
|
||||
primary_key: Option<String>,
|
||||
method: IndexDocumentsMethod,
|
||||
documents_counts: Vec<u64>,
|
||||
content_files: Vec<Uuid>,
|
||||
operations: Vec<DocumentOperation>,
|
||||
tasks: Vec<Task>,
|
||||
},
|
||||
DocumentDeletion {
|
||||
|
@ -121,13 +126,13 @@ pub(crate) enum IndexOperation {
|
|||
settings: Vec<(bool, Settings<Unchecked>)>,
|
||||
settings_tasks: Vec<Task>,
|
||||
},
|
||||
SettingsAndDocumentImport {
|
||||
SettingsAndDocumentOperation {
|
||||
index_uid: String,
|
||||
|
||||
primary_key: Option<String>,
|
||||
method: IndexDocumentsMethod,
|
||||
documents_counts: Vec<u64>,
|
||||
content_files: Vec<Uuid>,
|
||||
operations: Vec<DocumentOperation>,
|
||||
document_import_tasks: Vec<Task>,
|
||||
|
||||
// The boolean indicates if it's a settings deletion or creation.
|
||||
|
@ -149,13 +154,13 @@ impl Batch {
|
|||
tasks.iter().map(|task| task.uid).collect()
|
||||
}
|
||||
Batch::IndexOperation { op, .. } => match op {
|
||||
IndexOperation::DocumentImport { tasks, .. }
|
||||
IndexOperation::DocumentOperation { tasks, .. }
|
||||
| IndexOperation::DocumentDeletion { tasks, .. }
|
||||
| IndexOperation::Settings { tasks, .. }
|
||||
| IndexOperation::DocumentClear { tasks, .. } => {
|
||||
tasks.iter().map(|task| task.uid).collect()
|
||||
}
|
||||
IndexOperation::SettingsAndDocumentImport {
|
||||
IndexOperation::SettingsAndDocumentOperation {
|
||||
document_import_tasks: tasks,
|
||||
settings_tasks: other,
|
||||
..
|
||||
|
@ -169,17 +174,33 @@ impl Batch {
|
|||
Batch::IndexSwap { task } => vec![task.uid],
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the index UID associated with this batch
|
||||
pub fn index_uid(&self) -> Option<&str> {
|
||||
use Batch::*;
|
||||
match self {
|
||||
TaskCancelation { .. }
|
||||
| TaskDeletion(_)
|
||||
| SnapshotCreation(_)
|
||||
| Dump(_)
|
||||
| IndexSwap { .. } => None,
|
||||
IndexOperation { op, .. } => Some(op.index_uid()),
|
||||
IndexCreation { index_uid, .. }
|
||||
| IndexUpdate { index_uid, .. }
|
||||
| IndexDeletion { index_uid, .. } => Some(index_uid),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl IndexOperation {
|
||||
pub fn index_uid(&self) -> &str {
|
||||
match self {
|
||||
IndexOperation::DocumentImport { index_uid, .. }
|
||||
IndexOperation::DocumentOperation { index_uid, .. }
|
||||
| IndexOperation::DocumentDeletion { index_uid, .. }
|
||||
| IndexOperation::DocumentClear { index_uid, .. }
|
||||
| IndexOperation::Settings { index_uid, .. }
|
||||
| IndexOperation::DocumentClearAndSetting { index_uid, .. }
|
||||
| IndexOperation::SettingsAndDocumentImport { index_uid, .. } => index_uid,
|
||||
| IndexOperation::SettingsAndDocumentOperation { index_uid, .. } => index_uid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -206,17 +227,22 @@ impl IndexScheduler {
|
|||
},
|
||||
must_create_index,
|
||||
})),
|
||||
BatchKind::DocumentImport { method, import_ids, .. } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, import_ids)?;
|
||||
let primary_key = match &tasks[0].kind {
|
||||
KindWithContent::DocumentAdditionOrUpdate { primary_key, .. } => {
|
||||
primary_key.clone()
|
||||
}
|
||||
_ => unreachable!(),
|
||||
};
|
||||
BatchKind::DocumentOperation { method, operation_ids, .. } => {
|
||||
let tasks = self.get_existing_tasks(rtxn, operation_ids)?;
|
||||
let primary_key = tasks
|
||||
.iter()
|
||||
.find_map(|task| match task.kind {
|
||||
KindWithContent::DocumentAdditionOrUpdate { ref primary_key, .. } => {
|
||||
// we want to stop on the first document addition
|
||||
Some(primary_key.clone())
|
||||
}
|
||||
KindWithContent::DocumentDeletion { .. } => None,
|
||||
_ => unreachable!(),
|
||||
})
|
||||
.flatten();
|
||||
|
||||
let mut documents_counts = Vec::new();
|
||||
let mut content_files = Vec::new();
|
||||
let mut operations = Vec::new();
|
||||
|
||||
for task in tasks.iter() {
|
||||
match task.kind {
|
||||
|
@ -226,19 +252,23 @@ impl IndexScheduler {
|
|||
..
|
||||
} => {
|
||||
documents_counts.push(documents_count);
|
||||
content_files.push(content_file);
|
||||
operations.push(DocumentOperation::Add(content_file));
|
||||
}
|
||||
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
|
||||
documents_counts.push(documents_ids.len() as u64);
|
||||
operations.push(DocumentOperation::Delete(documents_ids.clone()));
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::DocumentImport {
|
||||
op: IndexOperation::DocumentOperation {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
documents_counts,
|
||||
content_files,
|
||||
operations,
|
||||
tasks,
|
||||
},
|
||||
must_create_index,
|
||||
|
@ -322,12 +352,12 @@ impl IndexScheduler {
|
|||
must_create_index,
|
||||
}))
|
||||
}
|
||||
BatchKind::SettingsAndDocumentImport {
|
||||
BatchKind::SettingsAndDocumentOperation {
|
||||
settings_ids,
|
||||
method,
|
||||
allow_index_creation,
|
||||
primary_key,
|
||||
import_ids,
|
||||
operation_ids,
|
||||
} => {
|
||||
let settings = self.create_next_batch_index(
|
||||
rtxn,
|
||||
|
@ -339,11 +369,11 @@ impl IndexScheduler {
|
|||
let document_import = self.create_next_batch_index(
|
||||
rtxn,
|
||||
index_uid.clone(),
|
||||
BatchKind::DocumentImport {
|
||||
BatchKind::DocumentOperation {
|
||||
method,
|
||||
allow_index_creation,
|
||||
primary_key,
|
||||
import_ids,
|
||||
operation_ids,
|
||||
},
|
||||
must_create_index,
|
||||
)?;
|
||||
|
@ -352,10 +382,10 @@ impl IndexScheduler {
|
|||
(
|
||||
Some(Batch::IndexOperation {
|
||||
op:
|
||||
IndexOperation::DocumentImport {
|
||||
IndexOperation::DocumentOperation {
|
||||
primary_key,
|
||||
documents_counts,
|
||||
content_files,
|
||||
operations,
|
||||
tasks: document_import_tasks,
|
||||
..
|
||||
},
|
||||
|
@ -366,12 +396,12 @@ impl IndexScheduler {
|
|||
..
|
||||
}),
|
||||
) => Ok(Some(Batch::IndexOperation {
|
||||
op: IndexOperation::SettingsAndDocumentImport {
|
||||
op: IndexOperation::SettingsAndDocumentOperation {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
documents_counts,
|
||||
content_files,
|
||||
operations,
|
||||
document_import_tasks,
|
||||
settings,
|
||||
settings_tasks,
|
||||
|
@ -987,12 +1017,12 @@ impl IndexScheduler {
|
|||
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentImport {
|
||||
IndexOperation::DocumentOperation {
|
||||
index_uid: _,
|
||||
primary_key,
|
||||
method,
|
||||
documents_counts,
|
||||
content_files,
|
||||
documents_counts: _,
|
||||
operations,
|
||||
mut tasks,
|
||||
} => {
|
||||
let mut primary_key_has_been_set = false;
|
||||
|
@ -1037,26 +1067,82 @@ impl IndexScheduler {
|
|||
|| must_stop_processing.get(),
|
||||
)?;
|
||||
|
||||
let mut results = Vec::new();
|
||||
for content_uuid in content_files.into_iter() {
|
||||
let content_file = self.file_store.get_update(content_uuid)?;
|
||||
let reader = DocumentsBatchReader::from_reader(content_file)
|
||||
.map_err(milli::Error::from)?;
|
||||
let (new_builder, user_result) = builder.add_documents(reader)?;
|
||||
builder = new_builder;
|
||||
for (operation, task) in operations.into_iter().zip(tasks.iter_mut()) {
|
||||
match operation {
|
||||
DocumentOperation::Add(content_uuid) => {
|
||||
let content_file = self.file_store.get_update(content_uuid)?;
|
||||
let reader = DocumentsBatchReader::from_reader(content_file)
|
||||
.map_err(milli::Error::from)?;
|
||||
let (new_builder, user_result) = builder.add_documents(reader)?;
|
||||
builder = new_builder;
|
||||
|
||||
let user_result = match user_result {
|
||||
Ok(count) => Ok(DocumentAdditionResult {
|
||||
indexed_documents: count,
|
||||
number_of_documents: count, // TODO: this is wrong, we should use the value stored in the Details.
|
||||
}),
|
||||
Err(e) => Err(milli::Error::from(e)),
|
||||
};
|
||||
let received_documents =
|
||||
if let Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
..
|
||||
}) = task.details
|
||||
{
|
||||
received_documents
|
||||
} else {
|
||||
// In the case of a `documentAdditionOrUpdate` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
results.push(user_result);
|
||||
match user_result {
|
||||
Ok(count) => {
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
indexed_documents: Some(count),
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
task.status = Status::Failed;
|
||||
task.details = Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents,
|
||||
indexed_documents: Some(0),
|
||||
});
|
||||
task.error = Some(milli::Error::from(e).into());
|
||||
}
|
||||
}
|
||||
}
|
||||
DocumentOperation::Delete(document_ids) => {
|
||||
let (new_builder, user_result) =
|
||||
builder.remove_documents(document_ids)?;
|
||||
builder = new_builder;
|
||||
|
||||
let provided_ids =
|
||||
if let Some(Details::DocumentDeletion { provided_ids, .. }) =
|
||||
task.details
|
||||
{
|
||||
provided_ids
|
||||
} else {
|
||||
// In the case of a `documentAdditionOrUpdate` the details MUST be set
|
||||
unreachable!();
|
||||
};
|
||||
|
||||
match user_result {
|
||||
Ok(count) => {
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentDeletion {
|
||||
provided_ids,
|
||||
deleted_documents: Some(count),
|
||||
});
|
||||
}
|
||||
Err(e) => {
|
||||
task.status = Status::Failed;
|
||||
task.details = Some(Details::DocumentDeletion {
|
||||
provided_ids,
|
||||
deleted_documents: Some(0),
|
||||
});
|
||||
task.error = Some(milli::Error::from(e).into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if results.iter().any(|res| res.is_ok()) {
|
||||
if !tasks.iter().all(|res| res.error.is_some()) {
|
||||
let addition = builder.execute()?;
|
||||
info!("document addition done: {:?}", addition);
|
||||
} else if primary_key_has_been_set {
|
||||
|
@ -1071,29 +1157,6 @@ impl IndexScheduler {
|
|||
)?;
|
||||
}
|
||||
|
||||
for (task, (ret, count)) in
|
||||
tasks.iter_mut().zip(results.into_iter().zip(documents_counts))
|
||||
{
|
||||
match ret {
|
||||
Ok(DocumentAdditionResult { indexed_documents, number_of_documents }) => {
|
||||
task.status = Status::Succeeded;
|
||||
task.details = Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents: number_of_documents,
|
||||
indexed_documents: Some(indexed_documents),
|
||||
});
|
||||
}
|
||||
Err(error) => {
|
||||
task.status = Status::Failed;
|
||||
task.details = Some(Details::DocumentAdditionOrUpdate {
|
||||
received_documents: count,
|
||||
// if there was an error we indexed 0 documents.
|
||||
indexed_documents: Some(0),
|
||||
});
|
||||
task.error = Some(error.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
|
||||
|
@ -1136,12 +1199,12 @@ impl IndexScheduler {
|
|||
|
||||
Ok(tasks)
|
||||
}
|
||||
IndexOperation::SettingsAndDocumentImport {
|
||||
IndexOperation::SettingsAndDocumentOperation {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
documents_counts,
|
||||
content_files,
|
||||
operations,
|
||||
document_import_tasks,
|
||||
settings,
|
||||
settings_tasks,
|
||||
|
@ -1159,12 +1222,12 @@ impl IndexScheduler {
|
|||
let mut import_tasks = self.apply_index_operation(
|
||||
index_wtxn,
|
||||
index,
|
||||
IndexOperation::DocumentImport {
|
||||
IndexOperation::DocumentOperation {
|
||||
index_uid,
|
||||
primary_key,
|
||||
method,
|
||||
documents_counts,
|
||||
content_files,
|
||||
operations,
|
||||
tasks: document_import_tasks,
|
||||
},
|
||||
)?;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue