2005: auto batching r=MarinPostma a=MarinPostma

This PR implements auto-batching. The basic principle is that, while the previous batch is being processed, all pending updates that can be batched together are grouped into the next batch.

For now, the only updates that can be batched together are document addition updates (both update and replace) for a single index.

Batching is disabled by default for multiple reasons:
- We need more experimentation with the scheduling techniques.
- Right now, if one task fails in a batch, the whole batch fails. We need more permissive error handling when processing document indexing.

For now, there are four CLI options to control how batches are scheduled (a sketch of the corresponding config struct follows this list):
- `enable-autobatching`: enables the auto-batching feature.
- `debounce-duration-sec`: when an update is received, wait that number of seconds before batching and performing the updates. Defaults to 0s.
- `max-batch-size`: the maximum number of tasks per batch. Defaults to unlimited.
- `max-documents-per-batch`: the maximum number of documents in a batch. Defaults to unlimited. A batch always contains at least one task, no matter the number of documents in that task.
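
These flags are carried by the new `SchedulerConfig` struct that is flattened into the top-level `Opt`. The sketch below mirrors the struct added in this PR (doc comments shortened, field and flag names taken from the diff):

```rust
use clap::Parser;
use serde::Serialize;

/// Scheduler tuning options, flattened into the top-level `Opt` struct.
#[derive(Debug, Clone, Parser, Default, Serialize)]
pub struct SchedulerConfig {
    /// Enable the auto-batching experimental feature.
    #[clap(long, hide = true)]
    pub enable_autobatching: bool,

    /// Maximum number of tasks batched together; unlimited if unspecified.
    #[clap(long, requires = "enable-autobatching", hide = true)]
    pub max_batch_size: Option<usize>,

    /// Maximum number of documents per batch; a batch still holds at least one task.
    #[clap(long, requires = "enable-autobatching", hide = true)]
    pub max_documents_per_batch: Option<usize>,

    /// Seconds to wait for more updates before a batch starts processing.
    #[clap(long, requires = "enable-autobatching", hide = true)]
    pub debounce_duration_sec: Option<u64>,
}
```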

# Implementation

The current implementation consists of three major components:

## TaskStore
The `TaskStore` contains all the tasks. When a task is pushed, it is directly registered to the task store.
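
Purely as an illustration of that behaviour, here is a toy in-memory version of the idea; the real `TaskStore` persists full `Task` values on disk (heed/LMDB), and the names below are made up for the example:

```rust
use std::collections::BTreeMap;

type TaskId = u64;

/// Toy stand-in for the real `TaskStore`: pushing a task assigns the next id
/// and stores it immediately, before any scheduling decision is made.
#[derive(Default)]
struct ToyTaskStore {
    next_id: TaskId,
    tasks: BTreeMap<TaskId, String>, // the real store keeps full `Task` values
}

impl ToyTaskStore {
    fn register(&mut self, content: String) -> TaskId {
        let id = self.next_id;
        self.next_id += 1;
        self.tasks.insert(id, content);
        id
    }
}

fn main() {
    let mut store = ToyTaskStore::default();
    let id = store.register("document addition for index `movies`".into());
    println!("registered task {id}");
}
```

In the actual code, the index controller registers the task through the `TaskStore` (`task_store.register(uid, content)`) and then calls `scheduler.notify()` so the update loop can pick it up.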

## Scheduler
The scheduler is in charge of making the batches. At its core, it contains a `TaskQueue` and a job queue. `Job`s are always processed first. They are *volatile* tasks: they don't have a `TaskId` and are not persisted to disk. Snapshots and dumps are examples of `Job`s.

If no `Job` is available for processing, the scheduler attempts to make a `Task` batch from the `TaskQueue`. The first step is to gather new tasks from the `TaskStore` to populate the `TaskQueue`. Once this is done, we can prepare the batch. The `TaskQueue` is itself a `BinaryHeap` of `TaskList`s: each `index_uid` is associated with a `TaskList` that contains all the updates for that index uid, and the `TaskList`s in the `TaskQueue` are ordered by the id of their first task.

When preparing a batch, the `TaskList` at the top of the `TaskQueue` is popped, and tasks are popped from that list to make the next batch. If tasks remain in the list, the list is inserted back into the `TaskQueue`. A sketch of these structures follows.
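
Below is a minimal sketch of those structures, assuming the heap ordering is inverted so that the `TaskList` whose first task has the lowest id surfaces first, and ignoring `max-documents-per-batch`; names and fields are simplified and not the actual implementation:

```rust
use std::cmp::Ordering;
use std::collections::{BinaryHeap, VecDeque};

type TaskId = u64;

/// All pending tasks for one index, ordered by task id (oldest first).
struct TaskList {
    index_uid: String,
    tasks: VecDeque<TaskId>,
}

impl TaskList {
    fn first_id(&self) -> Option<TaskId> {
        self.tasks.front().copied()
    }
}

impl PartialEq for TaskList {
    fn eq(&self, other: &Self) -> bool {
        self.first_id() == other.first_id()
    }
}
impl Eq for TaskList {}
impl PartialOrd for TaskList {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for TaskList {
    // `BinaryHeap` is a max-heap, so the comparison is reversed: the list whose
    // first task id is the lowest (i.e. the oldest) must surface first.
    fn cmp(&self, other: &Self) -> Ordering {
        other.first_id().cmp(&self.first_id())
    }
}

/// Pop the most urgent list, drain up to `max_batch_size` tasks from it into a
/// batch, and push the list back if any tasks remain.
fn make_batch(
    queue: &mut BinaryHeap<TaskList>,
    max_batch_size: usize,
) -> Option<(String, Vec<TaskId>)> {
    let mut list = queue.pop()?;
    let mut batch = Vec::new();
    while batch.len() < max_batch_size {
        match list.tasks.pop_front() {
            Some(id) => batch.push(id),
            None => break,
        }
    }
    let index_uid = list.index_uid.clone();
    if !list.tasks.is_empty() {
        queue.push(list);
    }
    Some((index_uid, batch))
}

fn main() {
    let mut queue = BinaryHeap::new();
    queue.push(TaskList {
        index_uid: "movies".into(),
        tasks: VecDeque::from([3, 5, 9]),
    });
    queue.push(TaskList {
        index_uid: "products".into(),
        tasks: VecDeque::from([1, 7]),
    });

    // "products" wins because its first task id (1) is the oldest.
    let (index_uid, batch) = make_batch(&mut queue, 2).unwrap();
    println!("next batch: {index_uid} -> {batch:?}"); // products -> [1, 7]
}
```

Keeping one `TaskList` per `index_uid` is what guarantees that a batch never mixes indexes, which matches the current restriction that only document additions for a single index are batched together.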

## UpdateLoop
The `UpdateLoop`'s role is to perform batches sequentially. Each time updates are pushed to the update store, the scheduler is notified, and it in turn notifies the update loop that work can be performed. When notified, the update loop waits a short while for more incoming updates, then asks the scheduler for the next batch and performs it. When it is done, the status of the tasks is written back to the store, and the next batch is processed.
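
To make the flow concrete, here is a rough, hypothetical sketch of such a loop using Tokio; the notification channel, debounce handling, and scheduler methods are simplified stand-ins for the real ones:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Notify, RwLock};

/// Toy stand-in for the real scheduler: it just hands out whatever is pending.
struct ToyScheduler {
    pending: Vec<u64>,
}

impl ToyScheduler {
    fn next_batch(&mut self) -> Vec<u64> {
        std::mem::take(&mut self.pending)
    }
}

/// Illustrative update loop: wait for a notification, debounce a little so more
/// updates can join the batch, fetch the next batch, process it, and loop.
async fn update_loop(
    scheduler: Arc<RwLock<ToyScheduler>>,
    notifier: Arc<Notify>,
    debounce: Duration,
) {
    loop {
        // Sleep until the scheduler signals that new tasks were registered.
        notifier.notified().await;
        // Wait a bit for more incoming updates (the `debounce-duration-sec` option).
        tokio::time::sleep(debounce).await;

        let batch = scheduler.write().await.next_batch();
        if batch.is_empty() {
            continue;
        }
        // The real loop hands the batch to the task performer (the index resolver)
        // and then writes the resulting task events back into the `TaskStore`.
        println!("processing batch of {} task(s): {batch:?}", batch.len());
    }
}

#[tokio::main]
async fn main() {
    let scheduler = Arc::new(RwLock::new(ToyScheduler { pending: vec![1, 2, 3] }));
    let notifier = Arc::new(Notify::new());

    tokio::spawn(update_loop(
        scheduler.clone(),
        notifier.clone(),
        Duration::from_millis(0),
    ));

    // Simulate the scheduler being notified after a task registration.
    notifier.notify_one();
    tokio::time::sleep(Duration::from_millis(50)).await;
}
```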


Co-authored-by: mpostma <postma.marin@protonmail.com>
bors[bot] 2022-02-02 11:04:30 +00:00 committed by GitHub
commit 9448ca58aa
28 changed files with 1181 additions and 777 deletions

Cargo.lock (generated): 141 lines changed

@ -79,7 +79,7 @@ version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "465a6172cf69b960917811022d8f29bc0b7fa1398bc4f78b3c466673db1213b6"
dependencies = [
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -202,7 +202,7 @@ dependencies = [
"serde_urlencoded",
"smallvec",
"socket2",
"time 0.3.6",
"time 0.3.7",
"url",
]
@ -214,7 +214,7 @@ checksum = "98a793e4a7bd059e06e1bc1bd9943b57a47f806de3599d2437441682292c333e"
dependencies = [
"actix-router",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -287,9 +287,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.52"
version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84450d0b4a8bd1ba4144ce8ce718fbc5d071358b1e5384bace6536b3d1f2d5b3"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"
dependencies = [
"backtrace",
]
@ -339,7 +339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -350,10 +350,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
[[package]]
name = "atomic_refcell"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73b5e5f48b927f04e952dedc932f31995a65a0bf65ec971c74436e51bf6e970d"
[[package]]
name = "atty"
version = "0.2.14"
@ -400,9 +406,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd"
[[package]]
name = "bimap"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50ae17cabbc8a38a1e3e4c1a6a664e9a09672dc14d0896fa8d865d3a5a446b07"
checksum = "bc0455254eb5c6964c4545d8bac815e1a1be4f3afe0ae695ea539c12d728d44b"
dependencies = [
"serde",
]
@ -526,7 +532,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e215f8c2f9f79cb53c8335e687ffd07d5bfcb6fe5fc80723762d0be46e7cc54"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -648,9 +654,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.10"
version = "3.0.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a30c3bf9ff12dfe5dae53f0a96e0febcd18420d1c0e7fad77796d9d5c4b5375"
checksum = "08799f92c961c7a1cf0cc398a9073da99e21ce388b46372c37f3191f2f3eed3e"
dependencies = [
"atty",
"bitflags",
@ -665,14 +671,14 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "3.0.6"
version = "3.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "517358c28fcef6607bf6f76108e02afad7e82297d132a6b846dcc1fc3efcd153"
checksum = "0fd2078197a22f338bd4fbf7d6387eb6f0d6a3c69e6cbc09f5c93e97321fd92a"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -683,7 +689,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1df715824eb382e34b7afb7463b0247bf41538aeba731fba05241ecdb5dc3747"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -700,7 +706,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d4706de1b0fa5b132270cddffa8585166037822e260a944fe161acd137ca05"
dependencies = [
"percent-encoding",
"time 0.3.6",
"time 0.3.7",
"version_check",
]
@ -835,7 +841,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -847,7 +853,7 @@ checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321"
dependencies = [
"convert_case",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"rustc_version",
"syn 1.0.86",
]
@ -942,7 +948,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c134c37760b27a871ba422106eedbb8247da973a09e82558bf26d619c882b159"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -961,9 +967,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2"
checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf"
dependencies = [
"instant",
]
@ -983,7 +989,7 @@ dependencies = [
[[package]]
name = "filter-parser"
version = "0.1.0"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.21.1#7f50ca9a20090fc4fe2abae0394c1e6fdd351ebd"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.22.0#9f2ff71581ec1e0a54a3c9be030537705c27ec2d"
dependencies = [
"nom",
"nom_locate",
@ -1105,7 +1111,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6dbd947adfffb0efc70599b3ddcf7b5597bb5fa9e245eb99f62b3a5f7bb8bd3c"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -1201,7 +1207,7 @@ checksum = "e45727250e75cc04ff2846a66397da8ef2b3db8e40e0cef4df67950a07621eb9"
dependencies = [
"proc-macro-error",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -1243,9 +1249,9 @@ dependencies = [
[[package]]
name = "h2"
version = "0.3.10"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c9de88456263e249e241fcd211d3954e2c9b0ef7ccfc235a444eb367cae3689"
checksum = "d9f1f717ddc7b2ba36df7e871fd88db79326551d3d6f1fc406fbfd28b582ff8e"
dependencies = [
"bytes",
"fnv",
@ -1564,9 +1570,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.113"
version = "0.2.114"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eef78b64d87775463c549fbd80e19249ef436ea3bf1de2a1eb7e717ec7fab1e9"
checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50"
[[package]]
name = "libgit2-sys"
@ -1662,7 +1668,7 @@ checksum = "10a9062912d7952c5588cc474795e0b9ee008e7e6781127945b85413d4b99d81"
dependencies = [
"log",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -1787,6 +1793,7 @@ dependencies = [
"anyhow",
"async-stream",
"async-trait",
"atomic_refcell",
"byte-unit",
"bytes",
"chrono",
@ -1841,8 +1848,8 @@ dependencies = [
[[package]]
name = "meilisearch-tokenizer"
version = "0.2.6"
source = "git+https://github.com/meilisearch/tokenizer.git?tag=v0.2.6#a69bb0cf442ea6357464d71bdf5d28273e5153ba"
version = "0.2.7"
source = "git+https://github.com/meilisearch/tokenizer.git?tag=v0.2.7#e14f64f2482d8f57e9aae8dc37dcb1469099f6f3"
dependencies = [
"character_converter",
"cow-utils",
@ -1881,8 +1888,8 @@ dependencies = [
[[package]]
name = "milli"
version = "0.21.1"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.21.1#7f50ca9a20090fc4fe2abae0394c1e6fdd351ebd"
version = "0.22.0"
source = "git+https://github.com/meilisearch/milli.git?tag=v0.22.0#9f2ff71581ec1e0a54a3c9be030537705c27ec2d"
dependencies = [
"bimap",
"bincode",
@ -2013,14 +2020,14 @@ checksum = "e7e25b214433f669161f414959594216d8e6ba83b6679d3db96899c0b4639033"
dependencies = [
"cfg-if 1.0.0",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
[[package]]
name = "nelson"
version = "0.1.0"
source = "git+https://github.com/MarinPostma/nelson.git?rev=e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b#e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b"
source = "git+https://github.com/MarinPostma/nelson.git?rev=675f13885548fb415ead8fbb447e9e6d9314000a#675f13885548fb415ead8fbb447e9e6d9314000a"
[[package]]
name = "nom"
@ -2101,9 +2108,9 @@ dependencies = [
[[package]]
name = "num_threads"
version = "0.1.2"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "71a1eb3a36534514077c1e079ada2fb170ef30c47d203aa6916138cf882ecd52"
checksum = "97ba99ba6393e2c3734791401b66902d981cb03bf190af674ca69949b6d5fb15"
dependencies = [
"libc",
]
@ -2286,7 +2293,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "744b6f092ba29c3650faf274db506afd39944f48420f6c86b17cfe0ee1cb36bb"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -2360,7 +2367,7 @@ checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
"version_check",
]
@ -2372,7 +2379,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"version_check",
]
@ -2448,9 +2455,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.14"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47aa80447ce4daf1717500037052af176af5d38cc3e571d9ec1c7353fc10c87d"
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145"
dependencies = [
"proc-macro2 1.0.36",
]
@ -2793,29 +2800,29 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]]
name = "serde"
version = "1.0.134"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96b3c34c1690edf8174f5b289a336ab03f568a4460d8c6df75f2f3a692b3bc6a"
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.134"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784ed1fbfa13fe191077537b0d70ec8ad1e903cfe04831da608aa36457cb653d"
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
[[package]]
name = "serde_json"
version = "1.0.75"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79"
checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085"
dependencies = [
"indexmap",
"itoa 1.0.1",
@ -2938,9 +2945,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
version = "0.4.3"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi",
@ -2993,7 +3000,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"unicode-xid 0.2.2",
]
@ -3013,7 +3020,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
"unicode-xid 0.2.2",
]
@ -3095,7 +3102,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa32fd3f627f367fe16f893e2597ae3c05020f8bba2666a4e6ea73d377e5714b"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -3133,9 +3140,9 @@ dependencies = [
[[package]]
name = "time"
version = "0.3.6"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8d54b9298e05179c335de2b9645d061255bcd5155f843b3e328d2cfe0a5b413"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
dependencies = [
"itoa 1.0.1",
"libc",
@ -3190,7 +3197,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
]
@ -3443,7 +3450,7 @@ dependencies = [
"lazy_static",
"log",
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
"wasm-bindgen-shared",
]
@ -3466,7 +3473,7 @@ version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
dependencies = [
"quote 1.0.14",
"quote 1.0.15",
"wasm-bindgen-macro-support",
]
@ -3477,7 +3484,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
dependencies = [
"proc-macro2 1.0.36",
"quote 1.0.14",
"quote 1.0.15",
"syn 1.0.86",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
@ -3633,18 +3640,18 @@ dependencies = [
[[package]]
name = "zstd"
version = "0.9.3+zstd.1.5.2"
version = "0.9.2+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "377f9c6801b6cbb254c3055266be42698b5bc6563c56b37e5fcca637a68eba95"
checksum = "2390ea1bf6c038c39674f22d95f0564725fc06034a47129179810b2fc58caa54"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "4.1.4+zstd.1.5.2"
version = "4.1.3+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f7cd17c9af1a4d6c24beb1cc54b17e2ef7b593dc92f19e9d9acad8b182bbaee"
checksum = "e99d81b99fb3c2c2c794e3fe56c305c63d5173a16a46b5850b07c935ffc7db79"
dependencies = [
"libc",
"zstd-sys",
@ -3652,9 +3659,9 @@ dependencies = [
[[package]]
name = "zstd-sys"
version = "1.6.3+zstd.1.5.2"
version = "1.6.2+zstd.1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc49afa5c8d634e75761feda8c592051e7eeb4683ba827211eb0d731d3402ea8"
checksum = "2daf2f248d9ea44454bfcb2516534e8b8ad2fc91bf818a1885495fc42bc8ac9f"
dependencies = [
"cc",
"libc",


@ -9,7 +9,7 @@ pub mod helpers;
pub mod option;
pub mod routes;
use std::sync::Arc;
use std::sync::{atomic::AtomicBool, Arc};
use std::time::Duration;
use crate::error::MeilisearchHttpError;
@ -25,8 +25,17 @@ use extractors::payload::PayloadConfig;
use meilisearch_auth::AuthController;
use meilisearch_lib::MeiliSearch;
pub static AUTOBATCHING_ENABLED: AtomicBool = AtomicBool::new(false);
pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
let mut meilisearch = MeiliSearch::builder();
// enable autobatching?
let _ = AUTOBATCHING_ENABLED.store(
opt.scheduler_options.enable_autobatching,
std::sync::atomic::Ordering::Relaxed,
);
meilisearch
.set_max_index_size(opt.max_index_size.get_bytes() as usize)
.set_max_task_store_size(opt.max_task_db_size.get_bytes() as usize)
@ -52,7 +61,11 @@ pub fn setup_meilisearch(opt: &Opt) -> anyhow::Result<MeiliSearch> {
meilisearch.set_schedule_snapshot();
}
meilisearch.build(opt.db_path.clone(), opt.indexer_options.clone())
meilisearch.build(
opt.db_path.clone(),
opt.indexer_options.clone(),
opt.scheduler_options.clone(),
)
}
pub fn configure_data(


@ -5,7 +5,7 @@ use std::sync::Arc;
use byte_unit::Byte;
use clap::Parser;
use meilisearch_lib::options::IndexerOpts;
use meilisearch_lib::options::{IndexerOpts, SchedulerConfig};
use rustls::{
server::{
AllowAnyAnonymousOrAuthenticatedClient, AllowAnyAuthenticatedClient,
@ -147,9 +147,18 @@ pub struct Opt {
#[serde(skip)]
#[clap(skip)]
pub indexer_options: IndexerOpts,
#[clap(flatten)]
pub scheduler_options: SchedulerConfig,
}
impl Opt {
/// Wether analytics should be enabled or not.
#[cfg(all(not(debug_assertions), feature = "analytics"))]
pub fn analytics(&self) -> bool {
!self.no_analytics
}
pub fn get_ssl_config(&self) -> anyhow::Result<Option<rustls::ServerConfig>> {
if let (Some(cert_path), Some(key_path)) = (&self.ssl_cert_path, &self.ssl_key_path) {
let config = rustls::ServerConfig::builder().with_safe_defaults();


@ -2,11 +2,14 @@ use chrono::{DateTime, Duration, Utc};
use meilisearch_error::ResponseError;
use meilisearch_lib::index::{Settings, Unchecked};
use meilisearch_lib::milli::update::IndexDocumentsMethod;
use meilisearch_lib::tasks::batch::BatchId;
use meilisearch_lib::tasks::task::{
DocumentDeletion, Task, TaskContent, TaskEvent, TaskId, TaskResult,
};
use serde::{Serialize, Serializer};
use crate::AUTOBATCHING_ENABLED;
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
enum TaskType {
@ -106,6 +109,8 @@ pub struct TaskView {
enqueued_at: DateTime<Utc>,
started_at: Option<DateTime<Utc>>,
finished_at: Option<DateTime<Utc>>,
#[serde(skip_serializing_if = "Option::is_none")]
batch_uid: Option<Option<BatchId>>,
}
impl From<Task> for TaskView {
@ -252,6 +257,16 @@ impl From<Task> for TaskView {
let duration = finished_at.zip(started_at).map(|(tf, ts)| (tf - ts));
let batch_uid = if AUTOBATCHING_ENABLED.load(std::sync::atomic::Ordering::Relaxed) {
let id = events.iter().find_map(|e| match e {
TaskEvent::Batched { batch_id, .. } => Some(*batch_id),
_ => None,
});
Some(id)
} else {
None
};
Self {
uid: id,
index_uid: index_uid.into_inner(),
@ -263,6 +278,7 @@ impl From<Task> for TaskView {
enqueued_at,
started_at,
finished_at,
batch_uid,
}
}
}


@ -156,5 +156,6 @@ pub fn default_settings(dir: impl AsRef<Path>) -> Opt {
..Default::default()
},
log_level: "off".into(),
scheduler_options: meilisearch_lib::options::SchedulerConfig::default(),
}
}


@ -28,7 +28,7 @@ lazy_static = "1.4.0"
log = "0.4.14"
meilisearch-error = { path = "../meilisearch-error" }
meilisearch-auth = { path = "../meilisearch-auth" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.21.1" }
milli = { git = "https://github.com/meilisearch/milli.git", tag = "v0.22.0" }
mime = "0.3.16"
num_cpus = "1.13.0"
once_cell = "1.8.0"
@ -55,12 +55,13 @@ reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-featu
sysinfo = "0.20.2"
derivative = "2.2.0"
fs_extra = "1.2.0"
atomic_refcell = "0.1.8"
[dev-dependencies]
actix-rt = "2.2.0"
mockall = "0.10.2"
paste = "1.0.5"
nelson = { git = "https://github.com/MarinPostma/nelson.git", rev = "e5f4ff046c21e7e986c7cb31550d1c9e7f0b693b"}
nelson = { git = "https://github.com/MarinPostma/nelson.git", rev = "675f13885548fb415ead8fbb447e9e6d9314000a"}
meilisearch-error = { path = "../meilisearch-error", features = ["test-traits"] }
proptest = "1.0.0"
proptest-derive = "0.3.0"


@ -17,4 +17,3 @@ cc 3a01c78db082434b8a4f8914abf0d1059d39f4426d16df20d72e1bd7ebb94a6a # shrinks to
cc c450806df3921d1e6fe9b6af93d999e8196d0175b69b64f1810802582421e94a # shrinks to task = Task { id: 0, index_uid: IndexUid("a"), content: CreateIndex { primary_key: Some("") }, events: [] }, index_exists = false, index_op_fails = false, any_int = 0
cc fb6b98947cbdbdee05ed3c0bf2923aad2c311edc276253642eb43a0c0ec4888a # shrinks to task = Task { id: 0, index_uid: IndexUid("A"), content: CreateIndex { primary_key: Some("") }, events: [] }, index_exists = false, index_op_fails = true, any_int = 0
cc 1aa59d8e22484e9915efbb5818e1e1ab684aa61b166dc82130d6221663ba00bf # shrinks to task = Task { id: 0, index_uid: IndexUid("a"), content: DocumentDeletion(Clear), events: [] }, index_exists = true, index_op_fails = false, any_int = 0
cc 2e8644e6397b5f76e0b79f961fa125e2f45f42f26e03c453c9a174dfb427500d # shrinks to task = Task { id: 0, index_uid: IndexUid("0"), content: SettingsUpdate { settings: Settings { displayed_attributes: NotSet, searchable_attributes: NotSet, filterable_attributes: NotSet, sortable_attributes: NotSet, ranking_rules: NotSet, stop_words: NotSet, synonyms: NotSet, distinct_attribute: NotSet, _kind: PhantomData }, is_deletion: false, allow_index_creation: false }, events: [] }, index_exists = false, index_op_fails = false, any_int = 0


@ -6,10 +6,10 @@ use anyhow::Context;
use heed::{EnvOpenOptions, RoTxn};
use indexmap::IndexMap;
use milli::documents::DocumentBatchReader;
use milli::update::{IndexDocumentsConfig, IndexerConfig};
use serde::{Deserialize, Serialize};
use crate::document_formats::read_ndjson;
use crate::index::update_handler::UpdateHandler;
use crate::index::updates::apply_settings_to_builder;
use super::error::Result;
@ -85,7 +85,7 @@ impl Index {
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
update_handler: &UpdateHandler,
indexer_config: &IndexerConfig,
) -> anyhow::Result<()> {
let dir_name = src
.as_ref()
@ -110,8 +110,7 @@ impl Index {
let mut txn = index.write_txn()?;
// Apply settings first
let builder = update_handler.update_builder();
let mut builder = builder.settings(&mut txn, &index);
let mut builder = milli::update::Settings::new(&mut txn, &index, indexer_config);
if let Some(primary_key) = primary_key {
builder.set_primary_key(primary_key);
@ -140,12 +139,16 @@ impl Index {
//If the document file is empty, we don't perform the document addition, to prevent
//a primary key error to be thrown.
if !documents_reader.is_empty() {
let builder = update_handler
.update_builder()
.index_documents(&mut txn, &index);
builder.execute(documents_reader, |_| ())?;
}
let config = IndexDocumentsConfig::default();
let mut builder = milli::update::IndexDocuments::new(
&mut txn,
&index,
indexer_config,
config,
|_| (),
);
builder.add_documents(documents_reader)?;
builder.execute()?;
}
txn.commit()?;


@ -3,7 +3,7 @@ use std::error::Error;
use meilisearch_error::{internal_error, Code, ErrorCode};
use serde_json::Value;
use crate::error::MilliError;
use crate::{error::MilliError, update_file_store};
pub type Result<T> = std::result::Result<T, IndexError>;
@ -23,7 +23,9 @@ internal_error!(
IndexError: std::io::Error,
heed::Error,
fst::Error,
serde_json::Error
serde_json::Error,
update_file_store::UpdateFileStoreError,
milli::documents::Error
);
impl ErrorCode for IndexError {


@ -7,7 +7,7 @@ use std::sync::Arc;
use chrono::{DateTime, Utc};
use heed::{EnvOpenOptions, RoTxn};
use milli::update::Setting;
use milli::update::{IndexerConfig, Setting};
use milli::{obkv_to_json, FieldDistribution, FieldId};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
@ -17,7 +17,6 @@ use crate::EnvSizer;
use super::error::IndexError;
use super::error::Result;
use super::update_handler::UpdateHandler;
use super::{Checked, Settings};
pub type Document = Map<String, Value>;
@ -68,7 +67,7 @@ pub struct Index {
#[derivative(Debug = "ignore")]
pub inner: Arc<milli::Index>,
#[derivative(Debug = "ignore")]
pub update_handler: Arc<UpdateHandler>,
pub indexer_config: Arc<IndexerConfig>,
}
impl Deref for Index {
@ -84,7 +83,7 @@ impl Index {
path: impl AsRef<Path>,
size: usize,
uuid: Uuid,
update_handler: Arc<UpdateHandler>,
update_handler: Arc<IndexerConfig>,
) -> Result<Self> {
log::debug!("opening index in {}", path.as_ref().display());
create_dir_all(&path)?;
@ -94,7 +93,7 @@ impl Index {
Ok(Index {
inner,
uuid,
update_handler,
indexer_config: update_handler,
})
}


@ -4,7 +4,6 @@ pub use updates::{apply_settings_to_builder, Checked, Facets, Settings, Unchecke
mod dump;
pub mod error;
mod search;
pub mod update_handler;
pub mod updates;
#[allow(clippy::module_inception)]
@ -26,6 +25,7 @@ pub mod test {
use std::path::PathBuf;
use std::sync::Arc;
use milli::update::IndexerConfig;
use milli::update::{DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod};
use nelson::Mocker;
use serde_json::{Map, Value};
@ -33,7 +33,6 @@ pub mod test {
use super::error::Result;
use super::index::Index;
use super::update_handler::UpdateHandler;
use super::{Checked, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings};
use crate::update_file_store::UpdateFileStore;
@ -52,7 +51,7 @@ pub mod test {
path: impl AsRef<Path>,
size: usize,
uuid: Uuid,
update_handler: Arc<UpdateHandler>,
update_handler: Arc<IndexerConfig>,
) -> Result<Self> {
let index = Index::open(path, size, uuid, update_handler)?;
Ok(Self::Real(index))
@ -62,7 +61,7 @@ pub mod test {
src: impl AsRef<Path>,
dst: impl AsRef<Path>,
size: usize,
update_handler: &UpdateHandler,
update_handler: &IndexerConfig,
) -> anyhow::Result<()> {
Index::load_dump(src, dst, size, update_handler)
}
@ -157,21 +156,18 @@ pub mod test {
pub fn update_documents(
&self,
method: IndexDocumentsMethod,
content_uuid: Uuid,
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl Iterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> {
match self {
MockIndex::Real(index) => {
index.update_documents(method, content_uuid, primary_key, file_store)
index.update_documents(method, primary_key, file_store, contents)
}
MockIndex::Mock(mocker) => unsafe {
mocker.get("update_documents").call((
method,
content_uuid,
primary_key,
file_store,
))
mocker
.get("update_documents")
.call((method, primary_key, file_store, contents))
},
}
}


@ -295,7 +295,7 @@ fn compute_value_matches<'a, A: AsRef<[u8]>>(
let mut start = 0;
for (word, token) in analyzed.reconstruct() {
if token.is_word() {
if let Some(length) = matcher.matches(token.text()) {
if let Some(length) = matcher.matches(&token) {
infos.push(MatchInfo { start, length });
}
}
@ -486,18 +486,18 @@ fn format_fields<A: AsRef<[u8]>>(
/// trait to allow unit testing of `format_fields`
trait Matcher {
fn matches(&self, w: &str) -> Option<usize>;
fn matches(&self, w: &Token) -> Option<usize>;
}
#[cfg(test)]
impl Matcher for BTreeMap<&str, Option<usize>> {
fn matches(&self, w: &str) -> Option<usize> {
self.get(w).cloned().flatten()
fn matches(&self, w: &Token) -> Option<usize> {
self.get(w.text()).cloned().flatten()
}
}
impl Matcher for MatchingWords {
fn matches(&self, w: &str) -> Option<usize> {
fn matches(&self, w: &Token) -> Option<usize> {
self.matching_bytes(w)
}
}
@ -579,7 +579,7 @@ impl<'a, A: AsRef<[u8]>> Formatter<'a, A> {
let mut tokens = analyzed.reconstruct().peekable();
while let Some((word, token)) =
tokens.next_if(|(_, token)| matcher.matches(token.text()).is_none())
tokens.next_if(|(_, token)| matcher.matches(token).is_none())
{
buffer.push((word, token));
}
@ -623,7 +623,7 @@ impl<'a, A: AsRef<[u8]>> Formatter<'a, A> {
// Check if we need to do highlighting or computed matches before calling
// Matcher::match since the call is expensive.
if format_options.highlight && token.is_word() {
if let Some(length) = matcher.matches(token.text()) {
if let Some(length) = matcher.matches(&token) {
match word.get(..length).zip(word.get(length..)) {
Some((head, tail)) => {
out.push_str(&self.marks.0);
@ -653,7 +653,7 @@ fn parse_filter(facets: &Value) -> Result<Option<Filter>> {
match facets {
Value::String(expr) => {
let condition = Filter::from_str(expr)?;
Ok(Some(condition))
Ok(condition)
}
Value::Array(arr) => parse_filter_array(arr),
v => Err(FacetError::InvalidExpression(&["Array"], v.clone()).into()),


@ -1,49 +0,0 @@
use milli::update::UpdateBuilder;
use milli::CompressionType;
use rayon::ThreadPool;
use crate::options::IndexerOpts;
pub struct UpdateHandler {
max_nb_chunks: Option<usize>,
chunk_compression_level: Option<u32>,
thread_pool: ThreadPool,
log_frequency: usize,
max_memory: Option<usize>,
chunk_compression_type: CompressionType,
}
impl UpdateHandler {
pub fn new(opt: &IndexerOpts) -> anyhow::Result<Self> {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(opt.indexing_jobs.unwrap_or(num_cpus::get() / 2))
.build()?;
Ok(Self {
max_nb_chunks: opt.max_nb_chunks,
chunk_compression_level: opt.chunk_compression_level,
thread_pool,
log_frequency: opt.log_every_n,
max_memory: opt.max_memory.map(|m| m.get_bytes() as usize),
chunk_compression_type: opt.chunk_compression_type,
})
}
pub fn update_builder(&self) -> UpdateBuilder {
// We prepare the update by using the update builder.
let mut update_builder = UpdateBuilder::new();
if let Some(max_nb_chunks) = self.max_nb_chunks {
update_builder.max_nb_chunks(max_nb_chunks);
}
if let Some(chunk_compression_level) = self.chunk_compression_level {
update_builder.chunk_compression_level(chunk_compression_level);
}
update_builder.thread_pool(&self.thread_pool);
update_builder.log_every_n(self.log_frequency);
if let Some(max_memory) = self.max_memory {
update_builder.max_memory(max_memory);
}
update_builder.chunk_compression_type(self.chunk_compression_type);
update_builder
}
}


@ -5,7 +5,8 @@ use std::num::NonZeroUsize;
use log::{debug, info, trace};
use milli::documents::DocumentBatchReader;
use milli::update::{
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsMethod, Setting,
DocumentAdditionResult, DocumentDeletionResult, IndexDocumentsConfig, IndexDocumentsMethod,
Setting,
};
use serde::{Deserialize, Serialize, Serializer};
use uuid::Uuid;
@ -178,7 +179,7 @@ impl Index {
txn: &mut heed::RwTxn<'a, 'b>,
primary_key: String,
) -> Result<IndexMeta> {
let mut builder = self.update_handler.update_builder().settings(txn, self);
let mut builder = milli::update::Settings::new(txn, self, self.indexer_config.as_ref());
builder.set_primary_key(primary_key);
builder.execute(|_| ())?;
let meta = IndexMeta::new_txn(self, txn)?;
@ -197,10 +198,7 @@ impl Index {
/// Deletes `ids` from the index, and returns how many documents were deleted.
pub fn delete_documents(&self, ids: &[String]) -> Result<DocumentDeletionResult> {
let mut txn = self.write_txn()?;
let mut builder = self
.update_handler
.update_builder()
.delete_documents(&mut txn, self)?;
let mut builder = milli::update::DeleteDocuments::new(&mut txn, self)?;
// We ignore unexisting document ids
ids.iter().for_each(|id| {
@ -216,11 +214,7 @@ impl Index {
pub fn clear_documents(&self) -> Result<()> {
let mut txn = self.write_txn()?;
self.update_handler
.update_builder()
.clear_documents(&mut txn, self)
.execute()?;
milli::update::ClearDocuments::new(&mut txn, self).execute()?;
txn.commit()?;
Ok(())
@ -229,9 +223,9 @@ impl Index {
pub fn update_documents(
&self,
method: IndexDocumentsMethod,
content_uuid: Uuid,
primary_key: Option<String>,
file_store: UpdateFileStore,
contents: impl IntoIterator<Item = Uuid>,
) -> Result<DocumentAdditionResult> {
trace!("performing document addition");
let mut txn = self.write_txn()?;
@ -242,17 +236,27 @@ impl Index {
}
}
let config = IndexDocumentsConfig {
update_method: method,
..Default::default()
};
let indexing_callback = |indexing_step| debug!("update: {:?}", indexing_step);
let mut builder = milli::update::IndexDocuments::new(
&mut txn,
self,
self.indexer_config.as_ref(),
config,
indexing_callback,
);
let content_file = file_store.get_update(content_uuid).unwrap();
let reader = DocumentBatchReader::from_reader(content_file).unwrap();
for content_uuid in contents.into_iter() {
let content_file = file_store.get_update(content_uuid)?;
let reader = DocumentBatchReader::from_reader(content_file)?;
builder.add_documents(reader)?;
}
let mut builder = self
.update_handler
.update_builder()
.index_documents(&mut txn, self);
builder.index_documents_method(method);
let addition = builder.execute(reader, indexing_callback)?;
let addition = builder.execute()?;
txn.commit()?;
@ -264,10 +268,8 @@ impl Index {
pub fn update_settings(&self, settings: &Settings<Checked>) -> Result<()> {
// We must use the write transaction of the update here.
let mut txn = self.write_txn()?;
let mut builder = self
.update_handler
.update_builder()
.settings(&mut txn, self);
let mut builder =
milli::update::Settings::new(&mut txn, self, self.indexer_config.as_ref());
apply_settings_to_builder(settings, &mut builder);


@ -10,7 +10,7 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use super::error::{DumpActorError, Result};
use super::{DumpInfo, DumpJob, DumpMsg, DumpStatus};
use crate::tasks::TaskStore;
use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore;
pub const CONCURRENT_DUMP_MSG: usize = 10;
@ -18,7 +18,7 @@ pub const CONCURRENT_DUMP_MSG: usize = 10;
pub struct DumpActor {
inbox: Option<mpsc::Receiver<DumpMsg>>,
update_file_store: UpdateFileStore,
task_store: TaskStore,
scheduler: Arc<RwLock<Scheduler>>,
dump_path: PathBuf,
analytics_path: PathBuf,
lock: Arc<Mutex<()>>,
@ -36,7 +36,7 @@ impl DumpActor {
pub fn new(
inbox: mpsc::Receiver<DumpMsg>,
update_file_store: UpdateFileStore,
task_store: TaskStore,
scheduler: Arc<RwLock<Scheduler>>,
dump_path: impl AsRef<Path>,
analytics_path: impl AsRef<Path>,
index_db_size: usize,
@ -46,7 +46,7 @@ impl DumpActor {
let lock = Arc::new(Mutex::new(()));
Self {
inbox: Some(inbox),
task_store,
scheduler,
update_file_store,
dump_path: dump_path.as_ref().into(),
analytics_path: analytics_path.as_ref().into(),
@ -118,13 +118,13 @@ impl DumpActor {
dump_path: self.dump_path.clone(),
db_path: self.analytics_path.clone(),
update_file_store: self.update_file_store.clone(),
task_store: self.task_store.clone(),
scheduler: self.scheduler.clone(),
uid: uid.clone(),
update_db_size: self.update_db_size,
index_db_size: self.index_db_size,
};
let task_result = tokio::task::spawn(task.run()).await;
let task_result = tokio::task::spawn_local(task.run()).await;
let mut dump_infos = self.dump_infos.write().await;
let dump_infos = dump_infos

View File

@ -1,5 +1,6 @@
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::bail;
use chrono::{DateTime, Utc};
@ -12,7 +13,7 @@ use meilisearch_auth::AuthController;
pub use message::DumpMsg;
use tempfile::TempDir;
use tokio::fs::create_dir_all;
use tokio::sync::oneshot;
use tokio::sync::{oneshot, RwLock};
use crate::analytics;
use crate::compression::{from_tar_gz, to_tar_gz};
@ -20,7 +21,7 @@ use crate::index_controller::dump_actor::error::DumpActorError;
use crate::index_controller::dump_actor::loaders::{v2, v3, v4};
use crate::options::IndexerOpts;
use crate::tasks::task::Job;
use crate::tasks::TaskStore;
use crate::tasks::Scheduler;
use crate::update_file_store::UpdateFileStore;
use error::Result;
@ -319,7 +320,7 @@ struct DumpJob {
dump_path: PathBuf,
db_path: PathBuf,
update_file_store: UpdateFileStore,
task_store: TaskStore,
scheduler: Arc<RwLock<Scheduler>>,
uid: String,
update_db_size: usize,
index_db_size: usize,
@ -344,21 +345,28 @@ impl DumpJob {
let (sender, receiver) = oneshot::channel();
self.task_store
.register_job(Job::Dump {
self.scheduler
.write()
.await
.schedule_job(Job::Dump {
ret: sender,
path: temp_dump_path.clone(),
})
.await;
receiver.await??;
self.task_store
.dump(&temp_dump_path, self.update_file_store.clone())
.await?;
// wait until the job has started performing before finishing the dump process
let sender = receiver.await??;
AuthController::dump(&self.db_path, &temp_dump_path)?;
//TODO(marin): this is not right, the scheduler should dump itself, not do it here...
self.scheduler
.read()
.await
.dump(&temp_dump_path, self.update_file_store.clone())
.await?;
let dump_path = tokio::task::spawn_blocking(move || -> Result<PathBuf> {
let _ = &self;
// for now we simply copy the updates/updates_files
// FIXME: We may copy more files than necessary, if new files are added while we are
// performing the dump. We need a way to filter them out.
@ -374,6 +382,9 @@ impl DumpJob {
})
.await??;
// notify the update loop that we are finished performing the dump.
let _ = sender.send(());
info!("Created dump in {:?}.", dump_path);
Ok(())
@ -382,19 +393,15 @@ impl DumpJob {
#[cfg(test)]
mod test {
use std::collections::HashSet;
use futures::future::{err, ok};
use nelson::Mocker;
use once_cell::sync::Lazy;
use uuid::Uuid;
use super::*;
use crate::index::error::Result as IndexResult;
use crate::index::Index;
use crate::index_resolver::error::IndexResolverError;
use crate::index_resolver::index_store::MockIndexStore;
use crate::index_resolver::meta_store::MockIndexMetaStore;
use crate::options::SchedulerConfig;
use crate::tasks::error::Result as TaskResult;
use crate::tasks::task::{Task, TaskId};
use crate::tasks::{MockTaskPerformer, TaskFilter, TaskStore};
use crate::update_file_store::UpdateFileStore;
fn setup() {
@ -411,86 +418,91 @@ mod test {
}
#[actix_rt::test]
#[ignore]
async fn test_dump_normal() {
setup();
let tmp = tempfile::tempdir().unwrap();
let uuids = std::iter::repeat_with(Uuid::new_v4)
.take(4)
.collect::<HashSet<_>>();
let mut uuid_store = MockIndexMetaStore::new();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(ok(())));
let mut index_store = MockIndexStore::new();
index_store.expect_get().times(4).returning(move |uuid| {
let mocker = Mocker::default();
let uuids_clone = uuids.clone();
mocker.when::<(), Uuid>("uuid").once().then(move |_| {
assert!(uuids_clone.contains(&uuid));
uuid
});
mocker
.when::<&Path, IndexResult<()>>("dump")
.once()
.then(move |_| Ok(()));
Box::pin(ok(Some(Index::mock(mocker))))
});
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
//let update_sender =
// create_update_handler(index_resolver.clone(), tmp.path(), 4096 * 100).unwrap();
//TODO: fix dump tests
let mut performer = MockTaskPerformer::new();
performer
.expect_process_job()
.once()
.returning(|j| match j {
Job::Dump { ret, .. } => {
let (sender, _receiver) = oneshot::channel();
ret.send(Ok(sender)).unwrap();
}
_ => unreachable!(),
});
let performer = Arc::new(performer);
let mocker = Mocker::default();
let task_store = TaskStore::mock(mocker);
mocker
.when::<(&Path, UpdateFileStore), TaskResult<()>>("dump")
.then(|_| Ok(()));
mocker
.when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
"list_tasks",
)
.then(|_| Ok(Vec::new()));
let store = TaskStore::mock(mocker);
let config = SchedulerConfig::default();
let scheduler = Scheduler::new(store, performer, config).unwrap();
let task = DumpJob {
dump_path: tmp.path().into(),
// this should do nothing
update_file_store,
db_path: tmp.path().into(),
task_store,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
scheduler,
};
task.run().await.unwrap();
}
#[actix_rt::test]
#[ignore]
async fn error_performing_dump() {
let tmp = tempfile::tempdir().unwrap();
let mut uuid_store = MockIndexMetaStore::new();
uuid_store
.expect_dump()
.once()
.returning(move |_| Box::pin(err(IndexResolverError::ExistingPrimaryKey)));
let mocker = Mocker::default();
let file_store = UpdateFileStore::mock(mocker);
let mocker = Mocker::default();
mocker
.when::<(Option<TaskId>, Option<TaskFilter>, Option<usize>), TaskResult<Vec<Task>>>(
"list_tasks",
)
.then(|_| Ok(Vec::new()));
let task_store = TaskStore::mock(mocker);
let mut performer = MockTaskPerformer::new();
performer
.expect_process_job()
.once()
.returning(|job| match job {
Job::Dump { ret, .. } => drop(ret.send(Err(IndexResolverError::BadlyFormatted(
"blabla".to_string(),
)))),
_ => unreachable!(),
});
let performer = Arc::new(performer);
let scheduler = Scheduler::new(task_store, performer, SchedulerConfig::default()).unwrap();
let task = DumpJob {
dump_path: tmp.path().into(),
// this should do nothing
db_path: tmp.path().into(),
update_file_store: file_store,
task_store,
uid: String::from("test"),
update_db_size: 4096 * 10,
index_db_size: 4096 * 10,
scheduler,
};
assert!(task.run().await.is_err());


@ -13,7 +13,7 @@ use futures::Stream;
use futures::StreamExt;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tokio::sync::{mpsc, RwLock};
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use uuid::Uuid;
@ -23,12 +23,11 @@ use crate::index::{
Checked, Document, IndexMeta, IndexStats, SearchQuery, SearchResult, Settings, Unchecked,
};
use crate::index_controller::dump_actor::{load_dump, DumpActor, DumpActorHandleImpl};
use crate::options::IndexerOpts;
use crate::options::{IndexerOpts, SchedulerConfig};
use crate::snapshot::{load_snapshot, SnapshotService};
use crate::tasks::create_task_store;
use crate::tasks::error::TaskError;
use crate::tasks::task::{DocumentDeletion, Task, TaskContent, TaskId};
use crate::tasks::{TaskFilter, TaskStore};
use crate::tasks::{Scheduler, TaskFilter, TaskStore};
use error::Result;
use self::dump_actor::{DumpActorHandle, DumpInfo};
@ -68,6 +67,7 @@ pub struct IndexSettings {
pub struct IndexController<U, I> {
index_resolver: Arc<IndexResolver<U, I>>,
scheduler: Arc<RwLock<Scheduler>>,
task_store: TaskStore,
dump_handle: dump_actor::DumpActorHandleImpl,
update_file_store: UpdateFileStore,
@ -78,9 +78,10 @@ impl<U, I> Clone for IndexController<U, I> {
fn clone(&self) -> Self {
Self {
index_resolver: self.index_resolver.clone(),
task_store: self.task_store.clone(),
scheduler: self.scheduler.clone(),
dump_handle: self.dump_handle.clone(),
update_file_store: self.update_file_store.clone(),
task_store: self.task_store.clone(),
}
}
}
@ -160,6 +161,7 @@ impl IndexControllerBuilder {
self,
db_path: impl AsRef<Path>,
indexer_options: IndexerOpts,
scheduler_config: SchedulerConfig,
) -> anyhow::Result<MeiliSearch> {
let index_size = self
.max_index_size
@ -217,8 +219,9 @@ impl IndexControllerBuilder {
update_file_store.clone(),
)?);
let task_store =
create_task_store(meta_env, index_resolver.clone()).map_err(|e| anyhow::anyhow!(e))?;
let task_store = TaskStore::new(meta_env)?;
let scheduler =
Scheduler::new(task_store.clone(), index_resolver.clone(), scheduler_config)?;
let dump_path = self
.dump_dst
@ -229,14 +232,14 @@ impl IndexControllerBuilder {
let actor = DumpActor::new(
receiver,
update_file_store.clone(),
task_store.clone(),
scheduler.clone(),
dump_path,
analytics_path,
index_size,
task_store_size,
);
tokio::task::spawn(actor.run());
tokio::task::spawn_local(actor.run());
DumpActorHandleImpl { sender }
};
@ -255,17 +258,18 @@ impl IndexControllerBuilder {
snapshot_path,
index_size,
meta_env_size: task_store_size,
task_store: task_store.clone(),
scheduler: scheduler.clone(),
};
tokio::task::spawn(snapshot_service.run());
tokio::task::spawn_local(snapshot_service.run());
}
Ok(IndexController {
index_resolver,
task_store,
scheduler,
dump_handle,
update_file_store,
task_store,
})
}
@ -415,12 +419,13 @@ where
};
let task = self.task_store.register(uid, content).await?;
self.scheduler.read().await.notify();
Ok(task)
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
let task = self.task_store.get_task(id, filter).await?;
let task = self.scheduler.read().await.get_task(id, filter).await?;
Ok(task)
}
@ -435,7 +440,12 @@ where
let mut filter = TaskFilter::default();
filter.filter_index(index_uid);
let task = self.task_store.get_task(task_id, Some(filter)).await?;
let task = self
.scheduler
.read()
.await
.get_task(task_id, Some(filter))
.await?;
Ok(task)
}
@ -446,7 +456,12 @@ where
limit: Option<usize>,
offset: Option<TaskId>,
) -> Result<Vec<Task>> {
let tasks = self.task_store.list_tasks(offset, filter, limit).await?;
let tasks = self
.scheduler
.read()
.await
.list_tasks(offset, filter, limit)
.await?;
Ok(tasks)
}
@ -466,7 +481,9 @@ where
filter.filter_index(index_uid);
let tasks = self
.task_store
.scheduler
.read()
.await
.list_tasks(
Some(offset.unwrap_or_default() + task_id),
Some(filter),
@ -547,10 +564,11 @@ where
}
pub async fn get_index_stats(&self, uid: String) -> Result<IndexStats> {
let last_task = self.task_store.get_processing_task().await?;
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?;
// Check if the currently indexing update is from our index.
let is_indexing = last_task
.map(|task| task.index_uid.into_inner() == uid)
let is_indexing = processing_tasks
.first()
.map(|task| task.index_uid.as_str() == uid)
.unwrap_or_default();
let index = self.index_resolver.get_index(uid).await?;
@ -564,7 +582,7 @@ where
let mut last_task: Option<DateTime<_>> = None;
let mut indexes = BTreeMap::new();
let mut database_size = 0;
let processing_task = self.task_store.get_processing_task().await?;
let processing_tasks = self.scheduler.read().await.get_processing_tasks().await?;
for (index_uid, index) in self.index_resolver.list().await? {
if !search_rules.is_index_authorized(&index_uid) {
@ -584,8 +602,8 @@ where
});
// Check if the currently indexing update is from our index.
stats.is_indexing = processing_task
.as_ref()
stats.is_indexing = processing_tasks
.first()
.map(|p| p.index_uid.as_str() == index_uid)
.or(Some(false));
@ -637,16 +655,18 @@ mod test {
impl IndexController<MockIndexMetaStore, MockIndexStore> {
pub fn mock(
index_resolver: IndexResolver<MockIndexMetaStore, MockIndexStore>,
index_resolver: Arc<IndexResolver<MockIndexMetaStore, MockIndexStore>>,
task_store: TaskStore,
update_file_store: UpdateFileStore,
dump_handle: DumpActorHandleImpl,
scheduler: Arc<RwLock<Scheduler>>,
) -> Self {
IndexController {
index_resolver: Arc::new(index_resolver),
index_resolver,
task_store,
dump_handle,
update_file_store,
scheduler,
}
}
}
@ -719,13 +739,27 @@ mod test {
let task_store_mocker = nelson::Mocker::default();
let mocker = Mocker::default();
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store.clone());
let index_resolver = Arc::new(IndexResolver::new(
uuid_store,
index_store,
update_file_store.clone(),
));
let task_store = TaskStore::mock(task_store_mocker);
// let dump_actor = MockDumpActorHandle::new();
let scheduler = Scheduler::new(
task_store.clone(),
index_resolver.clone(),
SchedulerConfig::default(),
)
.unwrap();
let (sender, _) = mpsc::channel(1);
let dump_handle = DumpActorHandleImpl { sender };
let index_controller =
IndexController::mock(index_resolver, task_store, update_file_store, dump_handle);
let index_controller = IndexController::mock(
index_resolver,
task_store,
update_file_store,
dump_handle,
scheduler,
);
let r = index_controller
.search(index_uid.to_owned(), query.clone())


@ -1,14 +1,15 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use milli::update::IndexerConfig;
use tokio::fs;
use tokio::sync::RwLock;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use super::error::{IndexResolverError, Result};
use crate::index::update_handler::UpdateHandler;
use crate::index::Index;
use crate::options::IndexerOpts;
@ -26,7 +27,7 @@ pub struct MapIndexStore {
index_store: AsyncMap<Uuid, Index>,
path: PathBuf,
index_size: usize,
update_handler: Arc<UpdateHandler>,
indexer_config: Arc<IndexerConfig>,
}
impl MapIndexStore {
@ -35,14 +36,14 @@ impl MapIndexStore {
index_size: usize,
indexer_opts: &IndexerOpts,
) -> anyhow::Result<Self> {
let update_handler = Arc::new(UpdateHandler::new(indexer_opts)?);
let indexer_config = Arc::new(IndexerConfig::try_from(indexer_opts)?);
let path = path.as_ref().join("indexes/");
let index_store = Arc::new(RwLock::new(HashMap::new()));
Ok(Self {
index_store,
path,
index_size,
update_handler,
indexer_config,
})
}
}
@ -63,7 +64,7 @@ impl IndexStore for MapIndexStore {
}
let index_size = self.index_size;
let update_handler = self.update_handler.clone();
let update_handler = self.indexer_config.clone();
let index = spawn_blocking(move || -> Result<Index> {
let index = Index::open(path, index_size, uuid, update_handler)?;
Ok(index)
@ -88,7 +89,7 @@ impl IndexStore for MapIndexStore {
}
let index_size = self.index_size;
let update_handler = self.update_handler.clone();
let update_handler = self.indexer_config.clone();
let index =
spawn_blocking(move || Index::open(path, index_size, uuid, update_handler))
.await??;


@ -2,7 +2,7 @@ pub mod error;
pub mod index_store;
pub mod meta_store;
use std::convert::TryInto;
use std::convert::{TryFrom, TryInto};
use std::path::Path;
use std::sync::Arc;
@ -12,16 +12,17 @@ use heed::Env;
use index_store::{IndexStore, MapIndexStore};
use meilisearch_error::ResponseError;
use meta_store::{HeedMetaStore, IndexMetaStore};
use milli::update::DocumentDeletionResult;
use milli::update::{DocumentDeletionResult, IndexerConfig};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tokio::task::spawn_blocking;
use uuid::Uuid;
use crate::index::{error::Result as IndexResult, update_handler::UpdateHandler, Index};
use crate::index::{error::Result as IndexResult, Index};
use crate::options::IndexerOpts;
use crate::tasks::batch::Batch;
use crate::tasks::task::{DocumentDeletion, Job, Task, TaskContent, TaskEvent, TaskId, TaskResult};
use crate::tasks::{Pending, TaskPerformer};
use crate::tasks::TaskPerformer;
use crate::update_file_store::UpdateFileStore;
use self::meta_store::IndexMeta;
@ -96,14 +97,24 @@ where
U: IndexMetaStore + Send + Sync + 'static,
I: IndexStore + Send + Sync + 'static,
{
type Error = ResponseError;
async fn process_batch(&self, mut batch: Batch) -> Batch {
// If a batch contains multiple tasks, then it must be a document addition batch
if let Some(Task {
content: TaskContent::DocumentAddition { .. },
..
}) = batch.tasks.first()
{
debug_assert!(batch.tasks.iter().all(|t| matches!(
t,
Task {
content: TaskContent::DocumentAddition { .. },
..
}
)));
async fn process(&self, mut batch: Batch) -> Batch {
// Until batching is implemented, all batch should contain only one update.
debug_assert_eq!(batch.len(), 1);
match batch.tasks.first_mut() {
Some(Pending::Task(task)) => {
self.process_document_addition_batch(batch).await
} else {
if let Some(task) = batch.tasks.first_mut() {
task.events.push(TaskEvent::Processing(Utc::now()));
match self.process_task(task).await {
@ -119,15 +130,12 @@ where
}),
}
}
Some(Pending::Job(job)) => {
let job = std::mem::take(job);
self.process_job(job).await;
}
None => (),
batch
}
}
batch
async fn process_job(&self, job: Job) {
self.process_job(job).await;
}
async fn finish(&self, batch: &Batch) {
@ -158,9 +166,9 @@ impl IndexResolver<HeedMetaStore, MapIndexStore> {
HeedMetaStore::load_dump(&src, env)?;
let indexes_path = src.as_ref().join("indexes");
let indexes = indexes_path.read_dir()?;
let update_handler = UpdateHandler::new(indexer_opts)?;
let indexer_config = IndexerConfig::try_from(indexer_opts)?;
for index in indexes {
Index::load_dump(&index?.path(), &dst, index_db_size, &update_handler)?;
Index::load_dump(&index?.path(), &dst, index_db_size, &indexer_config)?;
}
Ok(())
@ -180,33 +188,100 @@ where
}
}
async fn process_task(&self, task: &Task) -> Result<TaskResult> {
let index_uid = task.index_uid.clone();
match &task.content {
TaskContent::DocumentAddition {
content_uuid,
merge_strategy,
primary_key,
allow_index_creation,
async fn process_document_addition_batch(&self, mut batch: Batch) -> Batch {
fn get_content_uuid(task: &Task) -> Uuid {
match task {
Task {
content: TaskContent::DocumentAddition { content_uuid, .. },
..
} => *content_uuid,
_ => panic!("unexpected task in the document addition batch"),
}
}
let content_uuids = batch.tasks.iter().map(get_content_uuid).collect::<Vec<_>>();
match batch.tasks.first() {
Some(Task {
index_uid,
id,
content:
TaskContent::DocumentAddition {
merge_strategy,
primary_key,
allow_index_creation,
..
},
..
} => {
}) => {
let primary_key = primary_key.clone();
let content_uuid = *content_uuid;
let method = *merge_strategy;
let index = if *allow_index_creation {
self.get_or_create_index(index_uid, task.id).await?
self.get_or_create_index(index_uid.clone(), *id).await
} else {
self.get_index(index_uid.into_inner()).await?
self.get_index(index_uid.as_str().to_string()).await
};
// If the index doesn't exist and we are not allowed to create it with the first
// task, we must fails the whole batch.
let now = Utc::now();
let index = match index {
Ok(index) => index,
Err(e) => {
let error = ResponseError::from(e);
for task in batch.tasks.iter_mut() {
task.events.push(TaskEvent::Failed {
error: error.clone(),
timestamp: now,
});
}
return batch;
}
};
let file_store = self.file_store.clone();
let result = spawn_blocking(move || {
index.update_documents(method, content_uuid, primary_key, file_store)
index.update_documents(
method,
primary_key,
file_store,
content_uuids.into_iter(),
)
})
.await??;
.await;
Ok(result.into())
let event = match result {
Ok(Ok(result)) => TaskEvent::Succeded {
timestamp: Utc::now(),
result: TaskResult::DocumentAddition {
indexed_documents: result.indexed_documents,
},
},
Ok(Err(e)) => TaskEvent::Failed {
timestamp: Utc::now(),
error: e.into(),
},
Err(e) => TaskEvent::Failed {
timestamp: Utc::now(),
error: IndexResolverError::from(e).into(),
},
};
for task in batch.tasks.iter_mut() {
task.events.push(event.clone());
}
batch
}
_ => panic!("invalid batch!"),
}
}
async fn process_task(&self, task: &Task) -> Result<TaskResult> {
let index_uid = task.index_uid.clone();
match &task.content {
TaskContent::DocumentAddition { .. } => panic!("updates should be handled by batch"),
TaskContent::DocumentDeletion(DocumentDeletion::Ids(ids)) => {
let ids = ids.clone();
let index = self.get_index(index_uid.into_inner()).await?;
@ -282,9 +357,13 @@ where
Job::Dump { ret, path } => {
log::trace!("The Dump task is getting executed");
if ret.send(self.dump(path).await).is_err() {
let (sender, receiver) = oneshot::channel();
if ret.send(self.dump(path).await.map(|_| sender)).is_err() {
log::error!("The dump actor died.");
}
// wait until the dump has finished performing.
let _ = receiver.await;
}
Job::Empty => log::error!("Tried to process an empty task."),
Job::Snapshot(job) => {
@ -404,7 +483,7 @@ where
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use std::{collections::BTreeMap, vec::IntoIter};
use super::*;
@ -447,7 +526,7 @@ mod test {
mocker.when::<String, IndexResult<IndexMeta>>("update_primary_key")
.then(move |_| Ok(IndexMeta{ created_at: Utc::now(), updated_at: Utc::now(), primary_key: None }));
}
mocker.when::<(IndexDocumentsMethod, Uuid, Option<String>, UpdateFileStore), IndexResult<DocumentAdditionResult>>("update_documents")
mocker.when::<(IndexDocumentsMethod, Option<String>, UpdateFileStore, IntoIter<Uuid>), IndexResult<DocumentAdditionResult>>("update_documents")
.then(move |(_, _, _, _)| result());
}
TaskContent::SettingsUpdate{..} => {
@ -462,13 +541,13 @@ mod test {
}
TaskContent::DocumentDeletion(DocumentDeletion::Ids(_ids)) => {
let result = move || if !index_op_fails {
Ok(any_int as u64)
Ok(DocumentDeletionResult { deleted_documents: any_int as u64, remaining_documents: any_int as u64 })
} else {
// return this error because it's easy to generate...
Err(IndexError::DocumentNotFound("a doc".into()))
};
mocker.when::<&[String], IndexResult<u64>>("delete_documents")
mocker.when::<&[String], IndexResult<DocumentDeletionResult>>("delete_documents")
.then(move |_| result());
},
TaskContent::DocumentDeletion(DocumentDeletion::Clear) => {
@ -561,7 +640,8 @@ mod test {
let update_file_store = UpdateFileStore::mock(mocker);
let index_resolver = IndexResolver::new(uuid_store, index_store, update_file_store);
let result = index_resolver.process_task(&task).await;
let batch = Batch { id: 1, created_at: Utc::now(), tasks: vec![task.clone()] };
let result = index_resolver.process_batch(batch).await;
// Test for some expected output scenarios:
// Index creation and deletion cannot fail because of a failed index op, since they
@ -575,9 +655,9 @@ mod test {
| TaskContent::DocumentAddition { allow_index_creation: false, ..}
| TaskContent::IndexUpdate { .. } ))
{
assert!(result.is_err(), "{:?}", result);
assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Failed { .. }), "{:?}", result);
} else {
assert!(result.is_ok(), "{:?}", result);
assert!(matches!(result.tasks[0].events.last().unwrap(), TaskEvent::Succeded { .. }), "{:?}", result);
}
});
}

View File

@ -1,9 +1,10 @@
use core::fmt;
use std::{ops::Deref, str::FromStr};
use std::{convert::TryFrom, ops::Deref, str::FromStr};
use byte_unit::{Byte, ByteError};
use clap::Parser;
use milli::CompressionType;
use milli::{update::IndexerConfig, CompressionType};
use serde::Serialize;
use sysinfo::{RefreshKind, System, SystemExt};
#[derive(Debug, Clone, Parser)]
@ -43,6 +44,52 @@ pub struct IndexerOpts {
pub indexing_jobs: Option<usize>,
}
#[derive(Debug, Clone, Parser, Default, Serialize)]
pub struct SchedulerConfig {
/// enable the autobatching experimental feature
#[clap(long, hide = true)]
pub enable_autobatching: bool,
// The maximum number of updates of the same type that can be batched together.
// If unspecified, this is unlimited. A value of 0 is interpreted as 1.
#[clap(long, requires = "enable-autobatching", hide = true)]
pub max_batch_size: Option<usize>,
// The maximum number of documents in a document batch. Since batches must contain at least one
// update for the scheduler to make progress, the number of documents in a batch will be at
// least the number of documents of its first update.
#[clap(long, requires = "enable-autobatching", hide = true)]
pub max_documents_per_batch: Option<usize>,
/// Debounce duration in seconds
///
/// When a new task is enqueued, the scheduler waits for `debounce_duration_sec` seconds for new updates before
/// starting to process a batch of updates.
#[clap(long, requires = "enable-autobatching", hide = true)]
pub debounce_duration_sec: Option<u64>,
}
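A minimal sketch (not part of this diff) of how these options interact downstream: `Scheduler::new`, shown later in this changeset, overrides `max_batch_size` when autobatching is left disabled, so every batch degenerates to a single task.

```rust
// Illustrative only: mirrors the override performed in `Scheduler::new`.
let mut config = SchedulerConfig::default();
assert!(!config.enable_autobatching);
if !config.enable_autobatching {
    config.max_batch_size = Some(1);
}
assert_eq!(config.max_batch_size, Some(1));
```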
impl TryFrom<&IndexerOpts> for IndexerConfig {
type Error = anyhow::Error;
fn try_from(other: &IndexerOpts) -> Result<Self, Self::Error> {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(other.indexing_jobs.unwrap_or(num_cpus::get() / 2))
.build()?;
Ok(Self {
log_every_n: Some(other.log_every_n),
max_nb_chunks: other.max_nb_chunks,
max_memory: (*other.max_memory).map(|b| b.get_bytes() as usize),
chunk_compression_type: other.chunk_compression_type,
chunk_compression_level: other.chunk_compression_level,
thread_pool: Some(thread_pool),
max_positions_per_attributes: None,
..Default::default()
})
}
}
impl Default for IndexerOpts {
fn default() -> Self {
Self {

View File

@ -1,17 +1,19 @@
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use fs_extra::dir::{self, CopyOptions};
use log::{info, trace};
use tokio::sync::RwLock;
use tokio::time::sleep;
use walkdir::WalkDir;
use crate::compression::from_tar_gz;
use crate::index_controller::versioning::VERSION_FILE_NAME;
use crate::tasks::task::Job;
use crate::tasks::TaskStore;
use crate::tasks::Scheduler;
pub struct SnapshotService {
pub(crate) db_path: PathBuf,
@ -19,7 +21,7 @@ pub struct SnapshotService {
pub(crate) snapshot_path: PathBuf,
pub(crate) index_size: usize,
pub(crate) meta_env_size: usize,
pub(crate) task_store: TaskStore,
pub(crate) scheduler: Arc<RwLock<Scheduler>>,
}
impl SnapshotService {
@ -36,7 +38,7 @@ impl SnapshotService {
index_size: self.index_size,
};
let job = Job::Snapshot(snapshot_job);
self.task_store.register_job(job).await;
self.scheduler.write().await.schedule_job(job).await;
sleep(self.snapshot_period).await;
}

View File

@ -1,14 +1,14 @@
use chrono::{DateTime, Utc};
use super::{task::Task, task_store::Pending};
use super::task::Task;
pub type BatchId = u32;
pub type BatchId = u64;
#[derive(Debug)]
pub struct Batch {
pub id: BatchId,
pub created_at: DateTime<Utc>,
pub tasks: Vec<Pending<Task>>,
pub tasks: Vec<Task>,
}
impl Batch {

View File

@ -1,47 +1,38 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
pub use scheduler::Scheduler;
pub use task_store::TaskFilter;
#[cfg(test)]
pub use task_store::test::MockTaskStore as TaskStore;
#[cfg(not(test))]
pub use task_store::TaskStore;
pub use task_store::{Pending, TaskFilter};
use batch::Batch;
use error::Result;
use scheduler::Scheduler;
use self::task::Job;
pub mod batch;
pub mod error;
pub mod scheduler;
mod scheduler;
pub mod task;
mod task_store;
pub mod update_loop;
#[cfg_attr(test, mockall::automock(type Error=test::DebugError;))]
#[async_trait]
pub trait TaskPerformer: Sync + Send + 'static {
type Error: Serialize + for<'de> Deserialize<'de> + std::error::Error + Sync + Send + 'static;
/// Processes the `Task` batch, returning the batch with the `Task`s updated.
async fn process(&self, batch: Batch) -> Batch;
async fn process_batch(&self, batch: Batch) -> Batch;
async fn process_job(&self, job: Job);
/// `finish` is called when the result of `process_batch` has been committed to the task store. This
/// method can be used, for example, to perform cleanup after the update has been completed.
async fn finish(&self, batch: &Batch);
}
pub fn create_task_store<P>(env: Arc<heed::Env>, performer: Arc<P>) -> Result<TaskStore>
where
P: TaskPerformer,
{
let task_store = TaskStore::new(env)?;
let scheduler = Scheduler::new(task_store.clone(), performer, Duration::from_millis(1));
tokio::task::spawn_local(scheduler.run());
Ok(task_store)
}
#[cfg(test)]
mod test {
use serde::{Deserialize, Serialize};

View File

@ -1,253 +1,526 @@
use std::cmp::Ordering;
use std::collections::{hash_map::Entry, BinaryHeap, HashMap, VecDeque};
use std::ops::{Deref, DerefMut};
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use atomic_refcell::AtomicRefCell;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use milli::update::IndexDocumentsMethod;
use tokio::sync::{watch, RwLock};
use crate::options::SchedulerConfig;
use crate::update_file_store::UpdateFileStore;
use super::batch::Batch;
use super::error::Result;
#[cfg(test)]
use super::task_store::test::MockTaskStore as TaskStore;
use super::task_store::Pending;
#[cfg(not(test))]
use super::task_store::TaskStore;
use super::TaskPerformer;
use crate::tasks::task::TaskEvent;
use super::task::{Job, Task, TaskContent, TaskEvent, TaskId};
use super::update_loop::UpdateLoop;
use super::{TaskFilter, TaskPerformer, TaskStore};
/// The scheduler's role is to perform batches of tasks one at a time. It will monitor the TaskStore
/// for new tasks, put them in a batch, and process the batch as soon as possible.
///
/// When a batch is currently processing, the scheduler is just waiting.
pub struct Scheduler<P: TaskPerformer> {
store: TaskStore,
performer: Arc<P>,
/// The interval at which the `TaskStore` should be checked for new updates
task_store_check_interval: Duration,
#[derive(Eq, Debug, Clone, Copy)]
enum TaskType {
DocumentAddition { number: usize },
DocumentUpdate { number: usize },
Other,
}
impl<P> Scheduler<P>
where
P: TaskPerformer + Send + Sync + 'static,
P::Error: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
{
pub fn new(store: TaskStore, performer: Arc<P>, task_store_check_interval: Duration) -> Self {
/// Two tasks are equal if they have the same type.
impl PartialEq for TaskType {
fn eq(&self, other: &Self) -> bool {
matches!(
(self, other),
(Self::DocumentAddition { .. }, Self::DocumentAddition { .. })
| (Self::DocumentUpdate { .. }, Self::DocumentUpdate { .. })
)
}
}
#[derive(Eq, Debug, Clone, Copy)]
struct PendingTask {
kind: TaskType,
id: TaskId,
}
impl PartialEq for PendingTask {
fn eq(&self, other: &Self) -> bool {
self.id.eq(&other.id)
}
}
impl PartialOrd for PendingTask {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl Ord for PendingTask {
fn cmp(&self, other: &Self) -> Ordering {
self.id.cmp(&other.id).reverse()
}
}
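The reversed comparison above is what makes the `BinaryHeap`s below behave like min-heaps on task ids. A small illustration, assuming the types from this file are in scope:

```rust
// Illustrative sketch: with the reversed `Ord`, the heap pops the lowest
// task id first, i.e. the oldest pending task.
use std::collections::BinaryHeap;

let mut heap = BinaryHeap::new();
for id in [2, 0, 1] {
    heap.push(PendingTask { kind: TaskType::Other, id });
}
assert_eq!(heap.pop().map(|t| t.id), Some(0));
```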
#[derive(Debug)]
struct TaskList {
index: String,
tasks: BinaryHeap<PendingTask>,
}
impl Deref for TaskList {
type Target = BinaryHeap<PendingTask>;
fn deref(&self) -> &Self::Target {
&self.tasks
}
}
impl DerefMut for TaskList {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.tasks
}
}
impl TaskList {
fn new(index: String) -> Self {
Self {
store,
performer,
task_store_check_interval,
index,
tasks: Default::default(),
}
}
}
pub async fn run(self) {
loop {
if let Err(e) = self.process_next_batch().await {
log::error!("an error occurred while processing an update batch: {}", e);
impl PartialEq for TaskList {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
}
}
impl Eq for TaskList {}
impl Ord for TaskList {
fn cmp(&self, other: &Self) -> Ordering {
match (self.peek(), other.peek()) {
(None, None) => Ordering::Equal,
(None, Some(_)) => Ordering::Less,
(Some(_), None) => Ordering::Greater,
(Some(lhs), Some(rhs)) => lhs.cmp(rhs),
}
}
}
impl PartialOrd for TaskList {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
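Since a `TaskList` compares by its first pending task, the list whose oldest task has the lowest id ranks greatest and is handed out first by the max-heap `queue` below. A companion sketch under the same assumptions:

```rust
// Illustrative: the list holding the oldest pending task wins the comparison.
let mut movies = TaskList::new("movies".to_string());
movies.push(PendingTask { kind: TaskType::Other, id: 0 });
let mut books = TaskList::new("books".to_string());
books.push(PendingTask { kind: TaskType::Other, id: 3 });
assert!(movies > books);
```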
#[derive(Default)]
struct TaskQueue {
/// Maps index uids to their TaskList, for quick access
index_tasks: HashMap<String, Arc<AtomicRefCell<TaskList>>>,
/// A queue that orders TaskList by the priority of their first update
queue: BinaryHeap<Arc<AtomicRefCell<TaskList>>>,
}
impl TaskQueue {
fn insert(&mut self, task: Task) {
let uid = task.index_uid.into_inner();
let id = task.id;
let kind = match task.content {
TaskContent::DocumentAddition {
documents_count,
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
..
} => TaskType::DocumentAddition {
number: documents_count,
},
TaskContent::DocumentAddition {
documents_count,
merge_strategy: IndexDocumentsMethod::UpdateDocuments,
..
} => TaskType::DocumentUpdate {
number: documents_count,
},
_ => TaskType::Other,
};
let task = PendingTask { kind, id };
match self.index_tasks.entry(uid) {
Entry::Occupied(entry) => {
// A task list already exists for this index; all we have to do is push the new
// update to the end of the list. This won't change the order since ids are
// monotonically increasing.
let mut list = entry.get().borrow_mut();
// We only need the first element to be lower than the one we want to
// insert to preserve the order in the queue.
assert!(list.peek().map(|old_id| id >= old_id.id).unwrap_or(true));
list.push(task);
}
Entry::Vacant(entry) => {
let mut task_list = TaskList::new(entry.key().to_owned());
task_list.push(task);
let task_list = Arc::new(AtomicRefCell::new(task_list));
entry.insert(task_list.clone());
self.queue.push(task_list);
}
}
}
async fn process_next_batch(&self) -> Result<()> {
match self.prepare_batch().await? {
Some(mut batch) => {
for task in &mut batch.tasks {
match task {
Pending::Task(task) => task.events.push(TaskEvent::Processing(Utc::now())),
Pending::Job(_) => (),
/// Passes the closure a mutable view of the task list for the next index to schedule. It is
/// guaranteed that the first id in that task list is the lowest pending task id.
fn head_mut<R>(&mut self, mut f: impl FnMut(&mut TaskList) -> R) -> Option<R> {
let head = self.queue.pop()?;
let result = {
let mut ref_head = head.borrow_mut();
f(&mut *ref_head)
};
if !head.borrow().tasks.is_empty() {
// After being mutated, the head is reinserted to the correct position.
self.queue.push(head);
} else {
self.index_tasks.remove(&head.borrow().index);
}
Some(result)
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty() && self.index_tasks.is_empty()
}
}
pub struct Scheduler {
jobs: VecDeque<Job>,
tasks: TaskQueue,
store: TaskStore,
processing: Vec<TaskId>,
next_fetched_task_id: TaskId,
config: SchedulerConfig,
/// Notifies the update loop that a new task was received
notifier: watch::Sender<()>,
}
impl Scheduler {
pub fn new<P>(
store: TaskStore,
performer: Arc<P>,
mut config: SchedulerConfig,
) -> Result<Arc<RwLock<Self>>>
where
P: TaskPerformer,
{
let (notifier, rcv) = watch::channel(());
let debounce_time = config.debounce_duration_sec;
// When autobatching is disabled, force every batch to contain a single task.
if !config.enable_autobatching {
config.max_batch_size = Some(1);
}
let this = Self {
jobs: VecDeque::new(),
tasks: TaskQueue::default(),
store,
processing: Vec::new(),
next_fetched_task_id: 0,
config,
notifier,
};
// Notify update loop to start processing pending updates immediately after startup.
this.notify();
let this = Arc::new(RwLock::new(this));
let update_loop = UpdateLoop::new(
this.clone(),
performer,
debounce_time.filter(|&v| v > 0).map(Duration::from_secs),
rcv,
);
tokio::task::spawn_local(update_loop.run());
Ok(this)
}
pub async fn dump(&self, path: &Path, file_store: UpdateFileStore) -> Result<()> {
self.store.dump(path, file_store).await
}
fn register_task(&mut self, task: Task) {
assert!(!task.is_finished());
self.tasks.insert(task);
}
/// Clears the processing list; this method should be called when the processing of a batch is finished.
pub fn finish(&mut self) {
self.processing.clear();
}
pub fn notify(&self) {
let _ = self.notifier.send(());
}
fn notify_if_not_empty(&self) {
if !self.jobs.is_empty() || !self.tasks.is_empty() {
self.notify();
}
}
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
self.store.update_tasks(tasks).await
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
self.store.get_task(id, filter).await
}
pub async fn list_tasks(
&self,
offset: Option<TaskId>,
filter: Option<TaskFilter>,
limit: Option<usize>,
) -> Result<Vec<Task>> {
self.store.list_tasks(offset, filter, limit).await
}
pub async fn get_processing_tasks(&self) -> Result<Vec<Task>> {
let mut tasks = Vec::new();
for id in self.processing.iter() {
let task = self.store.get_task(*id, None).await?;
tasks.push(task);
}
Ok(tasks)
}
pub async fn schedule_job(&mut self, job: Job) {
self.jobs.push_back(job);
self.notify();
}
async fn fetch_pending_tasks(&mut self) -> Result<()> {
// We must NEVER re-enqueue an already processed task! Its content uuid would point to a nonexistent file.
//
// TODO(marin): This may create some latency when the first batch lazy loads the pending updates.
let mut filter = TaskFilter::default();
filter.filter_fn(|task| !task.is_finished());
self.store
.list_tasks(Some(self.next_fetched_task_id), Some(filter), None)
.await?
.into_iter()
// The tasks arrive in reverse order, and we need to insert them in order.
.rev()
.for_each(|t| {
self.next_fetched_task_id = t.id + 1;
self.register_task(t);
});
Ok(())
}
/// Prepare the next batch, and set `processing` to the ids in that batch.
pub async fn prepare(&mut self) -> Result<Pending> {
// If there is a job to process, do it first.
if let Some(job) = self.jobs.pop_front() {
// There is more work to do, notify the update loop
self.notify_if_not_empty();
return Ok(Pending::Job(job));
}
// Try to fill the queue with pending tasks.
self.fetch_pending_tasks().await?;
make_batch(&mut self.tasks, &mut self.processing, &self.config);
log::debug!("prepared batch with {} tasks", self.processing.len());
if !self.processing.is_empty() {
let ids = std::mem::take(&mut self.processing);
let (ids, mut tasks) = self.store.get_pending_tasks(ids).await?;
// The batch id is the id of the first update it contains
let id = match tasks.first() {
Some(Task { id, .. }) => *id,
_ => panic!("invalid batch"),
};
tasks.iter_mut().for_each(|t| {
t.events.push(TaskEvent::Batched {
batch_id: id,
timestamp: Utc::now(),
})
});
self.processing = ids;
let batch = Batch {
id,
created_at: Utc::now(),
tasks,
};
// There is more work to do, notify the update loop
self.notify_if_not_empty();
Ok(Pending::Batch(batch))
} else {
Ok(Pending::Nothing)
}
}
}
#[derive(Debug)]
pub enum Pending {
Batch(Batch),
Job(Job),
Nothing,
}
fn make_batch(tasks: &mut TaskQueue, processing: &mut Vec<TaskId>, config: &SchedulerConfig) {
processing.clear();
let mut doc_count = 0;
tasks.head_mut(|list| match list.peek().copied() {
Some(PendingTask {
kind: TaskType::Other,
id,
}) => {
processing.push(id);
list.pop();
}
Some(PendingTask { kind, .. }) => loop {
match list.peek() {
Some(pending) if pending.kind == kind => {
// We always need to process at least one task for the scheduler to make progress.
if processing.len() >= config.max_batch_size.unwrap_or(usize::MAX).max(1) {
break;
}
let pending = list.pop().unwrap();
processing.push(pending.id);
// We add the number of documents to the count if we are scheduling document additions and
// stop adding if we already have enough.
//
// We check that bound only after adding the current task to the batch, so that a batch contains at least one task.
match pending.kind {
TaskType::DocumentUpdate { number }
| TaskType::DocumentAddition { number } => {
doc_count += number;
if doc_count >= config.max_documents_per_batch.unwrap_or(usize::MAX) {
break;
}
}
_ => (),
}
}
// the jobs are ignored
batch.tasks = self.store.update_tasks(batch.tasks).await?;
let performer = self.performer.clone();
let batch_result = performer.process(batch).await;
self.handle_batch_result(batch_result).await?;
_ => break,
}
None => {
// No update was found to create a batch, so we wait a bit before retrying.
tokio::time::sleep(self.task_store_check_interval).await;
}
}
Ok(())
}
/// Checks for pending tasks and groups them in a batch. If there are no pending updates,
/// returns `Ok(None)`.
///
/// Until batching is properly implemented, the batches contain only one task.
async fn prepare_batch(&self) -> Result<Option<Batch>> {
match self.store.peek_pending_task().await {
Some(Pending::Task(next_task_id)) => {
let mut task = self.store.get_task(next_task_id, None).await?;
task.events.push(TaskEvent::Batched {
timestamp: Utc::now(),
batch_id: 0,
});
let batch = Batch {
id: 0,
// index_uid: task.index_uid.clone(),
created_at: Utc::now(),
tasks: vec![Pending::Task(task)],
};
Ok(Some(batch))
}
Some(Pending::Job(job)) => Ok(Some(Batch {
id: 0,
created_at: Utc::now(),
tasks: vec![Pending::Job(job)],
})),
None => Ok(None),
}
}
/// Handles the result from a batch processing.
///
/// When a task is processed, the result of the processing is pushed to its event list.
/// `handle_batch_result` makes sure that the new state is saved into the store.
/// The tasks are then removed from the processing queue.
async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> {
let tasks = self.store.update_tasks(batch.tasks).await?;
batch.tasks = tasks;
self.store.delete_pending(&batch.tasks[0]).await;
self.performer.finish(&batch).await;
Ok(())
}
},
None => (),
});
}
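Note the clamp on `max_batch_size` inside the loop above: it matches the comment on `SchedulerConfig` stating that a value of 0 is interpreted as 1, so the scheduler can always make progress. A trivial sketch of the clamp:

```rust
// Illustrative: a configured `max_batch_size` of 0 still yields one-task batches.
let max_batch_size: Option<usize> = Some(0);
let effective = max_batch_size.unwrap_or(usize::MAX).max(1);
assert_eq!(effective, 1);
```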
#[cfg(test)]
mod test {
use nelson::Mocker;
use milli::update::IndexDocumentsMethod;
use uuid::Uuid;
use crate::index_resolver::IndexUid;
use crate::tasks::task::Task;
use crate::tasks::task_store::TaskFilter;
use crate::{index_resolver::IndexUid, tasks::task::TaskContent};
use super::super::task::{TaskContent, TaskEvent, TaskId, TaskResult};
use super::super::MockTaskPerformer;
use super::*;
#[tokio::test]
async fn test_prepare_batch_full() {
let mocker = Mocker::default();
mocker
.when::<(TaskId, Option<TaskFilter>), Result<Option<Task>>>("get_task")
.once()
.then(|(id, _filter)| {
let task = Task {
id,
index_uid: IndexUid::new("Test".to_string()).unwrap(),
content: TaskContent::IndexDeletion,
events: vec![TaskEvent::Created(Utc::now())],
};
Ok(Some(task))
});
mocker
.when::<(), Option<Pending<TaskId>>>("peek_pending_task")
.then(|()| Some(Pending::Task(1)));
let store = TaskStore::mock(mocker);
let performer = Arc::new(MockTaskPerformer::new());
let scheduler = Scheduler {
store,
performer,
task_store_check_interval: Duration::from_millis(1),
};
let batch = scheduler.prepare_batch().await.unwrap().unwrap();
assert_eq!(batch.tasks.len(), 1);
assert!(
matches!(batch.tasks[0], Pending::Task(Task { id: 1, .. })),
"{:?}",
batch.tasks[0]
);
}
#[tokio::test]
async fn test_prepare_batch_empty() {
let mocker = Mocker::default();
mocker
.when::<(), Option<Pending<TaskId>>>("peek_pending_task")
.then(|()| None);
let store = TaskStore::mock(mocker);
let performer = Arc::new(MockTaskPerformer::new());
let scheduler = Scheduler {
store,
performer,
task_store_check_interval: Duration::from_millis(1),
};
assert!(scheduler.prepare_batch().await.unwrap().is_none());
}
#[tokio::test]
async fn test_loop_run_normal() {
let mocker = Mocker::default();
let mut id = Some(1);
mocker
.when::<(), Option<Pending<TaskId>>>("peek_pending_task")
.then(move |()| id.take().map(Pending::Task));
mocker
.when::<(TaskId, Option<TaskFilter>), Result<Task>>("get_task")
.once()
.then(|(id, _)| {
let task = Task {
id,
index_uid: IndexUid::new("Test".to_string()).unwrap(),
content: TaskContent::IndexDeletion,
events: vec![TaskEvent::Created(Utc::now())],
};
Ok(task)
});
mocker
.when::<Vec<Pending<Task>>, Result<Vec<Pending<Task>>>>("update_tasks")
.times(2)
.then(|tasks| {
assert_eq!(tasks.len(), 1);
Ok(tasks)
});
mocker.when::<(), ()>("delete_pending").once().then(|_| ());
let store = TaskStore::mock(mocker);
let mut performer = MockTaskPerformer::new();
performer.expect_process().once().returning(|mut batch| {
batch.tasks.iter_mut().for_each(|t| match t {
Pending::Task(Task { ref mut events, .. }) => events.push(TaskEvent::Succeded {
result: TaskResult::Other,
timestamp: Utc::now(),
}),
_ => panic!("expected a task, found a job"),
});
batch
});
performer.expect_finish().once().returning(|_| ());
let performer = Arc::new(performer);
let scheduler = Scheduler {
store,
performer,
task_store_check_interval: Duration::from_millis(1),
};
let handle = tokio::spawn(scheduler.run());
if let Ok(r) = tokio::time::timeout(Duration::from_millis(100), handle).await {
r.unwrap();
fn gen_task(id: TaskId, index_uid: &str, content: TaskContent) -> Task {
Task {
id,
index_uid: IndexUid::new_unchecked(index_uid.to_owned()),
content,
events: vec![],
}
}
#[test]
fn register_updates_multiples_indexes() {
let mut queue = TaskQueue::default();
queue.insert(gen_task(0, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(1, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(3, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(4, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(6, "test2", TaskContent::IndexDeletion));
let test1_tasks = queue
.head_mut(|tasks| tasks.drain().map(|t| t.id).collect::<Vec<_>>())
.unwrap();
assert_eq!(test1_tasks, &[0, 4, 5]);
let test2_tasks = queue
.head_mut(|tasks| tasks.drain().map(|t| t.id).collect::<Vec<_>>())
.unwrap();
assert_eq!(test2_tasks, &[1, 2, 3, 6]);
assert!(queue.index_tasks.is_empty());
assert!(queue.queue.is_empty());
}
#[test]
fn test_make_batch() {
let mut queue = TaskQueue::default();
let content = TaskContent::DocumentAddition {
content_uuid: Uuid::new_v4(),
merge_strategy: IndexDocumentsMethod::ReplaceDocuments,
primary_key: Some("test".to_string()),
documents_count: 0,
allow_index_creation: true,
};
queue.insert(gen_task(0, "test1", content.clone()));
queue.insert(gen_task(1, "test2", content.clone()));
queue.insert(gen_task(2, "test2", TaskContent::IndexDeletion));
queue.insert(gen_task(3, "test2", content.clone()));
queue.insert(gen_task(4, "test1", content.clone()));
queue.insert(gen_task(5, "test1", TaskContent::IndexDeletion));
queue.insert(gen_task(6, "test2", content.clone()));
queue.insert(gen_task(7, "test1", content));
let mut batch = Vec::new();
let config = SchedulerConfig::default();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[0, 4]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[1]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[2]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[3, 6]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[5]);
batch.clear();
make_batch(&mut queue, &mut batch, &config);
assert_eq!(batch, &[7]);
assert!(queue.is_empty());
}
}

View File

@ -97,7 +97,7 @@ impl Task {
pub enum Job {
Dump {
#[derivative(PartialEq = "ignore")]
ret: oneshot::Sender<Result<(), IndexResolverError>>,
ret: oneshot::Sender<Result<oneshot::Sender<()>, IndexResolverError>>,
path: PathBuf,
},
Snapshot(#[derivative(PartialEq = "ignore")] SnapshotJob),
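The new `ret` type pairs the dump result with an acknowledgment channel: the performer hands a `Sender` back to the requester and only finishes the job once that sender is used or dropped, as seen in `process_job` earlier in this diff. A free-standing sketch of the pattern (hypothetical function name, simplified types):

```rust
// Illustrative sketch of the acknowledgment pattern used by `Job::Dump`.
use tokio::sync::oneshot;

async fn job_side(ret: oneshot::Sender<oneshot::Sender<()>>) {
    let (ack, done) = oneshot::channel::<()>();
    // Hand the ack channel to whoever requested the dump.
    let _ = ret.send(ack);
    // Return only once the requester fires or drops the ack sender.
    let _ = done.await;
}
```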

View File

@ -1,7 +1,6 @@
mod store;
use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashSet};
use std::collections::HashSet;
use std::io::{BufWriter, Write};
use std::path::Path;
use std::sync::Arc;
@ -9,11 +8,9 @@ use std::sync::Arc;
use chrono::Utc;
use heed::{Env, RwTxn};
use log::debug;
use tokio::sync::RwLock;
use uuid::Uuid;
use super::error::TaskError;
use super::task::{Job, Task, TaskContent, TaskId};
use super::task::{Task, TaskContent, TaskId};
use super::Result;
use crate::index_resolver::IndexUid;
use crate::tasks::task::TaskEvent;
@ -25,9 +22,10 @@ pub use store::test::MockStore as Store;
pub use store::Store;
/// Defines constraints to be applied when querying for Tasks from the store.
#[derive(Default, Debug)]
#[derive(Default)]
pub struct TaskFilter {
indexes: Option<HashSet<String>>,
filter_fn: Option<Box<dyn Fn(&Task) -> bool + Sync + Send + 'static>>,
}
impl TaskFilter {
@ -44,85 +42,28 @@ impl TaskFilter {
.get_or_insert_with(Default::default)
.insert(index);
}
}
/// You can't clone a job because of its volatile nature.
/// If you need to take the `Job` with you, you can call the method
/// `Pending::take`. It returns the `Pending` as-is and leaves an `Empty` job in place of the original.
#[derive(Debug, PartialEq)]
pub enum Pending<T> {
/// A task stored on disk that must be processed.
Task(T),
/// A `Job` always has a higher priority than normal tasks and is not stored on disk.
/// It can be referred to as a `Volatile job`.
Job(Job),
}
impl Pending<TaskId> {
/// Makes a copy of the task or takes the content of the volatile job.
pub(crate) fn take(&mut self) -> Self {
match self {
Self::Task(id) => Self::Task(*id),
Self::Job(job) => Self::Job(job.take()),
}
}
}
impl Eq for Pending<TaskId> {}
impl PartialOrd for Pending<TaskId> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
// in case of two tasks we want to return the lowest taskId first.
(Pending::Task(lhs), Pending::Task(rhs)) => Some(lhs.cmp(rhs).reverse()),
// A job is always better than a task.
(Pending::Task(_), Pending::Job(_)) => Some(Ordering::Less),
(Pending::Job(_), Pending::Task(_)) => Some(Ordering::Greater),
// When there are two jobs, we consider them equal.
(Pending::Job(_), Pending::Job(_)) => Some(Ordering::Equal),
}
}
}
impl Pending<Task> {
pub fn get_content_uuid(&self) -> Option<Uuid> {
match self {
Pending::Task(task) => task.get_content_uuid(),
_ => None,
}
}
}
impl Ord for Pending<TaskId> {
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap()
pub fn filter_fn(&mut self, f: impl Fn(&Task) -> bool + Sync + Send + 'static) {
self.filter_fn.replace(Box::new(f));
}
}
pub struct TaskStore {
store: Arc<Store>,
pending_queue: Arc<RwLock<BinaryHeap<Pending<TaskId>>>>,
}
impl Clone for TaskStore {
fn clone(&self) -> Self {
Self {
store: self.store.clone(),
pending_queue: self.pending_queue.clone(),
}
}
}
impl TaskStore {
pub fn new(env: Arc<heed::Env>) -> Result<Self> {
let mut store = Store::new(env)?;
let unfinished_tasks = store.reset_and_return_unfinished_tasks()?;
let store = Arc::new(store);
Ok(Self {
store,
pending_queue: Arc::new(RwLock::new(unfinished_tasks)),
})
let store = Arc::new(Store::new(env)?);
Ok(Self { store })
}
pub async fn register(&self, index_uid: IndexUid, content: TaskContent) -> Result<Task> {
@ -146,11 +87,6 @@ impl TaskStore {
})
.await??;
self.pending_queue
.write()
.await
.push(Pending::Task(task.id));
Ok(task)
}
@ -159,35 +95,6 @@ impl TaskStore {
Ok(())
}
/// Registers an update that applies to multiple indexes.
/// Currently the update is considered a priority.
pub async fn register_job(&self, content: Job) {
debug!("registering a job: {:?}", content);
self.pending_queue.write().await.push(Pending::Job(content));
}
/// Returns the next task to process.
pub async fn peek_pending_task(&self) -> Option<Pending<TaskId>> {
let mut pending_queue = self.pending_queue.write().await;
loop {
match pending_queue.peek()? {
Pending::Job(Job::Empty) => drop(pending_queue.pop()),
_ => return Some(pending_queue.peek_mut()?.take()),
}
}
}
/// Returns the next task to process if there is one.
pub async fn get_processing_task(&self) -> Result<Option<Task>> {
match self.peek_pending_task().await {
Some(Pending::Task(tid)) => {
let task = self.get_task(tid, None).await?;
Ok(matches!(task.events.last(), Some(TaskEvent::Processing(_))).then(|| task))
}
_ => Ok(None),
}
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
let store = self.store.clone();
let task = tokio::task::spawn_blocking(move || -> Result<_> {
@ -207,17 +114,33 @@ impl TaskStore {
}
}
pub async fn update_tasks(&self, tasks: Vec<Pending<Task>>) -> Result<Vec<Pending<Task>>> {
pub async fn get_pending_tasks(&self, ids: Vec<TaskId>) -> Result<(Vec<TaskId>, Vec<Task>)> {
let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let mut tasks = Vec::new();
let txn = store.rtxn()?;
for id in ids.iter() {
let task = store
.get(&txn, *id)?
.ok_or(TaskError::UnexistingTask(*id))?;
tasks.push(task);
}
Ok((ids, tasks))
})
.await??;
Ok(tasks)
}
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
let store = self.store.clone();
let tasks = tokio::task::spawn_blocking(move || -> Result<_> {
let mut txn = store.wtxn()?;
for task in &tasks {
match task {
Pending::Task(task) => store.put(&mut txn, task)?,
Pending::Job(_) => (),
}
store.put(&mut txn, task)?;
}
txn.commit()?;
@ -229,21 +152,6 @@ impl TaskStore {
Ok(tasks)
}
/// Deletes one task from the queue and removes all `Empty` jobs.
pub async fn delete_pending(&self, to_delete: &Pending<Task>) {
if let Pending::Task(Task { id: pending_id, .. }) = to_delete {
let mut pending_queue = self.pending_queue.write().await;
*pending_queue = std::mem::take(&mut *pending_queue)
.into_iter()
.filter(|pending| match pending {
Pending::Job(Job::Empty) => false,
Pending::Task(id) => pending_id != id,
_ => true,
})
.collect::<BinaryHeap<Pending<TaskId>>>();
}
}
pub async fn list_tasks(
&self,
offset: Option<TaskId>,
@ -348,23 +256,15 @@ pub mod test {
Self::Mock(Arc::new(mocker))
}
pub async fn update_tasks(&self, tasks: Vec<Pending<Task>>) -> Result<Vec<Pending<Task>>> {
pub async fn update_tasks(&self, tasks: Vec<Task>) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.update_tasks(tasks).await,
Self::Mock(m) => unsafe {
m.get::<_, Result<Vec<Pending<Task>>>>("update_tasks")
.call(tasks)
m.get::<_, Result<Vec<Task>>>("update_tasks").call(tasks)
},
}
}
pub async fn delete_pending(&self, to_delete: &Pending<Task>) {
match self {
Self::Real(s) => s.delete_pending(to_delete).await,
Self::Mock(m) => unsafe { m.get("delete_pending").call(to_delete) },
}
}
pub async fn get_task(&self, id: TaskId, filter: Option<TaskFilter>) -> Result<Task> {
match self {
Self::Real(s) => s.get_task(id, filter).await,
@ -372,23 +272,13 @@ pub mod test {
}
}
pub async fn get_processing_task(&self) -> Result<Option<Task>> {
pub async fn get_pending_tasks(
&self,
tasks: Vec<TaskId>,
) -> Result<(Vec<TaskId>, Vec<Task>)> {
match self {
Self::Real(s) => s.get_processing_task().await,
Self::Mock(m) => unsafe {
m.get::<_, Result<Option<Task>>>("get_pending_task")
.call(())
},
}
}
pub async fn peek_pending_task(&self) -> Option<Pending<TaskId>> {
match self {
Self::Real(s) => s.peek_pending_task().await,
Self::Mock(m) => unsafe {
m.get::<_, Option<Pending<TaskId>>>("peek_pending_task")
.call(())
},
Self::Real(s) => s.get_pending_tasks(tasks).await,
Self::Mock(m) => unsafe { m.get("get_pending_task").call(tasks) },
}
}
@ -400,14 +290,18 @@ pub mod test {
) -> Result<Vec<Task>> {
match self {
Self::Real(s) => s.list_tasks(from, filter, limit).await,
Self::Mock(_m) => todo!(),
Self::Mock(m) => unsafe { m.get("list_tasks").call((from, filter, limit)) },
}
}
pub async fn dump(&self, path: &Path, update_file_store: UpdateFileStore) -> Result<()> {
pub async fn dump(
&self,
path: impl AsRef<Path>,
update_file_store: UpdateFileStore,
) -> Result<()> {
match self {
Self::Real(s) => s.dump(path, update_file_store).await,
Self::Mock(_m) => todo!(),
Self::Mock(m) => unsafe { m.get("dump").call((path, update_file_store)) },
}
}
@ -425,13 +319,6 @@ pub mod test {
}
}
pub async fn register_job(&self, content: Job) {
match self {
Self::Real(s) => s.register_job(content).await,
Self::Mock(_m) => todo!(),
}
}
pub fn load_dump(path: impl AsRef<Path>, env: Arc<Env>) -> anyhow::Result<()> {
TaskStore::load_dump(path, env)
}

View File

@ -19,7 +19,7 @@ use crate::tasks::task::{Task, TaskId};
use super::super::Result;
use super::{Pending, TaskFilter};
use super::TaskFilter;
enum IndexUidTaskIdCodec {}
@ -84,41 +84,6 @@ impl Store {
})
}
/// This function should be called *right after* creating the store.
/// It puts back all unfinished updates in the `Created` state. This
/// allows us to re-enqueue an update that didn't have the time to finish
/// when Meilisearch closed.
pub fn reset_and_return_unfinished_tasks(&mut self) -> Result<BinaryHeap<Pending<TaskId>>> {
let mut unfinished_tasks: BinaryHeap<Pending<TaskId>> = BinaryHeap::new();
let mut wtxn = self.wtxn()?;
let mut iter = self.tasks.rev_iter_mut(&mut wtxn)?;
while let Some(entry) = iter.next() {
let entry = entry?;
let (id, mut task): (BEU64, Task) = entry;
// Since all tasks are ordered, we can stop iterating when we encounter our first non-finished task.
if task.is_finished() {
break;
}
// We only keep the first state. It's supposed to be a `Created` state.
task.events.drain(1..);
unfinished_tasks.push(Pending::Task(id.get()));
// Since we own the id and the task this is a safe operation.
unsafe {
iter.put_current(&id, &task)?;
}
}
drop(iter);
wtxn.commit()?;
Ok(unfinished_tasks)
}
pub fn wtxn(&self) -> Result<RwTxn> {
Ok(self.env.write_txn()?)
}
@ -166,7 +131,11 @@ impl Store {
.map(|limit| (limit as u64).saturating_add(from))
.unwrap_or(u64::MAX);
let iter: Box<dyn Iterator<Item = StdResult<_, heed::Error>>> = match filter {
Some(filter) => {
Some(
ref filter @ TaskFilter {
indexes: Some(_), ..
},
) => {
let iter = self
.compute_candidates(txn, filter, range)?
.into_iter()
@ -174,15 +143,24 @@ impl Store {
Box::new(iter)
}
None => Box::new(
_ => Box::new(
self.tasks
.rev_range(txn, &(BEU64::new(range.start)..BEU64::new(range.end)))?
.map(|r| r.map(|(_, t)| t)),
),
};
let apply_filter = |task: &StdResult<_, heed::Error>| match task {
Ok(ref t) => filter
.as_ref()
.and_then(|filter| filter.filter_fn.as_ref())
.map(|f| f(t))
.unwrap_or(true),
Err(_) => true,
};
// Collect `limit` tasks if a limit is set, or all of them.
let tasks = iter
.filter(apply_filter)
.take(limit.unwrap_or(usize::MAX))
.try_fold::<_, _, StdResult<_, heed::Error>>(Vec::new(), |mut v, task| {
v.push(task?);
@ -195,11 +173,11 @@ impl Store {
fn compute_candidates(
&self,
txn: &heed::RoTxn,
filter: TaskFilter,
filter: &TaskFilter,
range: Range<TaskId>,
) -> Result<BinaryHeap<TaskId>> {
let mut candidates = BinaryHeap::new();
if let Some(indexes) = filter.indexes {
if let Some(ref indexes) = filter.indexes {
for index in indexes {
// We need to prefix search the null terminated string to make sure that we only
// get exact matches for the index, and not other uids that would share the same
@ -290,13 +268,6 @@ pub mod test {
Ok(Self::Real(Store::new(env)?))
}
pub fn reset_and_return_unfinished_tasks(&mut self) -> Result<BinaryHeap<Pending<TaskId>>> {
match self {
MockStore::Real(index) => index.reset_and_return_unfinished_tasks(),
MockStore::Fake(_) => todo!(),
}
}
pub fn wtxn(&self) -> Result<RwTxn> {
match self {
MockStore::Real(index) => index.wtxn(),

View File

@ -0,0 +1,107 @@
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use tokio::sync::{watch, RwLock};
use tokio::time::interval_at;
use super::batch::Batch;
use super::error::Result;
use super::scheduler::Pending;
use super::{Scheduler, TaskPerformer};
use crate::tasks::task::TaskEvent;
/// The update loop sequentially performs batches of updates by asking the scheduler for a batch,
/// and handing it to the `TaskPerformer`.
pub struct UpdateLoop<P: TaskPerformer> {
scheduler: Arc<RwLock<Scheduler>>,
performer: Arc<P>,
notifier: Option<watch::Receiver<()>>,
debounce_duration: Option<Duration>,
}
impl<P> UpdateLoop<P>
where
P: TaskPerformer + Send + Sync + 'static,
{
pub fn new(
scheduler: Arc<RwLock<Scheduler>>,
performer: Arc<P>,
debounce_duration: Option<Duration>,
notifier: watch::Receiver<()>,
) -> Self {
Self {
scheduler,
performer,
debounce_duration,
notifier: Some(notifier),
}
}
pub async fn run(mut self) {
let mut notifier = self.notifier.take().unwrap();
loop {
if notifier.changed().await.is_err() {
break;
}
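// Debounce: wait for the configured duration so that more incoming updates
// can be grouped into the same batch.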
if let Some(t) = self.debounce_duration {
let mut interval = interval_at(tokio::time::Instant::now() + t, t);
interval.tick().await;
};
if let Err(e) = self.process_next_batch().await {
log::error!("an error occurred while processing an update batch: {}", e);
}
}
}
async fn process_next_batch(&self) -> Result<()> {
let pending = { self.scheduler.write().await.prepare().await? };
match pending {
Pending::Batch(mut batch) => {
for task in &mut batch.tasks {
task.events.push(TaskEvent::Processing(Utc::now()));
}
batch.tasks = {
self.scheduler
.read()
.await
.update_tasks(batch.tasks)
.await?
};
let performer = self.performer.clone();
let batch = performer.process_batch(batch).await;
self.handle_batch_result(batch).await?;
}
Pending::Job(job) => {
let performer = self.performer.clone();
performer.process_job(job).await;
}
Pending::Nothing => (),
}
Ok(())
}
/// Handles the result from a processed batch.
///
/// When a task is processed, the result of the processing is pushed to its event list.
/// `handle_batch_result` makes sure that the new state is saved to the store.
/// The tasks are then removed from the processing queue.
async fn handle_batch_result(&self, mut batch: Batch) -> Result<()> {
let mut scheduler = self.scheduler.write().await;
let tasks = scheduler.update_tasks(batch.tasks).await?;
scheduler.finish();
drop(scheduler);
batch.tasks = tasks;
self.performer.finish(&batch).await;
Ok(())
}
}