From 0a7843cfe45d0fc2a8b48dd577b597fc13520654 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Mon, 1 Jul 2024 16:38:52 +0200 Subject: [PATCH] Squash in a single commit and rebase --- Cargo.lock | 287 +++++------------ crates/fuzzers/src/bin/fuzz-indexing.rs | 2 +- crates/index-scheduler/src/batch.rs | 10 +- crates/index-scheduler/src/lib.rs | 302 +++++++++++++++--- crates/meilisearch-types/src/lib.rs | 2 +- crates/meilisearch/Cargo.toml | 2 +- .../src/routes/indexes/documents.rs | 71 ++-- crates/meilisearch/src/search/mod.rs | 14 + crates/meilitool/src/main.rs | 9 +- crates/milli/Cargo.toml | 1 + .../src/heed_codec/compressed_obkv_codec.rs | 98 ++++++ crates/milli/src/heed_codec/mod.rs | 4 + crates/milli/src/index.rs | 126 ++++++-- crates/milli/src/lib.rs | 2 +- crates/milli/src/search/new/tests/mod.rs | 9 +- crates/milli/src/snapshot_tests.rs | 10 +- crates/milli/src/update/clear_documents.rs | 1 + .../milli/src/update/index_documents/mod.rs | 235 ++++++++++++-- .../src/update/index_documents/transform.rs | 35 +- .../src/update/index_documents/typed_chunk.rs | 14 +- crates/milli/src/update/new/document.rs | 13 +- .../milli/src/update/new/extract/documents.rs | 2 + crates/milli/src/update/settings.rs | 57 +++- crates/milli/tests/search/query_criteria.rs | 15 +- milli/examples/search.rs | 0 25 files changed, 935 insertions(+), 386 deletions(-) create mode 100644 crates/milli/src/heed_codec/compressed_obkv_codec.rs create mode 100644 milli/examples/search.rs diff --git a/Cargo.lock b/Cargo.lock index de7dabc36..dbca8816b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,7 +80,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" dependencies = [ "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -216,7 +216,7 @@ dependencies = [ "actix-router", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -234,17 +234,6 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" -[[package]] -name = "aes" -version = "0.8.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" -dependencies = [ - "cfg-if", - "cipher", - "cpufeatures", -] - [[package]] name = "ahash" version = "0.7.8" @@ -296,9 +285,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] name = "anes" @@ -441,7 +430,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -557,7 +546,7 @@ dependencies = [ "regex", "rustc-hash 1.1.0", "shlex", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -651,7 +640,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", "syn_derive", ] @@ -715,7 +704,7 @@ dependencies = [ "allocator-api2", "bitpacking", "bumpalo", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "serde", "serde_json", ] @@ -761,9 +750,9 @@ checksum = "2c676a478f63e9fa2dd5368a42f28bba0d6c560b775f38583c8bbaa7fcd67c9c" [[package]] name = "bytemuck" -version = "1.19.0" +version = 
"1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" +checksum = "8b37c88a63ffd85d15b406896cc343916d7cf57838a847b3a6f2ca5d39a5695a" dependencies = [ "bytemuck_derive", ] @@ -776,7 +765,7 @@ checksum = "4da9a32f3fed317401fa3c862968128267c3106685286e15d5aaa3d7389c2f60" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -800,27 +789,6 @@ dependencies = [ "bytes", ] -[[package]] -name = "bzip2" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdb116a6ef3f6c3698828873ad02c3014b3c85cadb88496095628e3ef1e347f8" -dependencies = [ - "bzip2-sys", - "libc", -] - -[[package]] -name = "bzip2-sys" -version = "0.1.11+1.0.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "736a955f3fa7875102d57c82b8cac37ec45224a07fd32d58f9f7a186b6cd4cdc" -dependencies = [ - "cc", - "libc", - "pkg-config", -] - [[package]] name = "camino" version = "1.1.6" @@ -937,9 +905,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.104" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490" +checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" dependencies = [ "jobserver", "libc", @@ -1035,16 +1003,6 @@ dependencies = [ "half 1.8.2", ] -[[package]] -name = "cipher" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" -dependencies = [ - "crypto-common", - "inout", -] - [[package]] name = "clang-sys" version = "1.7.0" @@ -1087,7 +1045,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1158,12 +1116,6 @@ dependencies = [ "tiny-keccak", ] -[[package]] -name = "constant_time_eq" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7144d30dcf0fafbce74250a3963025d8d52177934239851c917d29f1df280c2" - [[package]] name = "convert_case" version = "0.4.0" @@ -1205,21 +1157,6 @@ dependencies = [ "libc", ] -[[package]] -name = "crc" -version = "3.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "69e6e4d7b33a94f0991c26729976b10ebde1d34c3ee82408fb536164fa10d636" -dependencies = [ - "crc-catalog", -] - -[[package]] -name = "crc-catalog" -version = "2.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" - [[package]] name = "crc32fast" version = "1.4.2" @@ -1400,7 +1337,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1422,7 +1359,7 @@ checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" dependencies = [ "darling_core 0.20.9", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1452,12 +1389,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "deflate64" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83ace6c86376be0b6cdcf3fb41882e81d94b31587573d1cfa9d01cd06bba210d" - [[package]] name = "deranged" version = "0.3.11" @@ -1476,7 +1407,7 @@ checksum = "67e77553c4162a157adbf834ebae5b415acbecbeafc7a74b0e886657506a7611" dependencies = [ "proc-macro2", 
"quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1518,7 +1449,7 @@ dependencies = [ "darling 0.20.9", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1538,7 +1469,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" dependencies = [ "derive_builder_core 0.20.0", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1580,7 +1511,7 @@ dependencies = [ "convert_case 0.6.0", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1644,7 +1575,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1802,7 +1733,7 @@ dependencies = [ "heck 0.4.1", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1822,7 +1753,7 @@ checksum = "a1ab991c1362ac86c61ab6f556cff143daa22e5a15e4e189df818b2fd19fe65b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -1833,9 +1764,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.8" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", "windows-sys 0.52.0", @@ -1870,9 +1801,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "file-store" @@ -2022,7 +1953,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -2311,9 +2242,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.4.5" +version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +checksum = "ccae279728d634d083c00f6099cb58f01cc99c145b84b8be2f6c74618d79922e" dependencies = [ "atomic-waker", "bytes", @@ -2378,9 +2309,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.1" +version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" dependencies = [ "allocator-api2", "equivalent", @@ -2550,7 +2481,7 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "h2 0.4.5", + "h2 0.4.7", "http 1.1.0", "http-body", "httparse", @@ -2666,7 +2597,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62f822373a4fe84d4bb149bf54e584a7f4abec90e072ed49cda0edea5b95471f" dependencies = [ "equivalent", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "serde", ] @@ -2683,15 +2614,6 @@ dependencies = [ "unicode-width", ] -[[package]] -name = "inout" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" -dependencies = [ - "generic-array", -] - [[package]] name = "insta" version = "1.39.0" @@ -2890,9 
+2812,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.168" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "5aaeb2981e0606ca11d79718f8bb01164f1d6ed75080182d3abf017e6d244b6d" [[package]] name = "libgit2-sys" @@ -3319,7 +3241,7 @@ checksum = "915f6d0a2963a27cd5205c1902f32ddfe3bc035816afd268cf88c0fc0f8d287e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -3388,16 +3310,6 @@ version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" -[[package]] -name = "lzma-rs" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "297e814c836ae64db86b36cf2a557ba54368d03f6afcd7d947c266692f71115e" -dependencies = [ - "byteorder", - "crc", -] - [[package]] name = "macro_rules_attribute" version = "0.2.0" @@ -3423,7 +3335,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -3656,7 +3568,7 @@ dependencies = [ "fxhash", "geoutils", "grenad", - "hashbrown 0.15.1", + "hashbrown 0.15.2", "heed", "hf-hub", "indexmap", @@ -3698,6 +3610,7 @@ dependencies = [ "ureq", "url", "uuid", + "zstd", ] [[package]] @@ -3770,7 +3683,7 @@ checksum = "371717c0a5543d6a800cac822eac735aa7d2d2fbb41002e9856a4089532dbdce" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -3906,7 +3819,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -4054,16 +3967,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "498a099351efa4becc6a19c72aa9270598e8fd274ca47052e37455241c88b696" -[[package]] -name = "pbkdf2" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" -dependencies = [ - "digest", - "hmac", -] - [[package]] name = "pem" version = "3.0.3" @@ -4118,7 +4021,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -4172,7 +4075,7 @@ dependencies = [ "phf_shared", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -4201,7 +4104,7 @@ checksum = "266c042b60c9c76b8d53061e52b2e0d1116abc57cefc8c5cd671619a56ac3690" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -4318,9 +4221,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.89" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -4422,14 +4325,14 @@ dependencies = [ [[package]] name = "quinn-proto" -version = "0.11.8" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +checksum = "ddf517c03a109db8100448a4be38d498df8a210a99fe0e1b9eaf39e78c640efe" dependencies = [ "bytes", "rand", "ring", - "rustc-hash 2.1.0", + "rustc-hash 1.1.0", "rustls", "slab", "thiserror", @@ -4614,9 +4517,9 @@ dependencies = [ [[package]] name = "regex-lite" -version = "0.1.6" +version = "0.1.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "53a49587ad06b26609c52e423de037e7f57f20d53535d66e08c695f347df952a" +checksum = "30b661b2f27137bdbc16f00eda72866a92bb28af1753ffbd56744fb6e2e9cd8e" [[package]] name = "regex-syntax" @@ -4702,7 +4605,7 @@ source = "git+https://github.com/rhaiscript/rhai?rev=ef3df63121d27aacd838f366f2b dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -4751,9 +4654,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f81dc953b2244ddd5e7860cb0bb2a790494b898ef321d4aff8e260efab60cc88" +checksum = "395b0c39c00f9296f3937624c1fa4e0ee44f8c0e4b2c49408179ef381c6c2e6e" dependencies = [ "bytemuck", "byteorder", @@ -4817,9 +4720,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.41" +version = "0.38.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7f649912bc1495e167a6edee79151c84b1bad49748cb4f1f1167f459f6224f6" +checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" dependencies = [ "bitflags 2.6.0", "errno", @@ -4944,9 +4847,9 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.214" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" +checksum = "0b9781016e935a97e8beecf0c933758c97a5520d32930e460142b4cd80c6338e" dependencies = [ "serde_derive", ] @@ -4962,13 +4865,13 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.214" +version = "1.0.216" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" +checksum = "46f859dbbf73865c6627ed570e78961cd3ac92407a2d117204c49232485da55e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5265,7 +5168,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5287,9 +5190,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.87" +version = "2.0.90" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" +checksum = "919d3b74a5dd0ccd15aeb8f93e7006bd9e14c295087c9896a110f490752bcf31" dependencies = [ "proc-macro2", "quote", @@ -5305,7 +5208,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5331,7 +5234,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5437,7 +5340,7 @@ checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5589,7 +5492,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -5721,7 +5624,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -6076,7 +5979,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", "wasm-bindgen-shared", ] @@ -6110,7 
+6013,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -6441,13 +6344,13 @@ dependencies = [ [[package]] name = "wiremock" -version = "0.6.0" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec874e1eef0df2dcac546057fe5e29186f09c378181cd7b635b4b7bcc98e9d81" +checksum = "7fff469918e7ca034884c7fd8f93fe27bacb7fcb599fd879df6c7b429a29b646" dependencies = [ "assert-json-diff", "async-trait", - "base64 0.21.7", + "base64 0.22.1", "deadpool", "futures", "http 1.1.0", @@ -6543,7 +6446,7 @@ checksum = "9e6936f0cce458098a201c245a11bef556c6a0181129c7034d10d76d1ec3a2b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", "synstructure", ] @@ -6564,7 +6467,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", ] [[package]] @@ -6584,7 +6487,7 @@ checksum = "e6a647510471d372f2e6c2e6b7219e44d8c574d24fdc11c610a61455782f18c3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.87", + "syn 2.0.90", "synstructure", ] @@ -6593,20 +6496,6 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" -dependencies = [ - "zeroize_derive", -] - -[[package]] -name = "zeroize_derive" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.87", -] [[package]] name = "zip" @@ -6629,27 +6518,15 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "775a2b471036342aa69bc5a602bc889cb0a06cda00477d0c69566757d5553d39" dependencies = [ - "aes", "arbitrary", - "bzip2", - "constant_time_eq", "crc32fast", "crossbeam-utils", - "deflate64", "displaydoc", "flate2", - "hmac", "indexmap", - "lzma-rs", "memchr", - "pbkdf2", - "rand", - "sha1", "thiserror", - "time", - "zeroize", "zopfli", - "zstd", ] [[package]] @@ -6668,18 +6545,18 @@ dependencies = [ [[package]] name = "zstd" -version = "0.13.2" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fcf2b778a664581e31e389454a7072dab1647606d44f7feea22cd5abb9c9f3f9" +checksum = "2d789b1514203a1120ad2429eae43a7bd32b90976a7bb8a05f7ec02fa88cc23a" dependencies = [ "zstd-safe", ] [[package]] name = "zstd-safe" -version = "7.2.0" +version = "7.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa556e971e7b568dc775c136fc9de8c779b1c2fc3a63defaafadffdbd3181afa" +checksum = "1cd99b45c6bc03a018c8b8a86025678c87e55526064e38f9df301989dce7ec0a" dependencies = [ "zstd-sys", ] diff --git a/crates/fuzzers/src/bin/fuzz-indexing.rs b/crates/fuzzers/src/bin/fuzz-indexing.rs index ee927940f..b5624c35c 100644 --- a/crates/fuzzers/src/bin/fuzz-indexing.rs +++ b/crates/fuzzers/src/bin/fuzz-indexing.rs @@ -149,7 +149,7 @@ fn main() { // after executing a batch we check if the database is corrupted let res = index.search(&wtxn).execute().unwrap(); - index.documents(&wtxn, res.documents_ids).unwrap(); + index.compressed_documents(&wtxn, res.documents_ids).unwrap(); progression.fetch_add(1, Ordering::Relaxed); } wtxn.abort(); diff --git a/crates/index-scheduler/src/batch.rs 
b/crates/index-scheduler/src/batch.rs index 93e9a1404..2efcefd94 100644 --- a/crates/index-scheduler/src/batch.rs +++ b/crates/index-scheduler/src/batch.rs @@ -830,7 +830,9 @@ impl IndexScheduler { let mut index_dumper = dump.create_index(uid, &metadata)?; let fields_ids_map = index.fields_ids_map(&rtxn)?; + let dictionary = index.document_decompression_dictionary(&rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); + let mut buffer = Vec::new(); let embedding_configs = index .embedding_configs(&rtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; @@ -839,13 +841,17 @@ impl IndexScheduler { .all_documents(&rtxn) .map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; // 3.1. Dump the documents - for ret in documents { + for ret in index.all_compressed_documents(&rtxn)? { if self.must_stop_processing.get() { return Err(Error::AbortedTask); } - let (id, doc) = + let (id, compressed) = ret.map_err(|e| Error::from_milli(e, Some(uid.to_string())))?; + let doc = compressed.decompress_with_optional_dictionary( + &mut buffer, + dictionary.as_ref(), + )?; let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, doc) diff --git a/crates/index-scheduler/src/lib.rs b/crates/index-scheduler/src/lib.rs index e780b21a1..941405357 100644 --- a/crates/index-scheduler/src/lib.rs +++ b/crates/index-scheduler/src/lib.rs @@ -2970,12 +2970,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3030,12 +3038,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3520,12 +3536,20 @@ mod tests { // has everything being pushed successfully in milli? 
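The decompress-then-convert pattern repeated throughout these index-scheduler test hunks can be read as a single helper. A minimal sketch, assuming the new milli APIs introduced by this patch (all_compressed_documents, document_decompression_dictionary, decompress_with_optional_dictionary); the helper name all_documents_as_json and the milli::Object return type are illustrative and not part of the patch:

use milli::heed::RoTxn;
use milli::{obkv_to_json, Index, Object};

// Hypothetical reading aid: fetch every compressed document, decompress it with
// the optional zstd dictionary, and convert the obkv to JSON, exactly as the
// snapshot tests above do inline.
fn all_documents_as_json(index: &Index, rtxn: &RoTxn) -> Vec<Object> {
    let mut buffer = Vec::new();
    // `None` means no compression dictionary has been written yet; documents
    // are then stored uncompressed and returned as-is.
    let dictionary = index.document_decompression_dictionary(rtxn).unwrap();
    let field_ids_map = index.fields_ids_map(rtxn).unwrap();
    let field_ids: Vec<_> = field_ids_map.ids().collect();
    index
        .all_compressed_documents(rtxn)
        .unwrap()
        .map(|ret| {
            let (_id, compressed_doc) = ret.unwrap();
            let doc = compressed_doc
                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
                .unwrap();
            obkv_to_json(&field_ids, &field_ids_map, doc).unwrap()
        })
        .collect()
}

The buffer is reused across documents and grown by the decompressor as needed, which is why it is allocated once outside the loop in every test block below.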
let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3571,12 +3595,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3627,12 +3659,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3747,12 +3787,20 @@ mod tests { // has everything being pushed successfully in milli? 
let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -3802,12 +3850,20 @@ mod tests { // has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -4984,12 +5040,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5055,12 +5119,20 @@ mod tests { // Has everything being pushed successfully in milli? 
let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5123,12 +5195,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5184,12 +5264,20 @@ mod tests { // Has everything being pushed successfully in milli? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5243,6 +5331,8 @@ mod tests { // Is the primary key still what we expect? 
let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -5250,9 +5340,15 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5303,6 +5399,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -5310,9 +5408,15 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5387,6 +5491,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"id"); @@ -5394,9 +5500,15 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5473,6 +5585,8 @@ mod tests { // Is the primary key still what we expect? 
let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"paw"); @@ -5480,9 +5594,15 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -5552,6 +5672,8 @@ mod tests { // Is the primary key still what we expect? let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let primary_key = index.primary_key(&rtxn).unwrap().unwrap(); snapshot!(primary_key, @"doggoid"); @@ -5559,9 +5681,15 @@ mod tests { let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string_pretty(&documents).unwrap(), name: "documents"); } @@ -6243,6 +6371,8 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); // Ensure the document have been inserted into the relevant bitamp let configs = index.embedding_configs(&rtxn).unwrap(); @@ -6262,8 +6392,12 @@ mod tests { assert_json_snapshot!(embeddings[&simple_hf_name][0] == lab_embed, @"true"); assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, std::iter::once(0)).unwrap().remove(0); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let doc = obkv_to_json( &[ fields_ids_map.id("doggo").unwrap(), @@ -6317,6 +6451,8 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); // Ensure the document have been inserted into the relevant bitamp let configs = index.embedding_configs(&rtxn).unwrap(); @@ -6339,8 +6475,12 @@ mod tests { // remained beagle assert_json_snapshot!(embeddings[&fakerest_name][0] == beagle_embed, @"true"); - let doc = index.documents(&rtxn, std::iter::once(0)).unwrap()[0].1; + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, 
std::iter::once(0)).unwrap().remove(0); let fields_ids_map = index.fields_ids_map(&rtxn).unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let doc = obkv_to_json( &[ fields_ids_map.id("doggo").unwrap(), @@ -6432,12 +6572,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), name: "documents after initial push"); @@ -6471,12 +6619,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); // the all the vectors linked to the new specified embedder have been removed // Only the unknown embedders stays in the document DB @@ -6563,9 +6719,15 @@ mod tests { // the document with the id 3 should have its original embedding updated let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let docid = index.external_documents_ids.get(&rtxn, "3").unwrap().unwrap(); - let doc = index.documents(&rtxn, Some(docid)).unwrap()[0]; - let doc = obkv_to_json(&field_ids, &field_ids_map, doc.1).unwrap(); + let (_id, compressed_doc) = + index.compressed_documents(&rtxn, Some(docid)).unwrap().remove(0); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + let doc = obkv_to_json(&field_ids, &field_ids_map, doc).unwrap(); snapshot!(json_string!(doc), @r###" { "id": 3, @@ -6677,12 +6839,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, 
doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"}]"###); let conf = index.embedding_configs(&rtxn).unwrap(); @@ -6721,12 +6891,20 @@ mod tests { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @"[]"); let conf = index.embedding_configs(&rtxn).unwrap(); @@ -6841,12 +7019,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir"},{"id":1,"doggo":"intel"}]"###); } @@ -6876,12 +7062,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); snapshot!(serde_json::to_string(&documents).unwrap(), @r###"[{"id":0,"doggo":"kefir","_vectors":{"manual":{"embeddings":[[0.0,0.0,0.0]],"regenerate":false}}},{"id":1,"doggo":"intel","_vectors":{"manual":{"embeddings":[[1.0,1.0,1.0]],"regenerate":false}}}]"###); } @@ -6909,12 +7103,20 @@ mod tests { { let index = index_scheduler.index("doggos").unwrap(); let rtxn = index.read_txn().unwrap(); + let mut buffer = Vec::new(); + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let field_ids_map = index.fields_ids_map(&rtxn).unwrap(); let field_ids = field_ids_map.ids().collect::>(); let documents = index - .all_documents(&rtxn) + .all_compressed_documents(&rtxn) .unwrap() - .map(|ret| obkv_to_json(&field_ids, &field_ids_map, ret.unwrap().1).unwrap()) + .map(|ret| { + let (_id, compressed_doc) = ret.unwrap(); + let doc 
= compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); + obkv_to_json(&field_ids, &field_ids_map, doc).unwrap() + }) .collect::>(); // FIXME: redaction diff --git a/crates/meilisearch-types/src/lib.rs b/crates/meilisearch-types/src/lib.rs index a1a57b7e6..f57429c85 100644 --- a/crates/meilisearch-types/src/lib.rs +++ b/crates/meilisearch-types/src/lib.rs @@ -15,7 +15,7 @@ pub mod star_or; pub mod task_view; pub mod tasks; pub mod versioning; -pub use milli::{heed, Index}; +pub use milli::{heed, zstd, Index}; use uuid::Uuid; pub use versioning::VERSION_FILE_NAME; pub use {milli, serde_cs}; diff --git a/crates/meilisearch/Cargo.toml b/crates/meilisearch/Cargo.toml index 68ca8e136..38bb0d7c7 100644 --- a/crates/meilisearch/Cargo.toml +++ b/crates/meilisearch/Cargo.toml @@ -129,7 +129,7 @@ reqwest = { version = "0.12.5", features = [ sha-1 = { version = "0.10.1", optional = true } static-files = { version = "0.2.4", optional = true } tempfile = { version = "3.10.1", optional = true } -zip = { version = "2.1.3", optional = true } +zip = { version = "2.1.3", default-features = false, features = ["deflate"], optional = true } [features] default = ["meilisearch-types/all-tokenizations", "mini-dashboard"] diff --git a/crates/meilisearch/src/routes/indexes/documents.rs b/crates/meilisearch/src/routes/indexes/documents.rs index 5f79000bd..fa78e0fcd 100644 --- a/crates/meilisearch/src/routes/indexes/documents.rs +++ b/crates/meilisearch/src/routes/indexes/documents.rs @@ -973,44 +973,51 @@ fn some_documents<'a, 't: 'a>( retrieve_vectors: RetrieveVectors, ) -> Result> + 'a, ResponseError> { let fields_ids_map = index.fields_ids_map(rtxn)?; + let dictionary = index.document_decompression_dictionary(rtxn)?; let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); let embedding_configs = index.embedding_configs(rtxn)?; + let mut buffer = Vec::new(); - Ok(index.iter_documents(rtxn, doc_ids)?.map(move |ret| { - ret.map_err(ResponseError::from).and_then(|(key, document)| -> Result<_, ResponseError> { - let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?; - match retrieve_vectors { - RetrieveVectors::Ignore => {} - RetrieveVectors::Hide => { - document.remove("_vectors"); - } - RetrieveVectors::Retrieve => { - // Clippy is simply wrong - #[allow(clippy::manual_unwrap_or_default)] - let mut vectors = match document.remove("_vectors") { - Some(Value::Object(map)) => map, - _ => Default::default(), - }; - for (name, vector) in index.embeddings(rtxn, key)? 
{ - let user_provided = embedding_configs - .iter() - .find(|conf| conf.name == name) - .is_some_and(|conf| conf.user_provided.contains(key)); - let embeddings = ExplicitVectors { - embeddings: Some(vector.into()), - regenerate: !user_provided, - }; - vectors.insert( - name, - serde_json::to_value(embeddings).map_err(MeilisearchHttpError::from)?, - ); + Ok(index.iter_compressed_documents(rtxn, doc_ids)?.map(move |ret| { + ret.map_err(ResponseError::from).and_then( + |(key, compressed_document)| -> Result<_, ResponseError> { + let document = compressed_document + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?; + let mut document = milli::obkv_to_json(&all_fields, &fields_ids_map, document)?; + match retrieve_vectors { + RetrieveVectors::Ignore => {} + RetrieveVectors::Hide => { + document.remove("_vectors"); + } + RetrieveVectors::Retrieve => { + // Clippy is simply wrong + #[allow(clippy::manual_unwrap_or_default)] + let mut vectors = match document.remove("_vectors") { + Some(Value::Object(map)) => map, + _ => Default::default(), + }; + for (name, vector) in index.embeddings(rtxn, key)? { + let user_provided = embedding_configs + .iter() + .find(|conf| conf.name == name) + .is_some_and(|conf| conf.user_provided.contains(key)); + let embeddings = ExplicitVectors { + embeddings: Some(vector.into()), + regenerate: !user_provided, + }; + vectors.insert( + name, + serde_json::to_value(embeddings) + .map_err(MeilisearchHttpError::from)?, + ); + } + document.insert("_vectors".into(), vectors.into()); } - document.insert("_vectors".into(), vectors.into()); } - } - Ok(document) - }) + Ok(document) + }, + ) })) } diff --git a/crates/meilisearch/src/search/mod.rs b/crates/meilisearch/src/search/mod.rs index 674ae226b..9ee28a22a 100644 --- a/crates/meilisearch/src/search/mod.rs +++ b/crates/meilisearch/src/search/mod.rs @@ -1296,6 +1296,20 @@ impl<'a> HitMaker<'a> { let (_, obkv) = self.index.iter_documents(self.rtxn, std::iter::once(id))?.next().unwrap()?; + // let mut formatter_builder = MatcherBuilder::new(matching_words, tokenizer_builder.build()); + // formatter_builder.crop_marker(format.crop_marker); + // formatter_builder.highlight_prefix(format.highlight_pre_tag); + // formatter_builder.highlight_suffix(format.highlight_post_tag); + // let decompression_dictionary = index.document_decompression_dictionary(rtxn)?; + // let mut buffer = Vec::new(); + // let mut documents = Vec::new(); + // let embedding_configs = index.embedding_configs(rtxn)?; + // let documents_iter = index.compressed_documents(rtxn, documents_ids)?; + // for ((id, compressed), score) in documents_iter.into_iter().zip(document_scores.into_iter()) { + // let obkv = compressed + // .decompress_with_optional_dictionary(&mut buffer, decompression_dictionary.as_ref()) + // // TODO use a better error? + // .map_err(|e| MeilisearchHttpError::HeedError(e.into()))?; // First generate a document with all the displayed fields let displayed_document = make_document(&self.displayed_ids, &self.fields_ids_map, obkv)?; diff --git a/crates/meilitool/src/main.rs b/crates/meilitool/src/main.rs index 44eb4960e..4938e03eb 100644 --- a/crates/meilitool/src/main.rs +++ b/crates/meilitool/src/main.rs @@ -280,6 +280,7 @@ fn export_a_dump( // 4. Dump the indexes let mut count = 0; + let mut buffer = Vec::new(); for result in index_mapping.iter(&rtxn)? 
{ let (uid, uuid) = result?; let index_path = db_path.join("indexes").join(uuid.to_string()); @@ -288,6 +289,7 @@ fn export_a_dump( })?; let rtxn = index.read_txn()?; + let dictionary = index.document_decompression_dictionary(&rtxn).unwrap(); let metadata = IndexMetadata { uid: uid.to_owned(), primary_key: index.primary_key(&rtxn)?.map(String::from), @@ -300,8 +302,11 @@ fn export_a_dump( let all_fields: Vec<_> = fields_ids_map.iter().map(|(id, _)| id).collect(); // 4.1. Dump the documents - for ret in index.all_documents(&rtxn)? { - let (_id, doc) = ret?; + for ret in index.all_compressed_documents(&rtxn)? { + let (_id, compressed_doc) = ret?; + let doc = compressed_doc + .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref()) + .unwrap(); let document = obkv_to_json(&all_fields, &fields_ids_map, doc)?; index_dumper.push_document(&document)?; } diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml index 9f113e013..f2aaf149d 100644 --- a/crates/milli/Cargo.toml +++ b/crates/milli/Cargo.toml @@ -36,6 +36,7 @@ heed = { version = "0.20.3", default-features = false, features = [ indexmap = { version = "2.2.6", features = ["serde"] } json-depth-checker = { path = "../json-depth-checker" } levenshtein_automata = { version = "0.2.1", features = ["fst_automaton"] } +zstd = { version = "0.13.1", features = ["zdict_builder", "experimental"] } memchr = "2.5.0" memmap2 = "0.9.4" obkv = "0.3.0" diff --git a/crates/milli/src/heed_codec/compressed_obkv_codec.rs b/crates/milli/src/heed_codec/compressed_obkv_codec.rs new file mode 100644 index 000000000..8ddd94d92 --- /dev/null +++ b/crates/milli/src/heed_codec/compressed_obkv_codec.rs @@ -0,0 +1,98 @@ +use std::borrow::Cow; +use std::io; +use std::io::ErrorKind; + +use heed::BoxedError; +use obkv::KvReaderU16; +use zstd::bulk::{Compressor, Decompressor}; +use zstd::dict::{DecoderDictionary, EncoderDictionary}; + +pub struct CompressedObkvCodec; + +impl<'a> heed::BytesDecode<'a> for CompressedObkvCodec { + type DItem = CompressedKvReaderU16<'a>; + + fn bytes_decode(bytes: &'a [u8]) -> Result { + Ok(CompressedKvReaderU16(bytes)) + } +} + +impl heed::BytesEncode<'_> for CompressedObkvCodec { + type EItem = CompressedKvWriterU16; + + fn bytes_encode(item: &Self::EItem) -> Result, BoxedError> { + Ok(Cow::Borrowed(&item.0)) + } +} + +// TODO Make this an unsized slice wrapper instead? +// &'a CompressedKvReaderU16([u8]) +pub struct CompressedKvReaderU16<'a>(&'a [u8]); + +impl<'a> CompressedKvReaderU16<'a> { + /// Decompresses the KvReader into the buffer using the provided dictionnary. + pub fn decompress_with<'b>( + &self, + buffer: &'b mut Vec, + dictionary: &DecoderDictionary, + ) -> io::Result<&'b KvReaderU16> { + const TWO_GIGABYTES: usize = 2 * 1024 * 1024 * 1024; + + let mut decompressor = Decompressor::with_prepared_dictionary(dictionary)?; + let mut max_size = self.0.len() * 4; + let size = loop { + buffer.resize(max_size, 0); + match decompressor.decompress_to_buffer(self.0, &mut buffer[..max_size]) { + Ok(size) => break size, + // TODO don't do that !!! But what should I do? + Err(e) if e.kind() == ErrorKind::Other && max_size <= TWO_GIGABYTES => { + max_size *= 2 + } + Err(e) => return Err(e), + } + }; + Ok(KvReaderU16::from_slice(&buffer[..size])) + } + + /// Returns the KvReader like it is not compressed. + /// Happends when there is no dictionary yet. + pub fn as_non_compressed(&self) -> &'a KvReaderU16 { + KvReaderU16::from_slice(self.0) + } + + /// Decompresses this KvReader if necessary. 
+ pub fn decompress_with_optional_dictionary<'b>( + &self, + buffer: &'b mut Vec, + dictionary: Option<&DecoderDictionary>, + ) -> io::Result<&'b KvReaderU16> + where + 'a: 'b, + { + match dictionary { + Some(dict) => self.decompress_with(buffer, dict), + None => Ok(self.as_non_compressed()), + } + } + + pub fn decompress_as_owned_with_optinal_dictionary( + &self, + dictionary: Option<&DecoderDictionary>, + ) -> io::Result> { + todo!("Impl owned version of KvReader") + } +} + +pub struct CompressedKvWriterU16(Vec); + +impl CompressedKvWriterU16 { + // TODO ask for a KvReaderU16 here + pub fn new_with_dictionary(input: &[u8], dictionary: &EncoderDictionary) -> io::Result { + let mut compressor = Compressor::with_prepared_dictionary(dictionary)?; + compressor.compress(input).map(CompressedKvWriterU16) + } + + pub fn as_bytes(&self) -> &[u8] { + &self.0 + } +} diff --git a/crates/milli/src/heed_codec/mod.rs b/crates/milli/src/heed_codec/mod.rs index 575b886bd..3ce9306dd 100644 --- a/crates/milli/src/heed_codec/mod.rs +++ b/crates/milli/src/heed_codec/mod.rs @@ -1,6 +1,7 @@ mod beu16_str_codec; mod beu32_str_codec; mod byte_slice_ref; +mod compressed_obkv_codec; pub mod facet; mod field_id_word_count_codec; mod fst_set_codec; @@ -18,6 +19,9 @@ use thiserror::Error; pub use self::beu16_str_codec::BEU16StrCodec; pub use self::beu32_str_codec::BEU32StrCodec; +pub use self::compressed_obkv_codec::{ + CompressedKvReaderU16, CompressedKvWriterU16, CompressedObkvCodec, +}; pub use self::field_id_word_count_codec::FieldIdWordCountCodec; pub use self::fst_set_codec::FstSetCodec; pub use self::obkv_codec::ObkvCodec; diff --git a/crates/milli/src/index.rs b/crates/milli/src/index.rs index 268d33cd9..a811a8d66 100644 --- a/crates/milli/src/index.rs +++ b/crates/milli/src/index.rs @@ -9,6 +9,7 @@ use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified}; use roaring::RoaringBitmap; use rstar::RTree; use serde::{Deserialize, Serialize}; +use zstd::dict::{DecoderDictionary, EncoderDictionary}; use crate::documents::PrimaryKey; use crate::error::{InternalError, UserError}; @@ -17,7 +18,10 @@ use crate::heed_codec::facet::{ FacetGroupKeyCodec, FacetGroupValueCodec, FieldDocIdFacetF64Codec, FieldDocIdFacetStringCodec, FieldIdCodec, OrderedF64Codec, }; -use crate::heed_codec::{BEU16StrCodec, FstSetCodec, StrBEU16Codec, StrRefCodec}; +use crate::heed_codec::{ + BEU16StrCodec, CompressedKvReaderU16, CompressedObkvCodec, FstSetCodec, StrBEU16Codec, + StrRefCodec, +}; use crate::order_by_map::OrderByMap; use crate::proximity::ProximityPrecision; use crate::vector::parsed_vectors::RESERVED_VECTORS_FIELD_NAME; @@ -25,7 +29,7 @@ use crate::vector::{ArroyWrapper, Embedding, EmbeddingConfig}; use crate::{ default_criteria, CboRoaringBitmapCodec, Criterion, DocumentId, ExternalDocumentsIds, FacetDistribution, FieldDistribution, FieldId, FieldIdMapMissingEntry, FieldIdWordCountCodec, - FieldidsWeightsMap, GeoPoint, LocalizedAttributesRule, ObkvCodec, Result, RoaringBitmapCodec, + FieldidsWeightsMap, GeoPoint, LocalizedAttributesRule, Result, RoaringBitmapCodec, RoaringBitmapLenCodec, Search, U8StrStrCodec, Weight, BEU16, BEU32, BEU64, }; @@ -69,6 +73,7 @@ pub mod main_key { pub const PROXIMITY_PRECISION: &str = "proximity-precision"; pub const EMBEDDING_CONFIGS: &str = "embedding_configs"; pub const SEARCH_CUTOFF: &str = "search_cutoff"; + pub const DOCUMENT_COMPRESSION_DICTIONARY: &str = "document-compression-dictionary"; pub const LOCALIZED_ATTRIBUTES_RULES: &str = "localized_attributes_rules"; pub const 
     pub const PREFIX_SEARCH: &str = "prefix_search";
@@ -167,7 +172,7 @@ pub struct Index {
     pub vector_arroy: arroy::Database<Unspecified>,
 
     /// Maps the document id to the document as an obkv store.
-    pub(crate) documents: Database<BEU32, ObkvCodec>,
+    pub(crate) documents: Database<BEU32, CompressedObkvCodec>,
 }
 
 impl Index {
@@ -331,6 +336,50 @@ impl Index {
         self.env.prepare_for_closing()
     }
 
+    /* document compression dictionary */
+
+    /// Writes the dictionary that will later be used to compress the documents.
+    pub fn put_document_compression_dictionary(
+        &self,
+        wtxn: &mut RwTxn,
+        dictionary: &[u8],
+    ) -> heed::Result<()> {
+        self.main.remap_types::<Str, Bytes>().put(
+            wtxn,
+            main_key::DOCUMENT_COMPRESSION_DICTIONARY,
+            dictionary,
+        )
+    }
+
+    /// Deletes the document compression dictionary.
+    pub fn delete_document_compression_dictionary(&self, wtxn: &mut RwTxn) -> heed::Result<bool> {
+        self.main.remap_key_type::<Str>().delete(wtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
+    }
+
+    /// Returns the optional raw bytes dictionary to be used when reading or writing the OBKV documents.
+    pub fn document_compression_raw_dictionary<'t>(
+        &self,
+        rtxn: &'t RoTxn,
+    ) -> heed::Result<Option<&'t [u8]>> {
+        self.main.remap_types::<Str, Bytes>().get(rtxn, main_key::DOCUMENT_COMPRESSION_DICTIONARY)
+    }
+
+    pub fn document_decompression_dictionary<'t>(
+        &self,
+        rtxn: &'t RoTxn,
+    ) -> heed::Result<Option<DecoderDictionary<'t>>> {
+        self.document_compression_raw_dictionary(rtxn).map(|opt| opt.map(DecoderDictionary::new))
+    }
+
+    pub fn document_compression_dictionary(
+        &self,
+        rtxn: &RoTxn,
+    ) -> heed::Result<Option<EncoderDictionary<'static>>> {
+        const COMPRESSION_LEVEL: i32 = 19;
+        self.document_compression_raw_dictionary(rtxn)
+            .map(|opt| opt.map(|bytes| EncoderDictionary::copy(bytes, COMPRESSION_LEVEL)))
+    }
+
     /* documents ids */
 
     /// Writes the documents ids that corresponds to the user-ids-documents-ids FST.
@@ -1258,43 +1307,43 @@ impl Index {
     /* documents */
 
     /// Returns a document by using the document id.
-    pub fn document<'t>(&self, rtxn: &'t RoTxn, id: DocumentId) -> Result<&'t obkv::KvReaderU16> {
+    pub fn compressed_document<'t>(
+        &self,
+        rtxn: &'t RoTxn,
+        id: DocumentId,
+    ) -> Result<CompressedKvReaderU16<'t>> {
         self.documents
             .get(rtxn, &id)?
             .ok_or(UserError::UnknownInternalDocumentId { document_id: id })
             .map_err(Into::into)
     }
 
-    /// Returns an iterator over the requested documents. The next item will be an error if a document is missing.
-    pub fn iter_documents<'a, 't: 'a>(
+    /// Returns an iterator over the requested compressed documents. The next item will be an error if a document is missing.
+    pub fn iter_compressed_documents<'a, 't: 'a>(
         &'a self,
         rtxn: &'t RoTxn<'t>,
         ids: impl IntoIterator<Item = DocumentId> + 'a,
-    ) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
-        Ok(ids.into_iter().map(move |id| {
-            let kv = self
-                .documents
-                .get(rtxn, &id)?
-                .ok_or(UserError::UnknownInternalDocumentId { document_id: id })?;
-            Ok((id, kv))
-        }))
+    ) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
+        Ok(ids
+            .into_iter()
+            .map(move |id| self.compressed_document(rtxn, id).map(|compressed| (id, compressed))))
     }
 
     /// Returns a [`Vec`] of the requested documents. Returns an error if a document is missing.
-    pub fn documents<'t>(
+    pub fn compressed_documents<'t>(
         &self,
         rtxn: &'t RoTxn<'t>,
         ids: impl IntoIterator<Item = DocumentId>,
-    ) -> Result<Vec<(DocumentId, &'t obkv::KvReaderU16)>> {
-        self.iter_documents(rtxn, ids)?.collect()
+    ) -> Result<Vec<(DocumentId, CompressedKvReaderU16<'t>)>> {
+        self.iter_compressed_documents(rtxn, ids)?.collect()
     }
 
     /// Returns an iterator over all the documents in the index.
-    pub fn all_documents<'a, 't: 'a>(
+    pub fn all_compressed_documents<'a, 't: 'a>(
         &'a self,
         rtxn: &'t RoTxn<'t>,
-    ) -> Result<impl Iterator<Item = Result<(DocumentId, &'t obkv::KvReaderU16)>> + 'a> {
-        self.iter_documents(rtxn, self.documents_ids(rtxn)?)
+    ) -> Result<impl Iterator<Item = Result<(DocumentId, CompressedKvReaderU16<'t>)>> + 'a> {
+        self.iter_compressed_documents(rtxn, self.documents_ids(rtxn)?)
     }
 
     pub fn external_id_of<'a, 't: 'a>(
@@ -1315,9 +1364,14 @@ impl Index {
                 process: "external_id_of",
             })
         })?;
-        Ok(self.iter_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
-            let (_docid, obkv) = entry?;
-            match primary_key.document_id(obkv, &fields)? {
+        let dictionary =
+            self.document_compression_raw_dictionary(rtxn)?.map(DecoderDictionary::copy);
+        let mut buffer = Vec::new();
+        Ok(self.iter_compressed_documents(rtxn, ids)?.map(move |entry| -> Result<_> {
+            let (_docid, compressed_obkv) = entry?;
+            let obkv = compressed_obkv
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
+            match primary_key.document_id(&obkv, &fields)? {
                 Ok(document_id) => Ok(document_id),
                 Err(_) => Err(InternalError::DocumentsError(
                     crate::documents::Error::InvalidDocumentFormat,
@@ -2623,7 +2677,12 @@ pub(crate) mod tests {
        "###);

        let rtxn = index.read_txn().unwrap();
-        let (_docid, obkv) = index.documents(&rtxn, [0]).unwrap()[0];
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [0]).unwrap().remove(0);
+        let mut buffer = Vec::new();
+        let obkv = compressed_obkv
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
        let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
        insta::assert_debug_snapshot!(json, @r###"
        {
@@ -2632,7 +2691,10 @@ pub(crate) mod tests {
        "###);

        // Furthermore, when we retrieve document 34, it is not the result of merging 35 with 34
-        let (_docid, obkv) = index.documents(&rtxn, [2]).unwrap()[0];
+        let (_docid, compressed_obkv) = index.compressed_documents(&rtxn, [2]).unwrap().remove(0);
+        let obkv = compressed_obkv
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
        let json = obkv_to_json(&[0, 1, 2], &index.fields_ids_map(&rtxn).unwrap(), obkv).unwrap();
        insta::assert_debug_snapshot!(json, @r###"
        {
@@ -2641,6 +2703,7 @@ pub(crate) mod tests {
        }
        "###);

+        drop(dictionary);
        drop(rtxn);

        // Add new documents again
@@ -2839,11 +2902,16 @@ pub(crate) mod tests {
        } = search.execute().unwrap();
        let primary_key_id = index.fields_ids_map(&rtxn).unwrap().id("primary_key").unwrap();
        documents_ids.sort_unstable();
-        let docs = index.documents(&rtxn, documents_ids).unwrap();
+        let compressed_docs = index.compressed_documents(&rtxn, documents_ids).unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let mut buffer = Vec::new();
        let mut all_ids = HashSet::new();
-        for (_docid, obkv) in docs {
-            let id = obkv.get(primary_key_id).unwrap();
-            assert!(all_ids.insert(id));
+        for (_docid, compressed) in compressed_docs {
+            let doc = compressed
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+                .unwrap();
+            let id = doc.get(primary_key_id).unwrap();
+            assert!(all_ids.insert(id.to_vec()));
        }
    }
diff --git a/crates/milli/src/lib.rs b/crates/milli/src/lib.rs
index 1fc876f79..5f2b5fb57 100644
--- a/crates/milli/src/lib.rs
+++ b/crates/milli/src/lib.rs
@@ -47,7 +47,7 @@ pub use search::new::{
 };
 use serde_json::Value;
 pub use thread_pool_no_abort::{PanicCatched, ThreadPoolNoAbort, ThreadPoolNoAbortBuilder};
-pub use {charabia as tokenizer, heed, rhai};
+pub use {charabia as tokenizer, heed, rhai, zstd};
 
 pub use self::asc_desc::{AscDesc, AscDescError, Member, SortError};
 pub use self::criterion::{default_criteria, Criterion, CriterionError};
diff --git a/crates/milli/src/search/new/tests/mod.rs b/crates/milli/src/search/new/tests/mod.rs
index 37bca7597..484418af9 100644
--- a/crates/milli/src/search/new/tests/mod.rs
+++ b/crates/milli/src/search/new/tests/mod.rs
@@ -25,8 +25,13 @@ fn collect_field_values(
 ) -> Vec<String> {
     let mut values = vec![];
     let fid = index.fields_ids_map(txn).unwrap().id(fid).unwrap();
-    for doc in index.documents(txn, docids.iter().copied()).unwrap() {
-        if let Some(v) = doc.1.get(fid) {
+    let mut buffer = Vec::new();
+    let dictionary = index.document_decompression_dictionary(txn).unwrap();
+    for (_id, compressed_doc) in index.compressed_documents(txn, docids.iter().copied()).unwrap() {
+        let doc = compressed_doc
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+        if let Some(v) = doc.get(fid) {
             let v: serde_json::Value = serde_json::from_slice(v).unwrap();
             let v = v.to_string();
             values.push(v);
diff --git a/crates/milli/src/snapshot_tests.rs b/crates/milli/src/snapshot_tests.rs
index 6635ab2f4..a01ad1b1d 100644
--- a/crates/milli/src/snapshot_tests.rs
+++ b/crates/milli/src/snapshot_tests.rs
@@ -407,9 +407,15 @@ pub fn snap_documents(index: &Index) -> String {
     let rtxn = index.read_txn().unwrap();
     let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
     let display = fields_ids_map.ids().collect::<Vec<_>>();
+    let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+    let mut buffer = Vec::new();
 
-    for document in index.all_documents(&rtxn).unwrap() {
-        let doc = obkv_to_json(&display, &fields_ids_map, document.unwrap().1).unwrap();
+    for result in index.all_compressed_documents(&rtxn).unwrap() {
+        let (_id, compressed_document) = result.unwrap();
+        let document = compressed_document
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+        let doc = obkv_to_json(&display, &fields_ids_map, document).unwrap();
         snap.push_str(&serde_json::to_string(&doc).unwrap());
         snap.push('\n');
     }
diff --git a/crates/milli/src/update/clear_documents.rs b/crates/milli/src/update/clear_documents.rs
index 6c4efb859..2994dcb65 100644
--- a/crates/milli/src/update/clear_documents.rs
+++ b/crates/milli/src/update/clear_documents.rs
@@ -62,6 +62,7 @@ impl<'t, 'i> ClearDocuments<'t, 'i> {
         self.index.put_field_distribution(self.wtxn, &FieldDistribution::default())?;
         self.index.delete_geo_rtree(self.wtxn)?;
         self.index.delete_geo_faceted_documents_ids(self.wtxn)?;
+        self.index.delete_document_compression_dictionary(self.wtxn)?;
 
         // Remove all user-provided bits from the configs
         let mut configs = self.index.embedding_configs(self.wtxn)?;
diff --git a/crates/milli/src/update/index_documents/mod.rs b/crates/milli/src/update/index_documents/mod.rs
index 3988b311c..a377aaeda 100644
--- a/crates/milli/src/update/index_documents/mod.rs
+++ b/crates/milli/src/update/index_documents/mod.rs
@@ -4,8 +4,8 @@ mod helpers;
 mod transform;
 mod typed_chunk;
 
-use std::collections::HashSet;
-use std::io::{Read, Seek};
+use std::collections::{HashMap, HashSet};
+use std::io::{BufWriter, Read, Seek, Write};
 use std::iter;
 use std::num::NonZeroU32;
 use std::sync::Arc;
@@ -13,9 +13,8 @@ use crossbeam_channel::{Receiver, Sender};
 use enrich::enrich_documents_batch;
 use grenad::{Merger, MergerBuilder};
-use hashbrown::HashMap;
-use heed::types::Str;
-use heed::Database;
+use heed::types::{Bytes, Str};
+use heed::{Database, PutFlags};
 use rand::SeedableRng as _;
 use roaring::RoaringBitmap;
 use serde::{Deserialize, Serialize};
@@ -28,7 +27,8 @@ pub use self::helpers::*;
 pub use self::transform::{Transform, TransformOutput};
 use super::new::StdResult;
 use crate::documents::{obkv_to_object, DocumentsBatchReader};
-use crate::error::{Error, InternalError};
+use crate::error::{Error, InternalError, UserError};
+use crate::heed_codec::{CompressedKvWriterU16, CompressedObkvCodec};
 use crate::index::{PrefixSearch, PrefixSettings};
 use crate::thread_pool_no_abort::ThreadPoolNoAbortBuilder;
 pub use crate::update::index_documents::helpers::CursorClonableMmap;
@@ -36,7 +36,7 @@ use crate::update::{
     IndexerConfig, UpdateIndexingStep, WordPrefixDocids, WordPrefixIntegerDocids, WordsPrefixesFst,
 };
 use crate::vector::{ArroyWrapper, EmbeddingConfigs};
-use crate::{CboRoaringBitmapCodec, Index, Result, UserError};
+use crate::{CboRoaringBitmapCodec, Index, Result, BEU32};
 
 static MERGED_DATABASE_COUNT: usize = 7;
 static PREFIX_DATABASE_COUNT: usize = 4;
@@ -201,7 +201,7 @@ where
         target = "indexing::details",
         name = "index_documents_raw"
     )]
-    pub fn execute_raw(self, output: TransformOutput) -> Result<u64>
+    pub fn execute_raw(mut self, output: TransformOutput) -> Result<u64>
     where
         FP: Fn(UpdateIndexingStep) + Sync,
         FA: Fn() -> bool + Sync,
@@ -523,6 +523,10 @@ where
             word_fid_docids.map(MergerBuilder::build),
         )?;
 
+        // This call contains an internal condition that ensures we do not regenerate
+        // the compression dictionary and recompress the documents on every run.
+        self.manage_compression_dictionary()?;
+
         Ok(number_of_documents)
     }
 
@@ -533,7 +537,7 @@ where
         name = "index_documents_prefix_databases"
     )]
     pub fn execute_prefix_databases(
-        self,
+        &mut self,
         word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
         exact_word_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
         word_position_docids: Option<Merger<CursorClonableMmap, MergeFn>>,
@@ -723,6 +727,64 @@ where
 
         Ok(())
     }
+
+    /// Computes a new dictionary and compresses the documents with it in the database.
+    ///
+    /// Documents written to the database afterwards still need to be compressed directly, whenever a dictionary exists.
+    #[tracing::instrument(
+        level = "trace",
+        skip_all,
+        target = "indexing::compression",
+        name = "compress_documents_database"
+    )]
+    pub fn manage_compression_dictionary(&mut self) -> Result<()> {
+        /// The size of the dictionary generated from a sample of the documents already
+        /// in the database. It will be used when compressing and decompressing documents.
+        const COMPRESSION_DICTIONARY_SIZE: usize = 64_000;
+        /// The minimum number of documents to trigger the generation of the compression dictionary.
+        const COMPRESSION_ON_NUMBER_OF_DOCUMENTS: usize = 10_000;
+
+        if self.index.number_of_documents(self.wtxn)? < COMPRESSION_ON_NUMBER_OF_DOCUMENTS as u64
+            || self.index.document_compression_dictionary(self.wtxn)?.is_some()
+        {
+            return Ok(());
+        }
+
+        let mut sample_file = tempfile::tempfile().map(BufWriter::new)?;
+        let mut sample_sizes = Vec::new();
+        // TODO make this 1_000 be 10k and const
+        let documents = self.index.documents.remap_types::<BEU32, Bytes>();
+        for result in documents.iter(self.wtxn)?.take(COMPRESSION_ON_NUMBER_OF_DOCUMENTS) {
+            let (_id, bytes) = result?;
+            sample_file.write_all(bytes)?;
+            sample_sizes.push(bytes.len());
+        }
+
+        let sample_file = sample_file.into_inner().map_err(|ie| ie.into_error())?;
+        let sample_data = unsafe { memmap2::Mmap::map(&sample_file)? };
+        let dictionary =
+            zstd::dict::from_continuous(&sample_data, &sample_sizes, COMPRESSION_DICTIONARY_SIZE)?;
+        self.index.put_document_compression_dictionary(self.wtxn, &dictionary)?;
+        // safety: We just set the dictionary above. It must be there when we get it back.
+        let dictionary = self.index.document_compression_dictionary(self.wtxn)?.unwrap();
+
+        let mut iter = self.index.documents.iter_mut(self.wtxn)?;
+        while let Some(result) = iter.next() {
+            let (docid, document) = result?;
+            let document = document.as_non_compressed().as_bytes();
+            let compressed = CompressedKvWriterU16::new_with_dictionary(document, &dictionary)?;
+            // safety: the compressed document is entirely owned
+            unsafe {
+                iter.put_current_with_options::<CompressedObkvCodec>(
+                    PutFlags::empty(),
+                    &docid,
+                    &compressed,
+                )?;
+            }
+        }
+
+        Ok(())
+    }
 }
 
 /// Run the word prefix docids update operation.
@@ -812,7 +874,7 @@ mod tests {
         let rtxn = index.read_txn().unwrap();
         let count = index.number_of_documents(&rtxn).unwrap();
         assert_eq!(count, 3);
-        let count = index.all_documents(&rtxn).unwrap().count();
+        let count = index.all_compressed_documents(&rtxn).unwrap().count();
         assert_eq!(count, 3);
 
         drop(rtxn);
@@ -821,6 +883,7 @@ mod tests {
     #[test]
     fn simple_document_merge() {
         let mut index = TempIndex::new();
+        let mut buffer = Vec::new();
         index.index_documents_config.update_method = IndexDocumentsMethod::UpdateDocuments;
 
         // First we send 3 documents with duplicate ids and
@@ -839,16 +902,21 @@ mod tests {
         assert_eq!(count, 1);
 
         // Check that we get only one document from the database.
-        let docs = index.documents(&rtxn, Some(0)).unwrap();
-        assert_eq!(docs.len(), 1);
-        let (id, doc) = docs[0];
+        let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
+        assert_eq!(compressed_docs.len(), 1);
+        let (id, compressed_doc) = compressed_docs.remove(0);
         assert_eq!(id, 0);
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let doc = compressed_doc
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
 
         // Check that this document is equal to the last one sent.
         let mut doc_iter = doc.iter();
         assert_eq!(doc_iter.next(), Some((0, &b"1"[..])));
         assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
         assert_eq!(doc_iter.next(), None);
+        drop(dictionary);
         drop(rtxn);
 
         // Second we send 1 document with id 1, to force it to be merged with the previous one.
@@ -860,10 +928,14 @@ mod tests {
         assert_eq!(count, 1);
 
         // Check that we get only one document from the database.
-        let docs = index.documents(&rtxn, Some(0)).unwrap();
-        assert_eq!(docs.len(), 1);
-        let (id, doc) = docs[0];
+        let mut compressed_docs = index.compressed_documents(&rtxn, Some(0)).unwrap();
+        assert_eq!(compressed_docs.len(), 1);
+        let (id, compressed_doc) = compressed_docs.remove(0);
         assert_eq!(id, 0);
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let doc = compressed_doc
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
 
         // Check that this document is equal to the last one sent.
         let mut doc_iter = doc.iter();
@@ -871,6 +943,129 @@ mod tests {
         assert_eq!(doc_iter.next(), Some((1, &br#""benoit""#[..])));
         assert_eq!(doc_iter.next(), Some((2, &b"25"[..])));
         assert_eq!(doc_iter.next(), None);
+        drop(dictionary);
+        drop(rtxn);
+    }
+
+    #[test]
+    fn not_auto_generated_documents_ids() {
+        let index = TempIndex::new();
+
+        let result = index.add_documents(documents!([
+            { "name": "kevin" },
+            { "name": "kevina" },
+            { "name": "benoit" }
+        ]));
+        assert!(result.is_err());
+
+        // Check that there is no document.
+        let rtxn = index.read_txn().unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 0);
+        drop(rtxn);
+    }
+
+    #[test]
+    fn simple_auto_generated_documents_ids() {
+        let mut index = TempIndex::new();
+        let mut buffer = Vec::new();
+        index.index_documents_config.autogenerate_docids = true;
+        // First we send 3 documents without any id.
+        index
+            .add_documents(documents!([
+                { "name": "kevin" },
+                { "name": "kevina" },
+                { "name": "benoit" }
+            ]))
+            .unwrap();
+
+        // Check that there are 3 documents now.
+        let rtxn = index.read_txn().unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 3);
+
+        let compressed_docs = index.compressed_documents(&rtxn, vec![0, 1, 2]).unwrap();
+        let (_id, compressed_obkv) = compressed_docs
+            .iter()
+            .find(|(_id, compressed_doc)| {
+                let doc = compressed_doc
+                    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+                    .unwrap();
+                doc.get(0) == Some(br#""kevin""#)
+            })
+            .unwrap();
+
+        let obkv = compressed_obkv
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+        let kevin_uuid: String = serde_json::from_slice(obkv.get(1).unwrap()).unwrap();
+        drop(dictionary);
+        drop(rtxn);
+
+        // Second we send 1 document with the generated uuid, to erase the previous ones.
+        index.add_documents(documents!([ { "name": "updated kevin", "id": kevin_uuid } ])).unwrap();
+
+        // Check that there are **always** 3 documents.
+        let rtxn = index.read_txn().unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 3);
+
+        // the document 0 has been deleted and reinserted with the id 3
+        let mut compressed_docs = index.compressed_documents(&rtxn, vec![1, 2, 0]).unwrap();
+        let kevin_position = compressed_docs
+            .iter()
+            .position(|(_, compressed_doc)| {
+                let doc = compressed_doc
+                    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+                    .unwrap();
+
+                doc.get(0).unwrap() == br#""updated kevin""#
+            })
+            .unwrap();
+        assert_eq!(kevin_position, 2);
+        let (_, compressed_doc) = compressed_docs.remove(kevin_position);
+        let doc = compressed_doc
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+
+        // Check that this document is equal to the last
+        // one sent and that a UUID has been generated.
+        assert_eq!(doc.get(0), Some(&br#""updated kevin""#[..]));
+        // This is a UUID, it must be 36 bytes long plus the 2 surrounding string quotes (").
+        assert_eq!(doc.get(1).unwrap().len(), 36 + 2);
+        drop(dictionary);
+        drop(rtxn);
+    }
+
+    #[test]
+    fn reordered_auto_generated_documents_ids() {
+        let mut index = TempIndex::new();
+
+        // First we send 3 documents with ids from 1 to 3.
+        index
+            .add_documents(documents!([
+                { "id": 1, "name": "kevin" },
+                { "id": 2, "name": "kevina" },
+                { "id": 3, "name": "benoit" }
+            ]))
+            .unwrap();
+
+        // Check that there are 3 documents now.
+        let rtxn = index.read_txn().unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 3);
+        drop(rtxn);
+
+        // Second we send 1 document without specifying the id.
+        index.index_documents_config.autogenerate_docids = true;
+        index.add_documents(documents!([ { "name": "new kevin" } ])).unwrap();
+
+        // Check that there are 4 documents now.
+        let rtxn = index.read_txn().unwrap();
+        let count = index.number_of_documents(&rtxn).unwrap();
+        assert_eq!(count, 4);
         drop(rtxn);
     }
 
@@ -972,7 +1167,7 @@ mod tests {
         let rtxn = index.read_txn().unwrap();
         let count = index.number_of_documents(&rtxn).unwrap();
         assert_eq!(count, 6);
-        let count = index.all_documents(&rtxn).unwrap().count();
+        let count = index.all_compressed_documents(&rtxn).unwrap().count();
         assert_eq!(count, 6);
 
         db_snap!(index, word_docids, "updated");
@@ -1390,7 +1585,7 @@ mod tests {
         index.add_documents(documents!({ "a" : { "b" : { "c" : 1 }}})).unwrap();
 
         let rtxn = index.read_txn().unwrap();
-        let all_documents_count = index.all_documents(&rtxn).unwrap().count();
+        let all_documents_count = index.all_compressed_documents(&rtxn).unwrap().count();
         assert_eq!(all_documents_count, 1);
         let external_documents_ids = index.external_documents_ids();
         assert!(external_documents_ids.get(&rtxn, "1").unwrap().is_some());
@@ -2869,7 +3064,7 @@ mod tests {
         // Ensuring all the returned IDs actually exists
         let rtxn = index.read_txn().unwrap();
         let res = index.search(&rtxn).execute().unwrap();
-        index.documents(&rtxn, res.documents_ids).unwrap();
+        index.compressed_documents(&rtxn, res.documents_ids).unwrap();
     }
 
     fn delete_documents<'t>(
@@ -3248,7 +3443,7 @@ mod tests {
         let rtxn = index.read_txn().unwrap();
 
         // list all documents
-        let results = index.all_documents(&rtxn).unwrap();
+        let results = index.all_compressed_documents(&rtxn).unwrap();
         for result in results {
             let (id, _) = result.unwrap();
             assert!(
diff --git a/crates/milli/src/update/index_documents/transform.rs b/crates/milli/src/update/index_documents/transform.rs
index 7477b5667..421866f85 100644
--- a/crates/milli/src/update/index_documents/transform.rs
+++ b/crates/milli/src/update/index_documents/transform.rs
@@ -174,10 +174,12 @@ impl<'a, 'i> Transform<'a, 'i> {
         let external_documents_ids = self.index.external_documents_ids();
         let mapping = create_fields_mapping(&mut self.fields_ids_map, &fields_index)?;
+        let dictionary = self.index.document_decompression_dictionary(wtxn)?;
 
         let primary_key = cursor.primary_key().to_string();
         let primary_key_id =
             self.fields_ids_map.insert(&primary_key).ok_or(UserError::AttributeLimitReached)?;
 
+        let mut decompression_buffer = Vec::new();
         let mut obkv_buffer = Vec::new();
         let mut document_sorter_value_buffer = Vec::new();
         let mut document_sorter_key_buffer = Vec::new();
@@ -253,18 +255,17 @@ impl<'a, 'i> Transform<'a, 'i> {
             let mut skip_insertion = false;
             if let Some(original_docid) = original_docid {
                 let original_key = original_docid;
-                let base_obkv = self
-                    .index
-                    .documents
-                    .remap_data_type::<heed::types::Bytes>()
-                    .get(wtxn, &original_key)?
-                    .ok_or(InternalError::DatabaseMissingEntry {
-                        db_name: db_name::DOCUMENTS,
-                        key: None,
-                    })?;
+                let base_compressed_obkv = self.index.documents.get(wtxn, &original_key)?.ok_or(
+                    InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
+                )?;
+
+                let base_obkv = base_compressed_obkv.decompress_with_optional_dictionary(
+                    &mut decompression_buffer,
+                    dictionary.as_ref(),
+                )?;
 
                 // we check if the two documents are exactly equal. If it's the case we can skip this document entirely
-                if base_obkv == obkv_buffer {
+                if base_obkv.as_bytes() == obkv_buffer {
                     // we're not replacing anything
                     self.replaced_documents_ids.remove(original_docid);
                     // and we need to put back the original id as it was before
@@ -284,13 +285,12 @@ impl<'a, 'i> Transform<'a, 'i> {
                     document_sorter_value_buffer.clear();
                     document_sorter_value_buffer.push(Operation::Addition as u8);
                     into_del_add_obkv(
-                        KvReaderU16::from_slice(base_obkv),
+                        base_obkv,
                         deladd_operation,
                         &mut document_sorter_value_buffer,
                     )?;
                     self.original_sorter
                         .insert(&document_sorter_key_buffer, &document_sorter_value_buffer)?;
-                    let base_obkv = KvReader::from_slice(base_obkv);
                     if let Some(flattened_obkv) =
                         Self::flatten_from_fields_ids_map(base_obkv, &mut self.fields_ids_map)?
                     {
@@ -354,9 +354,12 @@ impl<'a, 'i> Transform<'a, 'i> {
             documents_seen: documents_count,
         });
 
+        drop(dictionary);
+
         self.index.put_fields_ids_map(wtxn, &self.fields_ids_map)?;
         self.index.put_primary_key(wtxn, &primary_key)?;
         self.documents_count += documents_count;
+
         // Now that we have a valid sorter that contains the user id and the obkv we
         // give it to the last transforming function which returns the TransformOutput.
         Ok(documents_count)
@@ -859,15 +862,21 @@ impl<'a, 'i> Transform<'a, 'i> {
         if original_sorter.is_some() || flattened_sorter.is_some() {
             let modified_faceted_fields = settings_diff.modified_faceted_fields();
+            let dictionary = self.index.document_decompression_dictionary(wtxn)?;
+
             let mut original_obkv_buffer = Vec::new();
             let mut flattened_obkv_buffer = Vec::new();
             let mut document_sorter_key_buffer = Vec::new();
+            let mut buffer = Vec::new();
             for result in self.index.external_documents_ids().iter(wtxn)? {
                 let (external_id, docid) = result?;
-                let old_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
+                let old_compressed_obkv = self.index.documents.get(wtxn, &docid)?.ok_or(
                     InternalError::DatabaseMissingEntry { db_name: db_name::DOCUMENTS, key: None },
                 )?;
+                let old_obkv = old_compressed_obkv
+                    .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())?;
+
                 let injected_vectors: std::result::Result<
                     serde_json::Map<String, serde_json::Value>,
                     arroy::Error,
diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs
index a97569800..2656839f7 100644
--- a/crates/milli/src/update/index_documents/typed_chunk.rs
+++ b/crates/milli/src/update/index_documents/typed_chunk.rs
@@ -17,6 +17,7 @@ use super::helpers::{
 };
 use crate::external_documents_ids::{DocumentOperation, DocumentOperationKind};
 use crate::facet::FacetType;
+use crate::heed_codec::CompressedKvWriterU16;
 use crate::index::db_name::DOCUMENTS;
 use crate::index::IndexEmbeddingConfig;
 use crate::proximity::MAX_DISTANCE;
@@ -159,6 +160,7 @@ pub(crate) fn write_typed_chunk_into_index(
                 .into_iter()
                 .map(|IndexEmbeddingConfig { name, .. }| name)
                 .collect();
+            let dictionary = index.document_compression_dictionary(wtxn)?;
             let mut vectors_buffer = Vec::new();
             while let Some((key, reader)) = iter.next()? {
                 let mut writer: KvWriter<_, FieldId> = KvWriter::memory();
@@ -208,7 +210,17 @@ pub(crate) fn write_typed_chunk_into_index(
             let db = index.documents.remap_data_type::<Bytes>();
 
             if !writer.is_empty() {
-                db.put(wtxn, &docid, &writer.into_inner().unwrap())?;
+                let uncompressed_document_bytes = writer.into_inner().unwrap();
+                match dictionary.as_ref() {
+                    Some(dictionary) => {
+                        let compressed = CompressedKvWriterU16::new_with_dictionary(
+                            &uncompressed_document_bytes,
+                            dictionary,
+                        )?;
+                        db.put(wtxn, &docid, compressed.as_bytes())?
+                    }
+                    None => db.put(wtxn, &docid, &uncompressed_document_bytes)?,
+                }
                 operations.push(DocumentOperation {
                     external_id: external_id.to_string(),
                     internal_id: docid,
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 930b0c078..2c75c516a 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -1,3 +1,4 @@
+use std::borrow::Cow;
 use std::collections::{BTreeMap, BTreeSet};
 
 use bumparaw_collections::RawMap;
@@ -47,23 +48,15 @@ pub trait Document<'doc> {
     fn geo_field(&self) -> Result<Option<&'doc RawValue>>;
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct DocumentFromDb<'t, Mapper: FieldIdMapper>
 where
     Mapper: FieldIdMapper,
 {
     fields_ids_map: &'t Mapper,
-    content: &'t KvReaderFieldId,
+    content: Cow<'t, KvReaderFieldId>,
 }
 
-impl<'t, Mapper: FieldIdMapper> Clone for DocumentFromDb<'t, Mapper> {
-    #[inline]
-    fn clone(&self) -> Self {
-        *self
-    }
-}
-impl<'t, Mapper: FieldIdMapper> Copy for DocumentFromDb<'t, Mapper> {}
-
 impl<'t, Mapper: FieldIdMapper> Document<'t> for DocumentFromDb<'t, Mapper> {
     fn iter_top_level_fields(&self) -> impl Iterator<Item = Result<(&'t str, &'t RawValue)>> {
         let mut it = self.content.iter();
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 13307025a..b38e2d79a 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -53,6 +53,8 @@ impl<'a, 'b, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a, 'b> {
             let external_docid = change.external_docid().to_owned();
 
+            todo!("manage documents compression");
+            // We do not compress the document here yet; we need to create a function that collects and compresses documents.
             match change {
                 DocumentChange::Deletion(deletion) => {
diff --git a/crates/milli/src/update/settings.rs b/crates/milli/src/update/settings.rs
index 3d2702479..527e10998 100644
--- a/crates/milli/src/update/settings.rs
+++ b/crates/milli/src/update/settings.rs
@@ -1944,6 +1944,8 @@ mod tests {
         // Check that the searchable field is correctly set to "name" only.
         let rtxn = index.read_txn().unwrap();
+        let mut buffer = Vec::new();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
         // When we search for something that is not in
         // the searchable fields it must not return any document.
         let result = index.search(&rtxn).query("23").execute().unwrap();
         assert!(result.documents_ids.is_empty());
         // When we search for something that is in the searchable fields
         // we must find the appropriate document.
         let result = index.search(&rtxn).query(r#""kevin""#).execute().unwrap();
-        let documents = index.documents(&rtxn, result.documents_ids).unwrap();
+        let mut compressed_documents =
+            index.compressed_documents(&rtxn, result.documents_ids).unwrap();
         let fid_map = index.fields_ids_map(&rtxn).unwrap();
-        assert_eq!(documents.len(), 1);
-        assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
+        assert_eq!(compressed_documents.len(), 1);
+        let (_id, compressed_document) = compressed_documents.remove(0);
+        let document = compressed_document
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+
+        assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
+        drop(dictionary);
         drop(rtxn);
 
         // We change the searchable fields to be the "name" field only.
@@ -1980,6 +1989,7 @@ mod tests {
         // Check that the searchable fields have been reset and documents are found now.
         let rtxn = index.read_txn().unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
         let fid_map = index.fields_ids_map(&rtxn).unwrap();
         let user_defined_searchable_fields = index.user_defined_searchable_fields(&rtxn).unwrap();
         snapshot!(format!("{user_defined_searchable_fields:?}"), @"None");
@@ -1988,8 +1998,13 @@ mod tests {
         snapshot!(format!("{searchable_fields:?}"), @r###"["id", "name", "age"]"###);
         let result = index.search(&rtxn).query("23").execute().unwrap();
         assert_eq!(result.documents_ids.len(), 1);
-        let documents = index.documents(&rtxn, result.documents_ids).unwrap();
-        assert_eq!(documents[0].1.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
+        let mut compressed_documents =
+            index.compressed_documents(&rtxn, result.documents_ids).unwrap();
+        let (_id, compressed_document) = compressed_documents.remove(0);
+        let document = compressed_document
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
+        assert_eq!(document.get(fid_map.id("name").unwrap()), Some(&br#""kevin""#[..]));
     }
 
     #[test]
@@ -2120,15 +2135,20 @@ mod tests {
         // Check that the displayed fields are correctly set.
         let rtxn = index.read_txn().unwrap();
+        let mut buffer = Vec::new();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
         let fields_ids = index.filterable_fields(&rtxn).unwrap();
         assert_eq!(fields_ids, hashset! { S("age") });
         // Only count the field_id 0 and level 0 facet values.
         // TODO we must support typed CSVs for numbers to be understood.
         let fidmap = index.fields_ids_map(&rtxn).unwrap();
-        for document in index.all_documents(&rtxn).unwrap() {
-            let document = document.unwrap();
-            let json = crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document.1)
+        for result in index.all_compressed_documents(&rtxn).unwrap() {
+            let (_id, compressed_document) = result.unwrap();
+            let document = compressed_document
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
                 .unwrap();
+            let json =
+                crate::obkv_to_json(&fidmap.ids().collect::<Vec<_>>(), &fidmap, document).unwrap();
             println!("json: {:?}", json);
         }
         let count = index
             .facet_id_f64_docids
             .remap_key_type::<Bytes>()
             .prefix_iter(&rtxn, &[0, 0])
             .unwrap()
             .count();
         assert_eq!(count, 3);
+        drop(dictionary);
         drop(rtxn);
 
         // Index a little more documents with new and current facets values.
@@ -2228,6 +2249,7 @@ mod tests {
     #[test]
     fn set_asc_desc_field() {
         let index = TempIndex::new();
+        let mut buffer = Vec::new();
 
         // Set the filterable fields to be the age.
         index
@@ -2248,12 +2270,16 @@ mod tests {
 
         // Run an empty query just to ensure that the search results are ordered.
         let rtxn = index.read_txn().unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
         let SearchResult { documents_ids, .. } = index.search(&rtxn).execute().unwrap();
-        let documents = index.documents(&rtxn, documents_ids).unwrap();
+        let compressed_documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
 
         // Fetch the documents "age" field in the order in which the documents appear.
         let age_field_id = index.fields_ids_map(&rtxn).unwrap().id("age").unwrap();
-        let iter = documents.into_iter().map(|(_, doc)| {
+        let iter = compressed_documents.into_iter().map(|(_, compressed_doc)| {
+            let doc = compressed_doc
+                .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+                .unwrap();
             let bytes = doc.get(age_field_id).unwrap();
             let string = std::str::from_utf8(bytes).unwrap();
             string.parse::<u32>().unwrap()
@@ -2645,6 +2671,7 @@ mod tests {
     #[test]
     fn setting_impact_relevancy() {
         let index = TempIndex::new();
+        let mut buffer = Vec::new();
 
         // Set the genres setting
         index
@@ -2676,8 +2703,12 @@ mod tests {
         let rtxn = index.read_txn().unwrap();
         let SearchResult { documents_ids, .. } = index.search(&rtxn).query("S").execute().unwrap();
         let first_id = documents_ids[0];
-        let documents = index.documents(&rtxn, documents_ids).unwrap();
-        let (_, content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
+        let documents = index.compressed_documents(&rtxn, documents_ids).unwrap();
+        let (_, compressed_content) = documents.iter().find(|(id, _)| *id == first_id).unwrap();
+        let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+        let content = compressed_content
+            .decompress_with_optional_dictionary(&mut buffer, dictionary.as_ref())
+            .unwrap();
 
         let fid = index.fields_ids_map(&rtxn).unwrap().id("title").unwrap();
         let line = std::str::from_utf8(content.get(fid).unwrap()).unwrap();
@@ -2851,7 +2882,7 @@ mod tests {
         wtxn.commit().unwrap();
 
         let rtxn = index.write_txn().unwrap();
-        let docs: StdResult<Vec<_>, _> = index.all_documents(&rtxn).unwrap().collect();
+        let docs: StdResult<Vec<_>, _> = index.all_compressed_documents(&rtxn).unwrap().collect();
         let docs = docs.unwrap();
         assert_eq!(docs.len(), 5);
     }
diff --git a/crates/milli/tests/search/query_criteria.rs b/crates/milli/tests/search/query_criteria.rs
index 8401f0444..cdf0a6538 100644
--- a/crates/milli/tests/search/query_criteria.rs
+++ b/crates/milli/tests/search/query_criteria.rs
@@ -348,7 +348,20 @@ fn criteria_ascdesc() {
     wtxn.commit().unwrap();
 
     let rtxn = index.read_txn().unwrap();
-    let documents = index.all_documents(&rtxn).unwrap().map(|doc| doc.unwrap()).collect::<Vec<_>>();
+    let dictionary = index.document_decompression_dictionary(&rtxn).unwrap();
+    let mut buffers = vec![Vec::new(); index.number_of_documents(&rtxn).unwrap() as usize];
+    let documents = index
+        .all_compressed_documents(&rtxn)
+        .unwrap()
+        .zip(buffers.iter_mut())
+        .map(|(compressed, buffer)| {
+            let (id, compressed) = compressed.unwrap();
+            let doc = compressed
+                .decompress_with_optional_dictionary(buffer, dictionary.as_ref())
+                .unwrap();
+            (id, doc)
+        })
+        .collect::<Vec<_>>();
 
     for criterion in [Asc(S("name")), Desc(S("name")), Asc(S("age")), Desc(S("age"))] {
         eprintln!("Testing with criterion: {:?}", &criterion);
diff --git a/milli/examples/search.rs b/milli/examples/search.rs
new file mode 100644
index 000000000..e69de29bb
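For reviewers who want to see the zstd dictionary workflow used by this patch in isolation, here is a minimal, hedged sketch built only on the plain `zstd` crate (not part of the patch itself). It mirrors what `manage_compression_dictionary` and the `CompressedKvWriterU16`/`CompressedKvReaderU16` codec do: train a dictionary from a continuous sample of already-stored documents with `zstd::dict::from_continuous`, then compress and decompress each document with that shared dictionary. The sample data, the smaller dictionary size and all names below are illustrative; the patch uses a 64 KB dictionary and compression level 19 on real OBKV bytes.

use std::io;

fn main() -> io::Result<()> {
    // Stand-ins for serialized OBKV documents already present in the database.
    let docs: Vec<Vec<u8>> = (0..10_000)
        .map(|i| format!(r#"{{"id":{i},"name":"document number {i}","kind":"sample"}}"#).into_bytes())
        .collect();

    // Train a dictionary from one continuous buffer plus the size of each sample,
    // like `manage_compression_dictionary` does with the first documents of the index.
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for doc in &docs {
        sample_data.extend_from_slice(doc);
        sample_sizes.push(doc.len());
    }
    // The patch asks for a 64 KB dictionary; a smaller one is enough for this toy sample.
    let dictionary = zstd::dict::from_continuous(&sample_data, &sample_sizes, 10 * 1024)?;

    // Compress one document with the shared dictionary (level 19, as in the patch)...
    let mut compressor = zstd::bulk::Compressor::with_dictionary(19, &dictionary)?;
    let compressed = compressor.compress(&docs[0])?;

    // ...and decompress it back, showing the round trip only needs the same dictionary.
    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dictionary)?;
    let decompressed = decompressor.decompress(&compressed, docs[0].len())?;
    assert_eq!(decompressed, docs[0]);

    println!("{} bytes -> {} bytes", docs[0].len(), compressed.len());
    Ok(())
}

The patch goes one step further and wraps the trained bytes in `EncoderDictionary`/`DecoderDictionary` (hence the `experimental` feature of the `zstd` crate), so the dictionary is prepared once per transaction and reused for every document instead of being re-parsed on each call.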