3688: Following release v1.1.1: bring back changes into `main` r=curquiza a=curquiza

`@meilisearch/engine-team` ensure the changes we bring to `main` are the ones you want

Co-authored-by: Louis Dureuil <louis@meilisearch.com>
Co-authored-by: bors[bot] <26634292+bors[bot]@users.noreply.github.com>
Co-authored-by: Tamo <tamo@meilisearch.com>
Co-authored-by: dureuill <dureuill@users.noreply.github.com>
This commit is contained in:
bors[bot] 2023-04-24 11:38:23 +00:00 committed by GitHub
commit 654a3a9e19
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 177 additions and 185 deletions

26
Cargo.lock generated
View File

@ -410,7 +410,7 @@ checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf"
[[package]] [[package]]
name = "benchmarks" name = "benchmarks"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bytes", "bytes",
@ -1150,7 +1150,7 @@ dependencies = [
[[package]] [[package]]
name = "dump" name = "dump"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"big_s", "big_s",
@ -1371,7 +1371,7 @@ dependencies = [
[[package]] [[package]]
name = "file-store" name = "file-store"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"faux", "faux",
"tempfile", "tempfile",
@ -1393,7 +1393,7 @@ dependencies = [
[[package]] [[package]]
name = "filter-parser" name = "filter-parser"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"insta", "insta",
"nom", "nom",
@ -1413,7 +1413,7 @@ dependencies = [
[[package]] [[package]]
name = "flatten-serde-json" name = "flatten-serde-json"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"criterion", "criterion",
"serde_json", "serde_json",
@ -1890,7 +1890,7 @@ dependencies = [
[[package]] [[package]]
name = "index-scheduler" name = "index-scheduler"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"big_s", "big_s",
@ -2049,7 +2049,7 @@ dependencies = [
[[package]] [[package]]
name = "json-depth-checker" name = "json-depth-checker"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"criterion", "criterion",
"serde_json", "serde_json",
@ -2445,7 +2445,7 @@ checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]] [[package]]
name = "meili-snap" name = "meili-snap"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"insta", "insta",
"md5", "md5",
@ -2454,7 +2454,7 @@ dependencies = [
[[package]] [[package]]
name = "meilisearch" name = "meilisearch"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"actix-cors", "actix-cors",
"actix-http", "actix-http",
@ -2542,7 +2542,7 @@ dependencies = [
[[package]] [[package]]
name = "meilisearch-auth" name = "meilisearch-auth"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"base64 0.13.1", "base64 0.13.1",
"enum-iterator", "enum-iterator",
@ -2561,7 +2561,7 @@ dependencies = [
[[package]] [[package]]
name = "meilisearch-types" name = "meilisearch-types"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"actix-web", "actix-web",
"anyhow", "anyhow",
@ -2615,7 +2615,7 @@ dependencies = [
[[package]] [[package]]
name = "milli" name = "milli"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"big_s", "big_s",
"bimap", "bimap",
@ -2969,7 +2969,7 @@ checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]] [[package]]
name = "permissive-json-pointer" name = "permissive-json-pointer"
version = "1.1.0" version = "1.1.1"
dependencies = [ dependencies = [
"big_s", "big_s",
"serde_json", "serde_json",

View File

@ -17,7 +17,7 @@ members = [
] ]
[workspace.package] [workspace.package]
version = "1.1.0" version = "1.1.1"
authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"] authors = ["Quentin de Quelen <quentin@dequelen.me>", "Clément Renault <clement@meilisearch.com>"]
description = "Meilisearch HTTP server" description = "Meilisearch HTTP server"
homepage = "https://meilisearch.com" homepage = "https://meilisearch.com"

View File

@ -311,18 +311,9 @@ impl BatchKind {
}) })
} }
( (
BatchKind::DocumentOperation { method, allow_index_creation, primary_key, mut operation_ids }, this @ BatchKind::DocumentOperation { .. },
K::DocumentDeletion, K::DocumentDeletion,
) => { ) => Break(this),
operation_ids.push(id);
Continue(BatchKind::DocumentOperation {
method,
allow_index_creation,
primary_key,
operation_ids,
})
}
// but we can't autobatch documents if it's not the same kind // but we can't autobatch documents if it's not the same kind
// this match branch MUST be AFTER the previous one // this match branch MUST be AFTER the previous one
( (
@ -345,35 +336,7 @@ impl BatchKind {
deletion_ids.push(id); deletion_ids.push(id);
Continue(BatchKind::DocumentClear { ids: deletion_ids }) Continue(BatchKind::DocumentClear { ids: deletion_ids })
} }
// we can autobatch the deletion and import if the index already exists // we can't autobatch a deletion and an import
(
BatchKind::DocumentDeletion { mut deletion_ids },
K::DocumentImport { method, allow_index_creation, primary_key }
) if index_already_exists => {
deletion_ids.push(id);
Continue(BatchKind::DocumentOperation {
method,
allow_index_creation,
primary_key,
operation_ids: deletion_ids,
})
}
// we can autobatch the deletion and import if both can't create an index
(
BatchKind::DocumentDeletion { mut deletion_ids },
K::DocumentImport { method, allow_index_creation, primary_key }
) if !allow_index_creation => {
deletion_ids.push(id);
Continue(BatchKind::DocumentOperation {
method,
allow_index_creation,
primary_key,
operation_ids: deletion_ids,
})
}
// we can't autobatch a deletion and an import if the index does not exists but would be created by an addition
( (
this @ BatchKind::DocumentDeletion { .. }, this @ BatchKind::DocumentDeletion { .. },
K::DocumentImport { .. } K::DocumentImport { .. }
@ -674,36 +637,36 @@ mod tests {
debug_snapshot!(autobatch_from(false,None, [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false,None, [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))"); debug_snapshot!(autobatch_from(false,None, [settings(false), settings(false), settings(false)]), @"Some((Settings { allow_index_creation: false, settings_ids: [0, 1, 2] }, false))");
// We can autobatch document addition with document deletion // We can't autobatch document addition with document deletion
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, true))"); debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0] }, true))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, None), doc_del()]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, true))"###); debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, true, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0] }, true))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_imp(ReplaceDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_imp(UpdateDocuments, false, Some("catto")), doc_del()]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0] }, false))"###);
// And the other way around // we also can't do the only way around
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: true, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, true, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(true, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: None, operation_ids: [0, 1] }, false))"); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, None)]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: ReplaceDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(ReplaceDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @r###"Some((DocumentOperation { method: UpdateDocuments, allow_index_creation: false, primary_key: Some("catto"), operation_ids: [0, 1] }, false))"###); debug_snapshot!(autobatch_from(false, None, [doc_del(), doc_imp(UpdateDocuments, false, Some("catto"))]), @"Some((DocumentDeletion { deletion_ids: [0] }, false))");
} }
#[test] #[test]

View File

@ -61,6 +61,8 @@ pub enum Error {
SwapDuplicateIndexesFound(Vec<String>), SwapDuplicateIndexesFound(Vec<String>),
#[error("Index `{0}` not found.")] #[error("Index `{0}` not found.")]
SwapIndexNotFound(String), SwapIndexNotFound(String),
#[error("Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.")]
NoSpaceLeftInTaskQueue,
#[error( #[error(
"Indexes {} not found.", "Indexes {} not found.",
.0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ") .0.iter().map(|s| format!("`{}`", s)).collect::<Vec<_>>().join(", ")
@ -152,6 +154,8 @@ impl ErrorCode for Error {
Error::TaskNotFound(_) => Code::TaskNotFound, Error::TaskNotFound(_) => Code::TaskNotFound,
Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters, Error::TaskDeletionWithEmptyQuery => Code::MissingTaskFilters,
Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters, Error::TaskCancelationWithEmptyQuery => Code::MissingTaskFilters,
// TODO: not sure of the Code to use
Error::NoSpaceLeftInTaskQueue => Code::NoSpaceLeftOnDevice,
Error::Dump(e) => e.error_code(), Error::Dump(e) => e.error_code(),
Error::Milli(e) => e.error_code(), Error::Milli(e) => e.error_code(),
Error::ProcessBatchPanicked => Code::Internal, Error::ProcessBatchPanicked => Code::Internal,

View File

@ -828,6 +828,13 @@ impl IndexScheduler {
pub fn register(&self, kind: KindWithContent) -> Result<Task> { pub fn register(&self, kind: KindWithContent) -> Result<Task> {
let mut wtxn = self.env.write_txn()?; let mut wtxn = self.env.write_txn()?;
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
&& (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50
{
return Err(Error::NoSpaceLeftInTaskQueue);
}
let mut task = Task { let mut task = Task {
uid: self.next_task_id(&wtxn)?, uid: self.next_task_id(&wtxn)?,
enqueued_at: OffsetDateTime::now_utc(), enqueued_at: OffsetDateTime::now_utc(),
@ -1936,105 +1943,6 @@ mod tests {
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded"); snapshot!(snapshot_index_scheduler(&index_scheduler), name: "both_task_succeeded");
} }
#[test]
fn document_addition_and_document_deletion() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
let content = r#"[
{ "id": 1, "doggo": "jean bob" },
{ "id": 2, "catto": "jorts" },
{ "id": 3, "doggo": "bork" }
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
index_scheduler
.register(KindWithContent::DocumentDeletion {
index_uid: S("doggos"),
documents_ids: vec![S("1"), S("2")],
})
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
handle.advance_one_successful_batch(); // The addition AND deletion should've been batched together
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_processing_the_batch");
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test]
fn document_deletion_and_document_addition() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
index_scheduler
.register(KindWithContent::DocumentDeletion {
index_uid: S("doggos"),
documents_ids: vec![S("1"), S("2")],
})
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_first_task");
let content = r#"[
{ "id": 1, "doggo": "jean bob" },
{ "id": 2, "catto": "jorts" },
{ "id": 3, "doggo": "bork" }
]"#;
let (uuid, mut file) = index_scheduler.create_update_file_with_uuid(0).unwrap();
let documents_count = read_json(content.as_bytes(), file.as_file_mut()).unwrap();
file.persist().unwrap();
index_scheduler
.register(KindWithContent::DocumentAdditionOrUpdate {
index_uid: S("doggos"),
primary_key: Some(S("id")),
method: ReplaceDocuments,
content_file: uuid,
documents_count,
allow_index_creation: true,
})
.unwrap();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "registered_the_second_task");
// The deletion should have failed because it can't create an index
handle.advance_one_failed_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_failing_the_deletion");
// The addition should works
handle.advance_one_successful_batch();
snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_last_successful_addition");
let index = index_scheduler.index("doggos").unwrap();
let rtxn = index.read_txn().unwrap();
let field_ids_map = index.fields_ids_map(&rtxn).unwrap();
let field_ids = field_ids_map.ids().collect::<Vec<_>>();
let documents = index
.all_documents(&rtxn)
.unwrap()
.map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap())
.collect::<Vec<_>>();
snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents");
}
#[test] #[test]
fn do_not_batch_task_of_different_indexes() { fn do_not_batch_task_of_different_indexes() {
let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]); let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);

View File

@ -68,7 +68,7 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000;
// The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be // The actual size of the virtual address space is computed at startup to determine how many 2TiB indexes can be
// opened simultaneously. // opened simultaneously.
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
pub const TASK_DB_SIZE: u64 = 10 * 1024 * 1024 * 1024; // 10 GiB pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)] #[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")] #[serde(rename_all = "UPPERCASE")]

View File

@ -1,11 +1,14 @@
mod errors; mod errors;
use byte_unit::{Byte, ByteUnit};
use meili_snap::insta::assert_json_snapshot; use meili_snap::insta::assert_json_snapshot;
use meili_snap::{json_string, snapshot};
use serde_json::json; use serde_json::json;
use tempfile::TempDir;
use time::format_description::well_known::Rfc3339; use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime; use time::OffsetDateTime;
use crate::common::Server; use crate::common::{default_settings, Server};
#[actix_rt::test] #[actix_rt::test]
async fn error_get_unexisting_task_status() { async fn error_get_unexisting_task_status() {
@ -1000,3 +1003,117 @@ async fn test_summarized_dump_creation() {
} }
"###); "###);
} }
#[actix_web::test]
async fn test_task_queue_is_full() {
let dir = TempDir::new().unwrap();
let mut options = default_settings(dir.path());
options.max_task_db_size = Byte::from_unit(500.0, ByteUnit::B).unwrap();
let server = Server::new_with_options(options).await.unwrap();
// the first task should be enqueued without issue
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]" }), @r###"
{
"taskUid": 0,
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
loop {
let (res, code) = server.create_index(json!({ "uid": "doggo" })).await;
if code == 422 {
break;
}
if res["taskUid"] == json!(null) {
panic!(
"Encountered the strange case:\n{}",
serde_json::to_string_pretty(&res).unwrap()
);
}
}
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// But we should still be able to register tasks deletion IF they delete something
let (result, code) = server.delete_tasks("uids=*").await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
let result = server.wait_task(result["taskUid"].as_u64().unwrap()).await;
snapshot!(json_string!(result["status"]), @r###""succeeded""###);
// Now we should be able to register tasks again
let (result, code) = server.create_index(json!({ "uid": "doggo" })).await;
snapshot!(code, @"202 Accepted");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": "doggo",
"status": "enqueued",
"type": "indexCreation",
"enqueuedAt": "[date]"
}
"###);
// we're going to fill up the queue once again
loop {
let (res, code) = server.delete_tasks("uids=0").await;
if code == 422 {
break;
}
if res["taskUid"] == json!(null) {
panic!(
"Encountered the strange case:\n{}",
serde_json::to_string_pretty(&res).unwrap()
);
}
}
// But we should NOT be able to register this task because it doesn't match any tasks
let (result, code) = server.delete_tasks("uids=0").await;
snapshot!(code, @"422 Unprocessable Entity");
snapshot!(json_string!(result), @r###"
{
"message": "Meilisearch cannot receive write operations because the limit of the task database has been reached. Please delete tasks to continue performing write operations.",
"code": "no_space_left_on_device",
"type": "system",
"link": "https://docs.meilisearch.com/errors#no_space_left_on_device"
}
"###);
// The deletion still works
let (result, code) = server.delete_tasks("uids=*").await;
snapshot!(code, @"200 OK");
snapshot!(json_string!(result, { ".enqueuedAt" => "[date]", ".taskUid" => "uid" }), @r###"
{
"taskUid": "uid",
"indexUid": null,
"status": "enqueued",
"type": "taskDeletion",
"enqueuedAt": "[date]"
}
"###);
}