⚡ A lightning-fast search engine that fits effortlessly into your apps, websites, and workflow 🔍
----
-
-### 🔥 On November 2nd, we are hosting our first-ever live demo and product updates for [Meilisearch Cloud](https://www.meilisearch.com/cloud?utm_campaign=oss&utm_source=github&utm_medium=meilisearch). Make sure to [register here](https://us06web.zoom.us/meeting/register/tZMlc-mqrjIsH912-HTRe-AaT-pp41bDe81a#/registration) and bring your questions for live Q&A!
-
----
-
Meilisearch helps you shape a delightful search experience in a snap, offering features that work out-of-the-box to speed up your workflow.
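To give a concrete idea of what that integration looks like from application code, here is a minimal, illustrative sketch (not taken from the changes below): it assumes a Meilisearch instance running locally on the default port `7700` with an already-populated `movies` index, and uses the `reqwest` (with the `blocking` and `json` features) and `serde_json` crates purely for demonstration.

```rust
// Hypothetical example: send a search query to a local Meilisearch instance.
// Assumes http://localhost:7700 is reachable without an API key and that an
// index named "movies" already contains documents.
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::new();

    // POST /indexes/{index_uid}/search with a JSON body; the query is
    // typo-tolerant out of the box, so "botman" still matches "Batman".
    let response = client
        .post("http://localhost:7700/indexes/movies/search")
        .json(&json!({ "q": "botman", "limit": 5 }))
        .send()?
        .error_for_status()?
        .text()?;

    // The response is a JSON object whose `hits` array holds the matching documents.
    println!("{response}");
    Ok(())
}
```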
diff --git a/benchmarks/benches/indexing.rs b/benchmarks/benches/indexing.rs
index 9446c0b0f..0c19b89cf 100644
--- a/benchmarks/benches/indexing.rs
+++ b/benchmarks/benches/indexing.rs
@@ -6,9 +6,7 @@ use std::path::Path;
use criterion::{criterion_group, criterion_main, Criterion};
use milli::heed::{EnvOpenOptions, RwTxn};
-use milli::update::{
- DeleteDocuments, IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings,
-};
+use milli::update::{IndexDocuments, IndexDocumentsConfig, IndexerConfig, Settings};
use milli::Index;
use rand::seq::SliceRandom;
use rand_chacha::rand_core::SeedableRng;
@@ -38,7 +36,7 @@ fn setup_index() -> Index {
}
fn setup_settings<'t>(
- wtxn: &mut RwTxn<'t, '_>,
+ wtxn: &mut RwTxn<'t>,
index: &'t Index,
primary_key: &str,
searchable_fields: &[&str],
@@ -266,17 +264,7 @@ fn deleting_songs_in_batches_default(c: &mut Criterion) {
(index, document_ids_to_delete)
},
move |(index, document_ids_to_delete)| {
- let mut wtxn = index.write_txn().unwrap();
-
- for ids in document_ids_to_delete {
- let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
- builder.delete_documents(&ids);
- builder.execute().unwrap();
- }
-
- wtxn.commit().unwrap();
-
- index.prepare_for_closing().wait();
+ delete_documents_from_ids(index, document_ids_to_delete)
},
)
});
@@ -613,17 +601,7 @@ fn deleting_wiki_in_batches_default(c: &mut Criterion) {
(index, document_ids_to_delete)
},
move |(index, document_ids_to_delete)| {
- let mut wtxn = index.write_txn().unwrap();
-
- for ids in document_ids_to_delete {
- let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
- builder.delete_documents(&ids);
- builder.execute().unwrap();
- }
-
- wtxn.commit().unwrap();
-
- index.prepare_for_closing().wait();
+ delete_documents_from_ids(index, document_ids_to_delete)
},
)
});
@@ -875,22 +853,31 @@ fn deleting_movies_in_batches_default(c: &mut Criterion) {
(index, document_ids_to_delete)
},
move |(index, document_ids_to_delete)| {
- let mut wtxn = index.write_txn().unwrap();
-
- for ids in document_ids_to_delete {
- let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
- builder.delete_documents(&ids);
- builder.execute().unwrap();
- }
-
- wtxn.commit().unwrap();
-
- index.prepare_for_closing().wait();
+ delete_documents_from_ids(index, document_ids_to_delete)
},
)
});
}
+fn delete_documents_from_ids(index: Index, document_ids_to_delete: Vec<RoaringBitmap>) {
+ let mut wtxn = index.write_txn().unwrap();
+
+ let indexer_config = IndexerConfig::default();
+ for ids in document_ids_to_delete {
+ let config = IndexDocumentsConfig::default();
+
+ let mut builder =
+ IndexDocuments::new(&mut wtxn, &index, &indexer_config, config, |_| (), || false)
+ .unwrap();
+ (builder, _) = builder.remove_documents_from_db_no_batch(&ids).unwrap();
+ builder.execute().unwrap();
+ }
+
+ wtxn.commit().unwrap();
+
+ index.prepare_for_closing().wait();
+}
+
fn indexing_movies_in_three_batches(c: &mut Criterion) {
let mut group = c.benchmark_group("indexing");
group.sample_size(BENCHMARK_ITERATION);
@@ -1112,17 +1099,7 @@ fn deleting_nested_movies_in_batches_default(c: &mut Criterion) {
(index, document_ids_to_delete)
},
move |(index, document_ids_to_delete)| {
- let mut wtxn = index.write_txn().unwrap();
-
- for ids in document_ids_to_delete {
- let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
- builder.delete_documents(&ids);
- builder.execute().unwrap();
- }
-
- wtxn.commit().unwrap();
-
- index.prepare_for_closing().wait();
+ delete_documents_from_ids(index, document_ids_to_delete)
},
)
});
@@ -1338,17 +1315,7 @@ fn deleting_geo_in_batches_default(c: &mut Criterion) {
(index, document_ids_to_delete)
},
move |(index, document_ids_to_delete)| {
- let mut wtxn = index.write_txn().unwrap();
-
- for ids in document_ids_to_delete {
- let mut builder = DeleteDocuments::new(&mut wtxn, &index).unwrap();
- builder.delete_documents(&ids);
- builder.execute().unwrap();
- }
-
- wtxn.commit().unwrap();
-
- index.prepare_for_closing().wait();
+ delete_documents_from_ids(index, document_ids_to_delete)
},
)
});
diff --git a/config.toml b/config.toml
index c47989f56..bbd70a63f 100644
--- a/config.toml
+++ b/config.toml
@@ -129,3 +129,6 @@ experimental_enable_metrics = false
# Experimental RAM reduction during indexing, do not use in production, see:
experimental_reduce_indexing_memory_usage = false
+
+# Experimentally reduces the maximum number of tasks that will be processed at once, see:
+# experimental_max_number_of_batched_tasks = 100
diff --git a/dump/src/lib.rs b/dump/src/lib.rs
index fa3cfb49a..15b281c41 100644
--- a/dump/src/lib.rs
+++ b/dump/src/lib.rs
@@ -267,6 +267,7 @@ pub(crate) mod test {
dictionary: Setting::NotSet,
synonyms: Setting::NotSet,
distinct_attribute: Setting::NotSet,
+ proximity_precision: Setting::NotSet,
typo_tolerance: Setting::NotSet,
faceting: Setting::Set(FacetingSettings {
max_values_per_facet: Setting::Set(111),
diff --git a/dump/src/reader/compat/v5_to_v6.rs b/dump/src/reader/compat/v5_to_v6.rs
index 9e938d756..8a0d6e5e1 100644
--- a/dump/src/reader/compat/v5_to_v6.rs
+++ b/dump/src/reader/compat/v5_to_v6.rs
@@ -345,6 +345,7 @@ impl<T> From<v5::Settings<T>> for v6::Settings<v6::Unchecked> {
dictionary: v6::Setting::NotSet,
synonyms: settings.synonyms.into(),
distinct_attribute: settings.distinct_attribute.into(),
+ proximity_precision: v6::Setting::NotSet,
typo_tolerance: match settings.typo_tolerance {
v5::Setting::Set(typo) => v6::Setting::Set(v6::TypoTolerance {
enabled: typo.enabled.into(),
diff --git a/dump/src/reader/mod.rs b/dump/src/reader/mod.rs
index af02888d2..5bbf4ec4d 100644
--- a/dump/src/reader/mod.rs
+++ b/dump/src/reader/mod.rs
@@ -13,12 +13,12 @@ use crate::{Result, Version};
mod compat;
-pub(self) mod v1;
-pub(self) mod v2;
-pub(self) mod v3;
-pub(self) mod v4;
-pub(self) mod v5;
-pub(self) mod v6;
+mod v1;
+mod v2;
+mod v3;
+mod v4;
+mod v5;
+mod v6;
pub type Document = serde_json::Map<String, serde_json::Value>;
pub type UpdateFile = dyn Iterator<Item = Result<Document>>;
@@ -526,12 +526,12 @@ pub(crate) mod test {
assert!(indexes.is_empty());
// products
- insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(products.metadata(), @r###"
{
"uid": "products",
"primaryKey": "sku",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:22.688964637Z",
+ "updatedAt": "2022-10-09T20:27:23.951017769Z"
}
"###);
@@ -541,12 +541,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5");
// movies
- insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(movies.metadata(), @r###"
{
"uid": "movies",
"primaryKey": "id",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:22.197788495Z",
+ "updatedAt": "2022-10-09T20:28:01.93111053Z"
}
"###);
@@ -571,12 +571,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d751713988987e9331980363e24189ce");
// spells
- insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(spells.metadata(), @r###"
{
"uid": "dnd_spells",
"primaryKey": "index",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:24.242683494Z",
+ "updatedAt": "2022-10-09T20:27:24.312809641Z"
}
"###);
@@ -617,12 +617,12 @@ pub(crate) mod test {
assert!(indexes.is_empty());
// products
- insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(products.metadata(), @r###"
{
"uid": "products",
"primaryKey": "sku",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:56.595257Z",
+ "updatedAt": "2023-01-30T16:25:58.70348Z"
}
"###);
@@ -632,12 +632,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5");
// movies
- insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(movies.metadata(), @r###"
{
"uid": "movies",
"primaryKey": "id",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:56.192178Z",
+ "updatedAt": "2023-01-30T16:25:56.455714Z"
}
"###);
@@ -647,12 +647,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"0227598af846e574139ee0b80e03a720");
// spells
- insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(spells.metadata(), @r###"
{
"uid": "dnd_spells",
"primaryKey": "index",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:58.876405Z",
+ "updatedAt": "2023-01-30T16:25:59.079906Z"
}
"###);
diff --git a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-11.snap b/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-11.snap
deleted file mode 100644
index 92fc61d72..000000000
--- a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-11.snap
+++ /dev/null
@@ -1,24 +0,0 @@
----
-source: dump/src/reader/mod.rs
-expression: spells.settings().unwrap()
----
-{
- "displayedAttributes": [
- "*"
- ],
- "searchableAttributes": [
- "*"
- ],
- "filterableAttributes": [],
- "sortableAttributes": [],
- "rankingRules": [
- "typo",
- "words",
- "proximity",
- "attribute",
- "exactness"
- ],
- "stopWords": [],
- "synonyms": {},
- "distinctAttribute": null
-}
diff --git a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-5.snap b/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-5.snap
deleted file mode 100644
index b0b54c136..000000000
--- a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-5.snap
+++ /dev/null
@@ -1,38 +0,0 @@
----
-source: dump/src/reader/mod.rs
-expression: products.settings().unwrap()
----
-{
- "displayedAttributes": [
- "*"
- ],
- "searchableAttributes": [
- "*"
- ],
- "filterableAttributes": [],
- "sortableAttributes": [],
- "rankingRules": [
- "typo",
- "words",
- "proximity",
- "attribute",
- "exactness"
- ],
- "stopWords": [],
- "synonyms": {
- "android": [
- "phone",
- "smartphone"
- ],
- "iphone": [
- "phone",
- "smartphone"
- ],
- "phone": [
- "android",
- "iphone",
- "smartphone"
- ]
- },
- "distinctAttribute": null
-}
diff --git a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-8.snap b/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-8.snap
deleted file mode 100644
index 5c12a0438..000000000
--- a/dump/src/reader/snapshots/dump__reader__test__import_dump_v1-8.snap
+++ /dev/null
@@ -1,31 +0,0 @@
----
-source: dump/src/reader/mod.rs
-expression: movies.settings().unwrap()
----
-{
- "displayedAttributes": [
- "*"
- ],
- "searchableAttributes": [
- "*"
- ],
- "filterableAttributes": [
- "genres",
- "id"
- ],
- "sortableAttributes": [
- "genres",
- "id"
- ],
- "rankingRules": [
- "typo",
- "words",
- "proximity",
- "attribute",
- "exactness",
- "release_date:asc"
- ],
- "stopWords": [],
- "synonyms": {},
- "distinctAttribute": null
-}
diff --git a/dump/src/reader/v1/settings.rs b/dump/src/reader/v1/settings.rs
index 2f7976534..94343d150 100644
--- a/dump/src/reader/v1/settings.rs
+++ b/dump/src/reader/v1/settings.rs
@@ -56,8 +56,7 @@ pub enum RankingRule {
Desc(String),
}
-static ASC_DESC_REGEX: Lazy<Regex> =
-    Lazy::new(|| Regex::new(r#"(asc|desc)\(([\w_-]+)\)"#).unwrap());
+static ASC_DESC_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"(asc|desc)\(([\w_-]+)\)").unwrap());
impl FromStr for RankingRule {
type Err = ();
diff --git a/dump/src/reader/v2/mod.rs b/dump/src/reader/v2/mod.rs
index 4016e6341..a0ff13a3b 100644
--- a/dump/src/reader/v2/mod.rs
+++ b/dump/src/reader/v2/mod.rs
@@ -46,6 +46,7 @@ pub type Checked = settings::Checked;
pub type Unchecked = settings::Unchecked;
pub type Task = updates::UpdateEntry;
+pub type Kind = updates::UpdateMeta;
// everything related to the errors
pub type ResponseError = errors::ResponseError;
@@ -107,8 +108,11 @@ impl V2Reader {
pub fn indexes(&self) -> Result<impl Iterator<Item = Result<V2IndexReader>> + '_> {
Ok(self.index_uuid.iter().map(|index| -> Result<_> {
V2IndexReader::new(
- index.uid.clone(),
&self.dump.path().join("indexes").join(format!("index-{}", index.uuid)),
+ index,
+ BufReader::new(
+ File::open(self.dump.path().join("updates").join("data.jsonl")).unwrap(),
+ ),
)
}))
}
@@ -143,16 +147,41 @@ pub struct V2IndexReader {
}
impl V2IndexReader {
-    pub fn new(name: String, path: &Path) -> Result<Self> {
+    pub fn new(path: &Path, index_uuid: &IndexUuid, tasks: BufReader<File>) -> Result<Self> {
let meta = File::open(path.join("meta.json"))?;
let meta: DumpMeta = serde_json::from_reader(meta)?;
+ let mut created_at = None;
+ let mut updated_at = None;
+
+ for line in tasks.lines() {
+ let task: Task = serde_json::from_str(&line?)?;
+ if !(task.uuid == index_uuid.uuid && task.is_finished()) {
+ continue;
+ }
+
+ let new_created_at = match task.update.meta() {
+ Kind::DocumentsAddition { .. } | Kind::Settings(_) => task.update.finished_at(),
+ _ => None,
+ };
+ let new_updated_at = task.update.finished_at();
+
+ if created_at.is_none() || created_at > new_created_at {
+ created_at = new_created_at;
+ }
+
+ if updated_at.is_none() || updated_at < new_updated_at {
+ updated_at = new_updated_at;
+ }
+ }
+
+ let current_time = OffsetDateTime::now_utc();
+
let metadata = IndexMetadata {
- uid: name,
+ uid: index_uuid.uid.clone(),
primary_key: meta.primary_key,
- // FIXME: Iterate over the whole task queue to find the creation and last update date.
- created_at: OffsetDateTime::now_utc(),
- updated_at: OffsetDateTime::now_utc(),
+ created_at: created_at.unwrap_or(current_time),
+ updated_at: updated_at.unwrap_or(current_time),
};
let ret = V2IndexReader {
@@ -248,12 +277,12 @@ pub(crate) mod test {
assert!(indexes.is_empty());
// products
- insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(products.metadata(), @r###"
{
"uid": "products",
"primaryKey": "sku",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:22.688964637Z",
+ "updatedAt": "2022-10-09T20:27:23.951017769Z"
}
"###);
@@ -263,12 +292,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5");
// movies
- insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(movies.metadata(), @r###"
{
"uid": "movies",
"primaryKey": "id",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:22.197788495Z",
+ "updatedAt": "2022-10-09T20:28:01.93111053Z"
}
"###);
@@ -293,12 +322,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"d751713988987e9331980363e24189ce");
// spells
- insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(spells.metadata(), @r###"
{
"uid": "dnd_spells",
"primaryKey": "index",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2022-10-09T20:27:24.242683494Z",
+ "updatedAt": "2022-10-09T20:27:24.312809641Z"
}
"###);
@@ -340,12 +369,12 @@ pub(crate) mod test {
assert!(indexes.is_empty());
// products
- insta::assert_json_snapshot!(products.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(products.metadata(), @r###"
{
"uid": "products",
"primaryKey": "sku",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:56.595257Z",
+ "updatedAt": "2023-01-30T16:25:58.70348Z"
}
"###);
@@ -355,12 +384,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"548284a84de510f71e88e6cdea495cf5");
// movies
- insta::assert_json_snapshot!(movies.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(movies.metadata(), @r###"
{
"uid": "movies",
"primaryKey": "id",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:56.192178Z",
+ "updatedAt": "2023-01-30T16:25:56.455714Z"
}
"###);
@@ -370,12 +399,12 @@ pub(crate) mod test {
meili_snap::snapshot_hash!(format!("{:#?}", documents), @"0227598af846e574139ee0b80e03a720");
// spells
- insta::assert_json_snapshot!(spells.metadata(), { ".createdAt" => "[now]", ".updatedAt" => "[now]" }, @r###"
+ insta::assert_json_snapshot!(spells.metadata(), @r###"
{
"uid": "dnd_spells",
"primaryKey": "index",
- "createdAt": "[now]",
- "updatedAt": "[now]"
+ "createdAt": "2023-01-30T16:25:58.876405Z",
+ "updatedAt": "2023-01-30T16:25:59.079906Z"
}
"###);
diff --git a/dump/src/reader/v2/updates.rs b/dump/src/reader/v2/updates.rs
index 33d88d46f..bf1227c7a 100644
--- a/dump/src/reader/v2/updates.rs
+++ b/dump/src/reader/v2/updates.rs
@@ -227,4 +227,14 @@ impl UpdateStatus {
_ => None,
}
}
+
+    pub fn finished_at(&self) -> Option<OffsetDateTime> {
+ match self {
+ UpdateStatus::Processing(_) => None,
+ UpdateStatus::Enqueued(_) => None,
+ UpdateStatus::Processed(u) => Some(u.processed_at),
+ UpdateStatus::Aborted(_) => None,
+ UpdateStatus::Failed(u) => Some(u.failed_at),
+ }
+ }
}
diff --git a/filter-parser/src/lib.rs b/filter-parser/src/lib.rs
index 5760c8865..fa5b70606 100644
--- a/filter-parser/src/lib.rs
+++ b/filter-parser/src/lib.rs
@@ -564,10 +564,10 @@ pub mod tests {
#[test]
fn parse_escaped() {
- insta::assert_display_snapshot!(p(r#"title = 'foo\\'"#), @r#"{title} = {foo\}"#);
- insta::assert_display_snapshot!(p(r#"title = 'foo\\\\'"#), @r#"{title} = {foo\\}"#);
- insta::assert_display_snapshot!(p(r#"title = 'foo\\\\\\'"#), @r#"{title} = {foo\\\}"#);
- insta::assert_display_snapshot!(p(r#"title = 'foo\\\\\\\\'"#), @r#"{title} = {foo\\\\}"#);
+ insta::assert_display_snapshot!(p(r"title = 'foo\\'"), @r#"{title} = {foo\}"#);
+ insta::assert_display_snapshot!(p(r"title = 'foo\\\\'"), @r#"{title} = {foo\\}"#);
+ insta::assert_display_snapshot!(p(r"title = 'foo\\\\\\'"), @r#"{title} = {foo\\\}"#);
+ insta::assert_display_snapshot!(p(r"title = 'foo\\\\\\\\'"), @r#"{title} = {foo\\\\}"#);
// but it also works with other sequencies
insta::assert_display_snapshot!(p(r#"title = 'foo\x20\n\t\"\'"'"#), @"{title} = {foo \n\t\"\'\"}");
}
diff --git a/filter-parser/src/value.rs b/filter-parser/src/value.rs
index 63d5ac384..1d70cb025 100644
--- a/filter-parser/src/value.rs
+++ b/filter-parser/src/value.rs
@@ -270,8 +270,8 @@ pub mod test {
("aaaa", "", rtok("", "aaaa"), "aaaa"),
(r#"aa"aa"#, r#""aa"#, rtok("", "aa"), "aa"),
(r#"aa\"aa"#, r#""#, rtok("", r#"aa\"aa"#), r#"aa"aa"#),
- (r#"aa\\\aa"#, r#""#, rtok("", r#"aa\\\aa"#), r#"aa\\\aa"#),
- (r#"aa\\"\aa"#, r#""\aa"#, rtok("", r#"aa\\"#), r#"aa\\"#),
+ (r"aa\\\aa", r#""#, rtok("", r"aa\\\aa"), r"aa\\\aa"),
+ (r#"aa\\"\aa"#, r#""\aa"#, rtok("", r"aa\\"), r"aa\\"),
(r#"aa\\\"\aa"#, r#""#, rtok("", r#"aa\\\"\aa"#), r#"aa\\"\aa"#),
(r#"\"\""#, r#""#, rtok("", r#"\"\""#), r#""""#),
];
@@ -301,12 +301,12 @@ pub mod test {
);
// simple quote
assert_eq!(
- unescape(Span::new_extra(r#"Hello \'World\'"#, ""), '\''),
+ unescape(Span::new_extra(r"Hello \'World\'", ""), '\''),
r#"Hello 'World'"#.to_string()
);
assert_eq!(
- unescape(Span::new_extra(r#"Hello \\\'World\\\'"#, ""), '\''),
- r#"Hello \\'World\\'"#.to_string()
+ unescape(Span::new_extra(r"Hello \\\'World\\\'", ""), '\''),
+ r"Hello \\'World\\'".to_string()
);
}
@@ -335,19 +335,19 @@ pub mod test {
("\"cha'nnel\"", "cha'nnel", false),
("I'm tamo", "I", false),
// escaped thing but not quote
- (r#""\\""#, r#"\"#, true),
- (r#""\\\\\\""#, r#"\\\"#, true),
- (r#""aa\\aa""#, r#"aa\aa"#, true),
+ (r#""\\""#, r"\", true),
+ (r#""\\\\\\""#, r"\\\", true),
+ (r#""aa\\aa""#, r"aa\aa", true),
// with double quote
(r#""Hello \"world\"""#, r#"Hello "world""#, true),
(r#""Hello \\\"world\\\"""#, r#"Hello \"world\""#, true),
(r#""I'm \"super\" tamo""#, r#"I'm "super" tamo"#, true),
(r#""\"\"""#, r#""""#, true),
// with simple quote
- (r#"'Hello \'world\''"#, r#"Hello 'world'"#, true),
- (r#"'Hello \\\'world\\\''"#, r#"Hello \'world\'"#, true),
+ (r"'Hello \'world\''", r#"Hello 'world'"#, true),
+ (r"'Hello \\\'world\\\''", r"Hello \'world\'", true),
(r#"'I\'m "super" tamo'"#, r#"I'm "super" tamo"#, true),
- (r#"'\'\''"#, r#"''"#, true),
+ (r"'\'\''", r#"''"#, true),
];
for (input, expected, escaped) in test_case {
diff --git a/fuzzers/src/bin/fuzz-indexing.rs b/fuzzers/src/bin/fuzz-indexing.rs
index 1d53e069c..baf705709 100644
--- a/fuzzers/src/bin/fuzz-indexing.rs
+++ b/fuzzers/src/bin/fuzz-indexing.rs
@@ -113,7 +113,7 @@ fn main() {
index.documents(&wtxn, res.documents_ids).unwrap();
progression.fetch_add(1, Ordering::Relaxed);
}
- wtxn.abort().unwrap();
+ wtxn.abort();
});
if let err @ Err(_) = handle.join() {
stop.store(true, Ordering::Relaxed);
diff --git a/index-scheduler/Cargo.toml b/index-scheduler/Cargo.toml
index 9e7c2ae4b..c4a37b7d6 100644
--- a/index-scheduler/Cargo.toml
+++ b/index-scheduler/Cargo.toml
@@ -22,7 +22,7 @@ log = "0.4.17"
meilisearch-auth = { path = "../meilisearch-auth" }
meilisearch-types = { path = "../meilisearch-types" }
page_size = "0.5.0"
-puffin = "0.16.0"
+puffin = { version = "0.16.0", features = ["serialization"] }
roaring = { version = "0.10.1", features = ["serde"] }
serde = { version = "1.0.160", features = ["derive"] }
serde_json = { version = "1.0.95", features = ["preserve_order"] }
diff --git a/index-scheduler/src/batch.rs b/index-scheduler/src/batch.rs
index aa93cda2a..94a8b3f07 100644
--- a/index-scheduler/src/batch.rs
+++ b/index-scheduler/src/batch.rs
@@ -24,16 +24,15 @@ use std::fs::{self, File};
use std::io::BufWriter;
use dump::IndexMetadata;
-use log::{debug, error, info};
+use log::{debug, error, info, trace};
use meilisearch_types::error::Code;
use meilisearch_types::heed::{RoTxn, RwTxn};
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
use meilisearch_types::milli::heed::CompactionOption;
use meilisearch_types::milli::update::{
- DeleteDocuments, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
- Settings as MilliSettings,
+ IndexDocumentsConfig, IndexDocumentsMethod, IndexerConfig, Settings as MilliSettings,
};
-use meilisearch_types::milli::{self, Filter, BEU32};
+use meilisearch_types::milli::{self, Filter};
use meilisearch_types::settings::{apply_settings_to_builder, Settings, Unchecked};
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status, Task};
use meilisearch_types::{compression, Index, VERSION_FILE_NAME};
@@ -44,7 +43,7 @@ use uuid::Uuid;
use crate::autobatcher::{self, BatchKind};
use crate::utils::{self, swap_index_uid_in_task};
-use crate::{Error, IndexScheduler, ProcessingTasks, Result, TaskId};
+use crate::{Error, IndexScheduler, MustStopProcessing, ProcessingTasks, Result, TaskId};
/// Represents a combination of tasks that can all be processed at the same time.
///
@@ -105,12 +104,6 @@ pub(crate) enum IndexOperation {
operations: Vec<DocumentOperation>,
tasks: Vec<Task>,
},
- DocumentDeletion {
- index_uid: String,
- // The vec associated with each document deletion tasks.
-        documents: Vec<Vec<String>>,
-        tasks: Vec<Task>,
- },
IndexDocumentDeletionByFilter {
index_uid: String,
task: Task,
@@ -162,7 +155,6 @@ impl Batch {
}
Batch::IndexOperation { op, .. } => match op {
IndexOperation::DocumentOperation { tasks, .. }
- | IndexOperation::DocumentDeletion { tasks, .. }
| IndexOperation::Settings { tasks, .. }
| IndexOperation::DocumentClear { tasks, .. } => {
tasks.iter().map(|task| task.uid).collect()
@@ -227,7 +219,6 @@ impl IndexOperation {
pub fn index_uid(&self) -> &str {
match self {
IndexOperation::DocumentOperation { index_uid, .. }
- | IndexOperation::DocumentDeletion { index_uid, .. }
| IndexOperation::IndexDocumentDeletionByFilter { index_uid, .. }
| IndexOperation::DocumentClear { index_uid, .. }
| IndexOperation::Settings { index_uid, .. }
@@ -243,9 +234,6 @@ impl fmt::Display for IndexOperation {
IndexOperation::DocumentOperation { .. } => {
f.write_str("IndexOperation::DocumentOperation")
}
- IndexOperation::DocumentDeletion { .. } => {
- f.write_str("IndexOperation::DocumentDeletion")
- }
IndexOperation::IndexDocumentDeletionByFilter { .. } => {
f.write_str("IndexOperation::IndexDocumentDeletionByFilter")
}
@@ -348,18 +336,27 @@ impl IndexScheduler {
BatchKind::DocumentDeletion { deletion_ids } => {
let tasks = self.get_existing_tasks(rtxn, deletion_ids)?;
- let mut documents = Vec::new();
+ let mut operations = Vec::with_capacity(tasks.len());
+ let mut documents_counts = Vec::with_capacity(tasks.len());
for task in &tasks {
match task.kind {
KindWithContent::DocumentDeletion { ref documents_ids, .. } => {
- documents.push(documents_ids.clone())
+ operations.push(DocumentOperation::Delete(documents_ids.clone()));
+ documents_counts.push(documents_ids.len() as u64);
}
_ => unreachable!(),
}
}
Ok(Some(Batch::IndexOperation {
- op: IndexOperation::DocumentDeletion { index_uid, documents, tasks },
+ op: IndexOperation::DocumentOperation {
+ index_uid,
+ primary_key: None,
+ method: IndexDocumentsMethod::ReplaceDocuments,
+ documents_counts,
+ operations,
+ tasks,
+ },
must_create_index,
}))
}
@@ -587,7 +584,9 @@ impl IndexScheduler {
let index_tasks = self.index_tasks(rtxn, index_name)? & enqueued;
// If autobatching is disabled we only take one task at a time.
- let tasks_limit = if self.autobatching_enabled { usize::MAX } else { 1 };
+ // Otherwise, we take only a maximum of tasks to create batches.
+ let tasks_limit =
+ if self.autobatching_enabled { self.max_number_of_batched_tasks } else { 1 };
let enqueued = index_tasks
.into_iter()
@@ -718,7 +717,7 @@ impl IndexScheduler {
// 2. Snapshot the index-scheduler LMDB env
//
- // When we call copy_to_path, LMDB opens a read transaction by itself,
+ // When we call copy_to_file, LMDB opens a read transaction by itself,
// we can't provide our own. It is an issue as we would like to know
// the update files to copy but new ones can be enqueued between the copy
// of the env and the new transaction we open to retrieve the enqueued tasks.
@@ -731,7 +730,7 @@ impl IndexScheduler {
// 2.1 First copy the LMDB env of the index-scheduler
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
- self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
+ self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@@ -756,7 +755,7 @@ impl IndexScheduler {
let index = self.index_mapper.index(&rtxn, name)?;
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
fs::create_dir_all(&dst)?;
- index.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
+ index.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
}
drop(rtxn);
@@ -769,7 +768,7 @@ impl IndexScheduler {
.map_size(1024 * 1024 * 1024) // 1 GiB
.max_dbs(2)
.open(&self.auth_path)?;
- auth.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
+ auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
// 5.1 Find the original name of the database
@@ -825,6 +824,10 @@ impl IndexScheduler {
// 2. dump the tasks
let mut dump_tasks = dump.create_tasks_queue()?;
for ret in self.all_tasks.iter(&rtxn)? {
+ if self.must_stop_processing.get() {
+ return Err(Error::AbortedTask);
+ }
+
let (_, mut t) = ret?;
let status = t.status;
let content_file = t.content_uuid();
@@ -845,6 +848,9 @@ impl IndexScheduler {
// 2.1. Dump the `content_file` associated with the task if there is one and the task is not finished yet.
if let Some(content_file) = content_file {
+ if self.must_stop_processing.get() {
+ return Err(Error::AbortedTask);
+ }
if status == Status::Enqueued {
let content_file = self.file_store.get_update(content_file)?;
@@ -884,6 +890,9 @@ impl IndexScheduler {
// 3.1. Dump the documents
for ret in index.all_documents(&rtxn)? {
+ if self.must_stop_processing.get() {
+ return Err(Error::AbortedTask);
+ }
let (_id, doc) = ret?;
let document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc)?;
index_dumper.push_document(&document)?;
@@ -903,6 +912,9 @@ impl IndexScheduler {
"[year repr:full][month repr:numerical][day padding:zero]-[hour padding:zero][minute padding:zero][second padding:zero][subsecond digits:3]"
)).unwrap();
+ if self.must_stop_processing.get() {
+ return Err(Error::AbortedTask);
+ }
let path = self.dumps_path.join(format!("{}.dump", dump_uid));
let file = File::create(path)?;
dump.persist_to(BufWriter::new(file))?;
@@ -1096,7 +1108,7 @@ impl IndexScheduler {
for task_id in &index_lhs_task_ids | &index_rhs_task_ids {
let mut task = self.get_task(wtxn, task_id)?.ok_or(Error::CorruptedTaskQueue)?;
swap_index_uid_in_task(&mut task, (lhs, rhs));
- self.all_tasks.put(wtxn, &BEU32::new(task_id), &task)?;
+ self.all_tasks.put(wtxn, &task_id, &task)?;
}
// 4. remove the task from indexuid = before_name
@@ -1122,7 +1134,7 @@ impl IndexScheduler {
/// The list of processed tasks.
fn apply_index_operation<'i>(
&self,
- index_wtxn: &mut RwTxn<'i, '_>,
+ index_wtxn: &mut RwTxn<'i>,
index: &'i Index,
operation: IndexOperation,
) -> Result<Vec<Task>> {
@@ -1195,7 +1207,7 @@ impl IndexScheduler {
index,
indexer_config,
config,
- |indexing_step| debug!("update: {:?}", indexing_step),
+ |indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.get(),
)?;
@@ -1242,7 +1254,8 @@ impl IndexScheduler {
let (new_builder, user_result) =
builder.remove_documents(document_ids)?;
builder = new_builder;
-
+ // Uses Invariant: remove documents actually always returns Ok for the inner result
+ let count = user_result.unwrap();
let provided_ids =
if let Some(Details::DocumentDeletion { provided_ids, .. }) =
task.details
@@ -1253,23 +1266,11 @@ impl IndexScheduler {
unreachable!();
};
- match user_result {
- Ok(count) => {
- task.status = Status::Succeeded;
- task.details = Some(Details::DocumentDeletion {
- provided_ids,
- deleted_documents: Some(count),
- });
- }
- Err(e) => {
- task.status = Status::Failed;
- task.details = Some(Details::DocumentDeletion {
- provided_ids,
- deleted_documents: Some(0),
- });
- task.error = Some(milli::Error::from(e).into());
- }
- }
+ task.status = Status::Succeeded;
+ task.details = Some(Details::DocumentDeletion {
+ provided_ids,
+ deleted_documents: Some(count),
+ });
}
}
}
@@ -1284,31 +1285,13 @@ impl IndexScheduler {
milli::update::Settings::new(index_wtxn, index, indexer_config);
builder.reset_primary_key();
builder.execute(
- |indexing_step| debug!("update: {:?}", indexing_step),
+ |indexing_step| trace!("update: {:?}", indexing_step),
|| must_stop_processing.clone().get(),
)?;
}
Ok(tasks)
}
- IndexOperation::DocumentDeletion { index_uid: _, documents, mut tasks } => {
- let mut builder = milli::update::DeleteDocuments::new(index_wtxn, index)?;
- documents.iter().flatten().for_each(|id| {
- builder.delete_external_id(id);
- });
-
- let DocumentDeletionResult { deleted_documents, .. } = builder.execute()?;
-
- for (task, documents) in tasks.iter_mut().zip(documents) {
- task.status = Status::Succeeded;
- task.details = Some(Details::DocumentDeletion {
- provided_ids: documents.len(),
- deleted_documents: Some(deleted_documents.min(documents.len() as u64)),
- });
- }
-
- Ok(tasks)
- }
IndexOperation::IndexDocumentDeletionByFilter { mut task, index_uid: _ } => {
let filter =
if let KindWithContent::DocumentDeletionByFilter { filter_expr, .. } =
@@ -1318,7 +1301,13 @@ impl IndexScheduler {
} else {
unreachable!()
};
- let deleted_documents = delete_document_by_filter(index_wtxn, filter, index);
+ let deleted_documents = delete_document_by_filter(
+ index_wtxn,
+ filter,
+ self.index_mapper.indexer_config(),
+ self.must_stop_processing.clone(),
+ index,
+ );
let original_filter = if let Some(Details::DocumentDeletionByFilter {
original_filter,
deleted_documents: _,
@@ -1356,6 +1345,9 @@ impl IndexScheduler {
for (task, (_, settings)) in tasks.iter_mut().zip(settings) {
let checked_settings = settings.clone().check();
+ if checked_settings.proximity_precision.set().is_some() {
+ self.features.features().check_proximity_precision()?;
+ }
task.details = Some(Details::SettingsUpdate { settings: Box::new(settings) });
apply_settings_to_builder(&checked_settings, &mut builder);
@@ -1492,10 +1484,9 @@ impl IndexScheduler {
}
for task in to_delete_tasks.iter() {
- self.all_tasks.delete(wtxn, &BEU32::new(task))?;
+ self.all_tasks.delete(wtxn, &task)?;
}
for canceled_by in affected_canceled_by {
- let canceled_by = BEU32::new(canceled_by);
if let Some(mut tasks) = self.canceled_by.get(wtxn, &canceled_by)? {
tasks -= &to_delete_tasks;
if tasks.is_empty() {
@@ -1543,15 +1534,17 @@ impl IndexScheduler {
task.details = task.details.map(|d| d.to_failed());
self.update_task(wtxn, &task)?;
}
- self.canceled_by.put(wtxn, &BEU32::new(cancel_task_id), &tasks_to_cancel)?;
+ self.canceled_by.put(wtxn, &cancel_task_id, &tasks_to_cancel)?;
Ok(content_files_to_delete)
}
}
fn delete_document_by_filter<'a>(
- wtxn: &mut RwTxn<'a, '_>,
+ wtxn: &mut RwTxn<'a>,
filter: &serde_json::Value,
+ indexer_config: &IndexerConfig,
+ must_stop_processing: MustStopProcessing,
index: &'a Index,
) -> Result<u64> {
let filter = Filter::from_json(filter)?;
@@ -1562,9 +1555,26 @@ fn delete_document_by_filter<'a>(
}
e => e.into(),
})?;
- let mut delete_operation = DeleteDocuments::new(wtxn, index)?;
- delete_operation.delete_documents(&candidates);
- delete_operation.execute().map(|result| result.deleted_documents)?
+
+ let config = IndexDocumentsConfig {
+ update_method: IndexDocumentsMethod::ReplaceDocuments,
+ ..Default::default()
+ };
+
+ let mut builder = milli::update::IndexDocuments::new(
+ wtxn,
+ index,
+ indexer_config,
+ config,
+ |indexing_step| debug!("update: {:?}", indexing_step),
+ || must_stop_processing.get(),
+ )?;
+
+ let (new_builder, count) = builder.remove_documents_from_db_no_batch(&candidates)?;
+ builder = new_builder;
+
+ let _ = builder.execute()?;
+ count
} else {
0
})
diff --git a/index-scheduler/src/error.rs b/index-scheduler/src/error.rs
index ddc6960f7..bbe526460 100644
--- a/index-scheduler/src/error.rs
+++ b/index-scheduler/src/error.rs
@@ -108,6 +108,8 @@ pub enum Error {
TaskDeletionWithEmptyQuery,
#[error("Query parameters to filter the tasks to cancel are missing. Available query parameters are: `uids`, `indexUids`, `statuses`, `types`, `canceledBy`, `beforeEnqueuedAt`, `afterEnqueuedAt`, `beforeStartedAt`, `afterStartedAt`, `beforeFinishedAt`, `afterFinishedAt`.")]
TaskCancelationWithEmptyQuery,
+ #[error("Aborted task")]
+ AbortedTask,
#[error(transparent)]
Dump(#[from] dump::Error),
@@ -175,6 +177,7 @@ impl Error {
| Error::TaskNotFound(_)
| Error::TaskDeletionWithEmptyQuery
| Error::TaskCancelationWithEmptyQuery
+ | Error::AbortedTask
| Error::Dump(_)
| Error::Heed(_)
| Error::Milli(_)
@@ -236,6 +239,9 @@ impl ErrorCode for Error {
Error::TaskDatabaseUpdate(_) => Code::Internal,
Error::CreateBatch(_) => Code::Internal,
+ // This one should never be seen by the end user
+ Error::AbortedTask => Code::Internal,
+
#[cfg(test)]
Error::PlannedFailure => Code::Internal,
}
diff --git a/index-scheduler/src/features.rs b/index-scheduler/src/features.rs
index 1db27bcd5..ae2823c30 100644
--- a/index-scheduler/src/features.rs
+++ b/index-scheduler/src/features.rs
@@ -81,6 +81,19 @@ impl RoFeatures {
.into())
}
}
+
+ pub fn check_proximity_precision(&self) -> Result<()> {
+ if self.runtime.proximity_precision {
+ Ok(())
+ } else {
+ Err(FeatureNotEnabledError {
+ disabled_action: "Using `proximityPrecision` index setting",
+ feature: "proximity precision",
+ issue_link: "https://github.com/orgs/meilisearch/discussions/710",
+ }
+ .into())
+ }
+ }
}
impl FeatureData {
diff --git a/index-scheduler/src/index_mapper/index_map.rs b/index-scheduler/src/index_mapper/index_map.rs
index a24213558..f8080d23b 100644
--- a/index-scheduler/src/index_mapper/index_map.rs
+++ b/index-scheduler/src/index_mapper/index_map.rs
@@ -1,12 +1,8 @@
-/// the map size to use when we don't succeed in reading it in indexes.
-const DEFAULT_MAP_SIZE: usize = 10 * 1024 * 1024 * 1024; // 10 GiB
-
use std::collections::BTreeMap;
use std::path::Path;
use std::time::Duration;
-use meilisearch_types::heed::flags::Flags;
-use meilisearch_types::heed::{EnvClosingEvent, EnvOpenOptions};
+use meilisearch_types::heed::{EnvClosingEvent, EnvFlags, EnvOpenOptions};
use meilisearch_types::milli::Index;
use time::OffsetDateTime;
use uuid::Uuid;
@@ -236,7 +232,7 @@ impl IndexMap {
enable_mdb_writemap: bool,
map_size_growth: usize,
) {
- let map_size = index.map_size().unwrap_or(DEFAULT_MAP_SIZE) + map_size_growth;
+ let map_size = index.map_size() + map_size_growth;
let closing_event = index.prepare_for_closing();
let generation = self.next_generation();
self.unavailable.insert(
@@ -309,7 +305,7 @@ fn create_or_open_index(
options.map_size(clamp_to_page_size(map_size));
options.max_readers(1024);
if enable_mdb_writemap {
- unsafe { options.flag(Flags::MdbWriteMap) };
+ unsafe { options.flags(EnvFlags::WRITE_MAP) };
}
if let Some((created, updated)) = date {
@@ -388,7 +384,7 @@ mod tests {
fn assert_index_size(index: Index, expected: usize) {
let expected = clamp_to_page_size(expected);
- let index_map_size = index.map_size().unwrap();
+ let index_map_size = index.map_size();
assert_eq!(index_map_size, expected);
}
}
diff --git a/index-scheduler/src/insta_snapshot.rs b/index-scheduler/src/insta_snapshot.rs
index 6096bad38..bd8fa5148 100644
--- a/index-scheduler/src/insta_snapshot.rs
+++ b/index-scheduler/src/insta_snapshot.rs
@@ -1,7 +1,7 @@
use std::collections::BTreeSet;
use std::fmt::Write;
-use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
+use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, RoTxn};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Details, Task};
@@ -30,6 +30,7 @@ pub fn snapshot_index_scheduler(scheduler: &IndexScheduler) -> String {
index_mapper,
features: _,
max_number_of_tasks: _,
+ max_number_of_batched_tasks: _,
puffin_frame: _,
wake_up: _,
dumps_path: _,
@@ -115,7 +116,7 @@ pub fn snapshot_bitmap(r: &RoaringBitmap) -> String {
snap
}
-pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<OwnedType<BEU32>, SerdeJson<Task>>) -> String {
+pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database<BEU32, SerdeJson<Task>>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
@@ -125,10 +126,7 @@ pub fn snapshot_all_tasks(rtxn: &RoTxn, db: Database, SerdeJson
snap
}
-pub fn snapshot_date_db(
-    rtxn: &RoTxn,
-    db: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
-) -> String {
+pub fn snapshot_date_db(rtxn: &RoTxn, db: Database<BEI128, CboRoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
@@ -248,10 +246,7 @@ pub fn snapshot_index_tasks(rtxn: &RoTxn, db: Database<Str, RoaringBitmapCodec>)
}
snap
}
-pub fn snapshot_canceled_by(
-    rtxn: &RoTxn,
-    db: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
-) -> String {
+pub fn snapshot_canceled_by(rtxn: &RoTxn, db: Database<BEU32, RoaringBitmapCodec>) -> String {
let mut snap = String::new();
let iter = db.iter(rtxn).unwrap();
for next in iter {
diff --git a/index-scheduler/src/lib.rs b/index-scheduler/src/lib.rs
index 95902aa15..a1b6497d9 100644
--- a/index-scheduler/src/lib.rs
+++ b/index-scheduler/src/lib.rs
@@ -47,8 +47,9 @@ pub use features::RoFeatures;
use file_store::FileStore;
use meilisearch_types::error::ResponseError;
use meilisearch_types::features::{InstanceTogglableFeatures, RuntimeTogglableFeatures};
-use meilisearch_types::heed::types::{OwnedType, SerdeBincode, SerdeJson, Str};
-use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
+use meilisearch_types::heed::byteorder::BE;
+use meilisearch_types::heed::types::{SerdeBincode, SerdeJson, Str, I128};
+use meilisearch_types::heed::{self, Database, Env, PutFlags, RoTxn, RwTxn};
use meilisearch_types::milli::documents::DocumentsBatchBuilder;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::{self, CboRoaringBitmapCodec, Index, RoaringBitmapCodec, BEU32};
@@ -64,8 +65,7 @@ use uuid::Uuid;
use crate::index_mapper::IndexMapper;
use crate::utils::{check_index_swap_validity, clamp_to_page_size};
-pub(crate) type BEI128 =
-    meilisearch_types::heed::zerocopy::I128<meilisearch_types::heed::byteorder::BigEndian>;
+pub(crate) type BEI128 = I128<BE>;
/// Defines a subset of tasks to be retrieved from the [`IndexScheduler`].
///
@@ -258,6 +258,9 @@ pub struct IndexSchedulerOptions {
/// The maximum number of tasks stored in the task queue before starting
/// to auto schedule task deletions.
pub max_number_of_tasks: usize,
+ /// If the autobatcher is allowed to automatically batch tasks
+ /// it will only batch this defined number of tasks at once.
+ pub max_number_of_batched_tasks: usize,
/// The experimental features enabled for this instance.
pub instance_features: InstanceTogglableFeatures,
}
@@ -278,7 +281,7 @@ pub struct IndexScheduler {
pub(crate) file_store: FileStore,
// The main database, it contains all the tasks accessible by their Id.
-    pub(crate) all_tasks: Database<OwnedType<BEU32>, SerdeJson<Task>>,
+    pub(crate) all_tasks: Database<BEU32, SerdeJson<Task>>,
/// All the tasks ids grouped by their status.
// TODO we should not be able to serialize a `Status::Processing` in this database.
@@ -289,16 +292,16 @@ pub struct IndexScheduler {
pub(crate) index_tasks: Database<Str, RoaringBitmapCodec>,
/// Store the tasks that were canceled by a task uid
-    pub(crate) canceled_by: Database<OwnedType<BEU32>, RoaringBitmapCodec>,
+    pub(crate) canceled_by: Database<BEU32, RoaringBitmapCodec>,
/// Store the task ids of tasks which were enqueued at a specific date
-    pub(crate) enqueued_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
+    pub(crate) enqueued_at: Database<BEI128, CboRoaringBitmapCodec>,
/// Store the task ids of finished tasks which started being processed at a specific date
-    pub(crate) started_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
+    pub(crate) started_at: Database<BEI128, CboRoaringBitmapCodec>,
/// Store the task ids of tasks which finished at a specific date
-    pub(crate) finished_at: Database<OwnedType<BEI128>, CboRoaringBitmapCodec>,
+    pub(crate) finished_at: Database<BEI128, CboRoaringBitmapCodec>,
/// In charge of creating, opening, storing and returning indexes.
pub(crate) index_mapper: IndexMapper,
@@ -316,6 +319,9 @@ pub struct IndexScheduler {
/// the finished tasks automatically.
pub(crate) max_number_of_tasks: usize,
+ /// The maximum number of tasks that will be batched together.
+ pub(crate) max_number_of_batched_tasks: usize,
+
/// A frame to output the indexation profiling files to disk.
pub(crate) puffin_frame: Arc<puffin::GlobalFrameView>,
@@ -373,6 +379,7 @@ impl IndexScheduler {
wake_up: self.wake_up.clone(),
autobatching_enabled: self.autobatching_enabled,
max_number_of_tasks: self.max_number_of_tasks,
+ max_number_of_batched_tasks: self.max_number_of_batched_tasks,
puffin_frame: self.puffin_frame.clone(),
snapshots_path: self.snapshots_path.clone(),
dumps_path: self.dumps_path.clone(),
@@ -471,6 +478,7 @@ impl IndexScheduler {
puffin_frame: Arc::new(puffin::GlobalFrameView::default()),
autobatching_enabled: options.autobatching_enabled,
max_number_of_tasks: options.max_number_of_tasks,
+ max_number_of_batched_tasks: options.max_number_of_batched_tasks,
dumps_path: options.dumps_path,
snapshots_path: options.snapshots_path,
auth_path: options.auth_path,
@@ -730,9 +738,7 @@ impl IndexScheduler {
if let Some(canceled_by) = &query.canceled_by {
let mut all_canceled_tasks = RoaringBitmap::new();
for cancel_task_uid in canceled_by {
- if let Some(canceled_by_uid) =
- self.canceled_by.get(rtxn, &BEU32::new(*cancel_task_uid))?
- {
+ if let Some(canceled_by_uid) = self.canceled_by.get(rtxn, cancel_task_uid)? {
all_canceled_tasks |= canceled_by_uid;
}
}
@@ -983,7 +989,7 @@ impl IndexScheduler {
// if the task doesn't delete anything and 50% of the task queue is full, we must refuse to enqueue the incomming task
if !matches!(&kind, KindWithContent::TaskDeletion { tasks, .. } if !tasks.is_empty())
- && (self.env.non_free_pages_size()? * 100) / self.env.map_size()? as u64 > 50
+ && (self.env.non_free_pages_size()? * 100) / self.env.info().map_size as u64 > 50
{
return Err(Error::NoSpaceLeftInTaskQueue);
}
@@ -1009,7 +1015,7 @@ impl IndexScheduler {
// Get rid of the mutability.
let task = task;
- self.all_tasks.append(&mut wtxn, &BEU32::new(task.uid), &task)?;
+ self.all_tasks.put_with_flags(&mut wtxn, PutFlags::APPEND, &task.uid, &task)?;
for index in task.indexes() {
self.update_index(&mut wtxn, index, |bitmap| {
@@ -1183,10 +1189,11 @@ impl IndexScheduler {
// If we have an abortion error we must stop the tick here and re-schedule tasks.
Err(Error::Milli(milli::Error::InternalError(
milli::InternalError::AbortedIndexation,
- ))) => {
+ )))
+ | Err(Error::AbortedTask) => {
#[cfg(test)]
self.breakpoint(Breakpoint::AbortedIndexation);
- wtxn.abort().map_err(Error::HeedTransaction)?;
+ wtxn.abort();
// We make sure that we don't call `stop_processing` on the `processing_tasks`,
// this is because we want to let the next tick call `create_next_batch` and keep
@@ -1207,7 +1214,7 @@ impl IndexScheduler {
let index_uid = index_uid.unwrap();
// fixme: handle error more gracefully? not sure when this could happen
self.index_mapper.resize_index(&wtxn, &index_uid)?;
- wtxn.abort().map_err(Error::HeedTransaction)?;
+ wtxn.abort();
return Ok(TickOutcome::TickAgain(0));
}
@@ -1353,7 +1360,7 @@ impl IndexScheduler {
pub struct Dump<'a> {
index_scheduler: &'a IndexScheduler,
- wtxn: RwTxn<'a, 'a>,
+ wtxn: RwTxn<'a>,
indexes: HashMap<String, RoaringBitmap>,
statuses: HashMap<Status, RoaringBitmap>,
@@ -1468,7 +1475,7 @@ impl<'a> Dump<'a> {
},
};
- self.index_scheduler.all_tasks.put(&mut self.wtxn, &BEU32::new(task.uid), &task)?;
+ self.index_scheduler.all_tasks.put(&mut self.wtxn, &task.uid, &task)?;
for index in task.indexes() {
match self.indexes.get_mut(index) {
@@ -1510,8 +1517,8 @@ impl<'a> Dump<'a> {
}
}
- self.statuses.entry(task.status).or_insert(RoaringBitmap::new()).insert(task.uid);
- self.kinds.entry(task.kind.as_kind()).or_insert(RoaringBitmap::new()).insert(task.uid);
+ self.statuses.entry(task.status).or_default().insert(task.uid);
+ self.kinds.entry(task.kind.as_kind()).or_default().insert(task.uid);
Ok(task)
}
@@ -1639,6 +1646,7 @@ mod tests {
indexer_config,
autobatching_enabled: true,
max_number_of_tasks: 1_000_000,
+ max_number_of_batched_tasks: usize::MAX,
instance_features: Default::default(),
};
configuration(&mut options);
@@ -4339,4 +4347,26 @@ mod tests {
}
"###);
}
+
+ #[test]
+ fn cancel_processing_dump() {
+ let (index_scheduler, mut handle) = IndexScheduler::test(true, vec![]);
+
+ let dump_creation = KindWithContent::DumpCreation { keys: Vec::new(), instance_uid: None };
+ let dump_cancellation = KindWithContent::TaskCancelation {
+ query: "cancel dump".to_owned(),
+ tasks: RoaringBitmap::from_iter([0]),
+ };
+ let _ = index_scheduler.register(dump_creation).unwrap();
+ snapshot!(snapshot_index_scheduler(&index_scheduler), name: "after_dump_register");
+ handle.advance_till([Start, BatchCreated, InsideProcessBatch]);
+
+ let _ = index_scheduler.register(dump_cancellation).unwrap();
+ snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_registered");
+
+ snapshot!(format!("{:?}", handle.advance()), @"AbortedIndexation");
+
+ handle.advance_one_successful_batch();
+ snapshot!(snapshot_index_scheduler(&index_scheduler), name: "cancel_processed");
+ }
}
diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap
new file mode 100644
index 000000000..ce0343975
--- /dev/null
+++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/after_dump_register.snap
@@ -0,0 +1,35 @@
+---
+source: index-scheduler/src/lib.rs
+---
+### Autobatching Enabled = true
+### Processing Tasks:
+[]
+----------------------------------------------------------------------
+### All Tasks:
+0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }}
+----------------------------------------------------------------------
+### Status:
+enqueued [0,]
+----------------------------------------------------------------------
+### Kind:
+"dumpCreation" [0,]
+----------------------------------------------------------------------
+### Index Tasks:
+----------------------------------------------------------------------
+### Index Mapper:
+
+----------------------------------------------------------------------
+### Canceled By:
+
+----------------------------------------------------------------------
+### Enqueued At:
+[timestamp] [0,]
+----------------------------------------------------------------------
+### Started At:
+----------------------------------------------------------------------
+### Finished At:
+----------------------------------------------------------------------
+### File Store:
+
+----------------------------------------------------------------------
+
diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap
new file mode 100644
index 000000000..f3d7b363f
--- /dev/null
+++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_processed.snap
@@ -0,0 +1,45 @@
+---
+source: index-scheduler/src/lib.rs
+---
+### Autobatching Enabled = true
+### Processing Tasks:
+[]
+----------------------------------------------------------------------
+### All Tasks:
+0 {uid: 0, status: canceled, canceled_by: 1, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }}
+1 {uid: 1, status: succeeded, details: { matched_tasks: 1, canceled_tasks: Some(0), original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }}
+----------------------------------------------------------------------
+### Status:
+enqueued []
+succeeded [1,]
+canceled [0,]
+----------------------------------------------------------------------
+### Kind:
+"taskCancelation" [1,]
+"dumpCreation" [0,]
+----------------------------------------------------------------------
+### Index Tasks:
+----------------------------------------------------------------------
+### Index Mapper:
+
+----------------------------------------------------------------------
+### Canceled By:
+1 [0,]
+
+----------------------------------------------------------------------
+### Enqueued At:
+[timestamp] [0,]
+[timestamp] [1,]
+----------------------------------------------------------------------
+### Started At:
+[timestamp] [0,]
+[timestamp] [1,]
+----------------------------------------------------------------------
+### Finished At:
+[timestamp] [0,]
+[timestamp] [1,]
+----------------------------------------------------------------------
+### File Store:
+
+----------------------------------------------------------------------
+
diff --git a/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap
new file mode 100644
index 000000000..72ae58e00
--- /dev/null
+++ b/index-scheduler/src/snapshots/lib.rs/cancel_processing_dump/cancel_registered.snap
@@ -0,0 +1,38 @@
+---
+source: index-scheduler/src/lib.rs
+---
+### Autobatching Enabled = true
+### Processing Tasks:
+[0,]
+----------------------------------------------------------------------
+### All Tasks:
+0 {uid: 0, status: enqueued, details: { dump_uid: None }, kind: DumpCreation { keys: [], instance_uid: None }}
+1 {uid: 1, status: enqueued, details: { matched_tasks: 1, canceled_tasks: None, original_filter: "cancel dump" }, kind: TaskCancelation { query: "cancel dump", tasks: RoaringBitmap<[0]> }}
+----------------------------------------------------------------------
+### Status:
+enqueued [0,1,]
+----------------------------------------------------------------------
+### Kind:
+"taskCancelation" [1,]
+"dumpCreation" [0,]
+----------------------------------------------------------------------
+### Index Tasks:
+----------------------------------------------------------------------
+### Index Mapper:
+
+----------------------------------------------------------------------
+### Canceled By:
+
+----------------------------------------------------------------------
+### Enqueued At:
+[timestamp] [0,]
+[timestamp] [1,]
+----------------------------------------------------------------------
+### Started At:
+----------------------------------------------------------------------
+### Finished At:
+----------------------------------------------------------------------
+### File Store:
+
+----------------------------------------------------------------------
+
diff --git a/index-scheduler/src/utils.rs b/index-scheduler/src/utils.rs
index 3971d9116..9f6f90db2 100644
--- a/index-scheduler/src/utils.rs
+++ b/index-scheduler/src/utils.rs
@@ -3,9 +3,9 @@
use std::collections::{BTreeSet, HashSet};
use std::ops::Bound;
-use meilisearch_types::heed::types::{DecodeIgnore, OwnedType};
+use meilisearch_types::heed::types::DecodeIgnore;
use meilisearch_types::heed::{Database, RoTxn, RwTxn};
-use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
+use meilisearch_types::milli::CboRoaringBitmapCodec;
use meilisearch_types::tasks::{Details, IndexSwap, Kind, KindWithContent, Status};
use roaring::{MultiOps, RoaringBitmap};
use time::OffsetDateTime;
@@ -18,7 +18,7 @@ impl IndexScheduler {
}
pub(crate) fn last_task_id(&self, rtxn: &RoTxn) -> Result