move csv parsing to document_formats

This commit is contained in:
mpostma 2021-09-28 22:58:48 +02:00
parent 2a14948123
commit 6e8a3fe8de
4 changed files with 384 additions and 414 deletions

185
Cargo.lock generated
View File

@ -82,7 +82,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2f86cd6857c135e6e9fe57b1619a88d1f94a7df34c00e11fe13e64fd3438837"
dependencies = [
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -218,7 +218,7 @@ dependencies = [
"actix-router",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -296,29 +296,15 @@ dependencies = [
[[package]]
name = "arc-swap"
version = "1.3.2"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5ab7d9e73059c86c36473f459b52adbd99c3554a4fec492caef460806006f00"
checksum = "e6df5aef5c5830360ce5218cecb8f018af3438af5686ae945094affc86fdec63"
[[package]]
name = "as-slice"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "45403b49e3954a4b8428a0ac21a4b7afadccf92bfd96273f1a58cd4812496ae0"
<<<<<<< HEAD
=======
dependencies = [
"generic-array 0.12.4",
"generic-array 0.13.3",
"generic-array 0.14.4",
"stable_deref_trait",
]
[[package]]
name = "assert-json-diff"
version = "1.0.1"
source = "git+https://github.com/qdequele/assert-json-diff?branch=master#9012a0c8866d0f2db0ef9a6242e4a19d1e8c67e4"
>>>>>>> 9d9543fd (Use an existing revision of milli)
dependencies = [
"generic-array 0.12.4",
"generic-array 0.13.3",
@ -344,7 +330,7 @@ checksum = "648ed8c8d2ce5409ccd57453d9d1b214b342a0d69376a6feda1fd6cae3299308"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -355,7 +341,7 @@ checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -478,9 +464,9 @@ dependencies = [
[[package]]
name = "bstr"
version = "0.2.16"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90682c8d613ad3373e66de8c6411e0ae2ab2571e879d2efbf73558cc66f21279"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
@ -490,9 +476,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.7.0"
version = "3.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c59e7af012c713f529e7a3ee57ce9b31ddd858d4b512923602f74608b009631"
checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538"
[[package]]
name = "byte-tools"
@ -526,7 +512,7 @@ checksum = "8e215f8c2f9f79cb53c8335e687ffd07d5bfcb6fe5fc80723762d0be46e7cc54"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -593,9 +579,9 @@ dependencies = [
[[package]]
name = "cedarwood"
version = "0.4.4"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "963e82c7b94163808ca3a452608d260b64ba5bc7b5653b4af1af59887899f48d"
checksum = "fa312498f9f41452998d984d3deb84c84f86aeb8a2499d7505bb8106d78d147d"
dependencies = [
"smallvec",
]
@ -668,7 +654,7 @@ checksum = "1df715824eb382e34b7afb7463b0247bf41538aeba731fba05241ecdb5dc3747"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -817,7 +803,7 @@ checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -830,7 +816,7 @@ dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"rustc_version 0.3.3",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -895,7 +881,7 @@ checksum = "c134c37760b27a871ba422106eedbb8247da973a09e82558bf26d619c882b159"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -937,9 +923,9 @@ checksum = "31586bda1b136406162e381a3185a506cdfc1631708dd40cba2f6628d8634499"
[[package]]
name = "flate2"
version = "1.0.21"
version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80edafed416a46fb378521624fab1cfa2eb514784fd8921adbe8a8d8321da811"
checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
dependencies = [
"cfg-if 1.0.0",
"crc32fast",
@ -1033,7 +1019,7 @@ dependencies = [
"proc-macro-hack",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -1132,7 +1118,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -1288,9 +1274,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "527e8c9ac747e28542699a951517aa9a6945af506cd1f2e1b53a576c17b6cc11"
checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b"
dependencies = [
"bytes",
"fnv",
@ -1334,9 +1320,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "hyper"
version = "0.14.12"
version = "0.14.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13f67199e765030fa08fe0bd581af683f0d5bc04ea09c2b1102012c5fb90e7fd"
checksum = "15d1cfb9e4f68655fa04c01f59edb405b6074a0f7118ea881e5026e4a1cd8593"
dependencies = [
"bytes",
"futures-channel",
@ -1395,9 +1381,9 @@ dependencies = [
[[package]]
name = "instant"
version = "0.1.10"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bee0328b1209d157ef001c94dd85b4f8f64139adb0eac2659f4b08382b2f474d"
checksum = "716d3d89f35ac6a34fd0eed635395f4c3b76fa889338a4632e5231a8684216bd"
dependencies = [
"cfg-if 1.0.0",
]
@ -1470,9 +1456,9 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.54"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1866b355d9c878e5e607473cbe3f63282c0b7aad2db1dbebf55076c686918254"
checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
dependencies = [
"wasm-bindgen",
]
@ -1500,9 +1486,9 @@ dependencies = [
[[package]]
name = "libc"
version = "0.2.101"
version = "0.2.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6"
[[package]]
name = "libgit2-sys"
@ -1792,12 +1778,7 @@ dependencies = [
[[package]]
name = "milli"
<<<<<<< HEAD
version = "0.16.0"
=======
version = "0.14.0"
source = "git+https://github.com/meilisearch/milli.git?rev=9d9010e#9d9010e45ff1eddd8a7715423ad0988a35ee34b6"
>>>>>>> 9d9543fd (Use an existing revision of milli)
dependencies = [
"bimap",
"bincode",
@ -2071,7 +2052,7 @@ dependencies = [
"pest_meta",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2140,7 +2121,7 @@ checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2157,9 +2138,9 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pkg-config"
version = "0.3.19"
version = "0.3.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c"
checksum = "7c9b1041b4387893b91ee6746cddfc28516aff326a3519fb2adf820932c5e6cb"
[[package]]
name = "ppv-lite86"
@ -2176,7 +2157,7 @@ dependencies = [
"proc-macro-error-attr",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
"version_check",
]
@ -2383,9 +2364,9 @@ dependencies = [
[[package]]
name = "retain_mut"
version = "0.1.3"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9c17925a9027d298a4603d286befe3f9dc0e8ed02523141914eb628798d6e5b"
checksum = "448296241d034b96c11173591deaa1302f2c17b56092106c1f92c1bc0183a8c9"
[[package]]
name = "ring"
@ -2550,14 +2531,14 @@ checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
name = "serde_json"
version = "1.0.67"
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7f9e390c27c3c0ce8bc5d725f6e4d30a29d26659494aa4b17535f7522c5c950"
checksum = "0f690853975602e1bfe1ccbf50504d67174e3bcf340f23b5ea9992e0587a52d8"
dependencies = [
"indexmap",
"itoa",
@ -2670,15 +2651,15 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.6.1"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe0f37c9e8f3c5a4a66ad655a93c74daac4ad00c441533bf5c6e7990bb42604e"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
[[package]]
name = "socket2"
version = "0.4.1"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "765f090f0e423d2b55843402a07915add955e7d60657db13707a159727326cad"
checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
dependencies = [
"libc",
"winapi",
@ -2729,7 +2710,7 @@ dependencies = [
"quote 1.0.9",
"serde",
"serde_derive",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2745,7 +2726,7 @@ dependencies = [
"serde_derive",
"serde_json",
"sha1",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2781,7 +2762,7 @@ dependencies = [
"proc-macro-error",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2797,9 +2778,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.76"
version = "1.0.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6f107db402c2c2055242dbf4d2af0e69197202e9faacbef9571bbe47f5a1b84"
checksum = "5239bc68e0fef57495900cfea4e8dc75596d9a319d7e16b1e0a440d24e6fe0a0"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
@ -2823,15 +2804,15 @@ checksum = "474aaa926faa1603c40b7885a9eaea29b444d1cb2850cb7c0e37bb1a4182f4fa"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
"unicode-xid 0.2.2",
]
[[package]]
name = "sysinfo"
version = "0.20.3"
version = "0.20.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92d77883450d697c0010e60db3d940ed130b0ed81d27485edee981621b434e52"
checksum = "ffff4a02fa61eee51f95210fc9c98ea6eeb46bb071adeafd61e1a0b9b22c6a6d"
dependencies = [
"cfg-if 1.0.0",
"core-foundation-sys",
@ -2902,7 +2883,7 @@ checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -2951,14 +2932,14 @@ dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"standback",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
name = "tinyvec"
version = "1.4.0"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5241dd6f21443a3606b432718b166d3cedc962fd4b8bea54a8bc7f514ebda986"
checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7"
dependencies = [
"tinyvec_macros",
]
@ -2971,9 +2952,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]]
name = "tokio"
version = "1.11.0"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4efe6fc2395938c8155973d7be49fe8d03a843726e285e100a8a383cc0154ce"
checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc"
dependencies = [
"autocfg",
"bytes",
@ -2997,7 +2978,7 @@ checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
]
[[package]]
@ -3053,9 +3034,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.26"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09adeb8c97449311ccd28a427f96fb563e7fd31aabf994189879d9da2394b89d"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
@ -3121,9 +3102,9 @@ checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b"
[[package]]
name = "unicode-width"
version = "0.1.8"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973"
[[package]]
name = "unicode-xid"
@ -3240,9 +3221,9 @@ checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.77"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e68338db6becec24d3c7977b5bf8a48be992c934b5d07177e3931f5dc9b076c"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
dependencies = [
"cfg-if 1.0.0",
"serde",
@ -3252,24 +3233,24 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.77"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f34c405b4f0658583dba0c1c7c9b694f3cac32655db463b56c254a1c75269523"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.27"
version = "0.4.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a87d738d4abc4cf22f6eb142f5b9a81301331ee3c767f2fef2fda4e325492060"
checksum = "8e8d7523cb1f2a4c96c1317ca690031b714a51cc14e05f712446691f413f5d39"
dependencies = [
"cfg-if 1.0.0",
"js-sys",
@ -3279,9 +3260,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.77"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9d5a6580be83b19dc570a8f9c324251687ab2184e57086f71625feb57ec77c8"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
dependencies = [
"quote 1.0.9",
"wasm-bindgen-macro-support",
@ -3289,28 +3270,28 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.77"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3775a030dc6f5a0afd8a84981a21cc92a781eb429acef9ecce476d0c9113e92"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
dependencies = [
"proc-macro2 1.0.29",
"quote 1.0.9",
"syn 1.0.76",
"syn 1.0.77",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.77"
version = "0.2.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c279e376c7a8e8752a8f1eaa35b7b0bee6bb9fb0cdacfa97cc3f1f289c87e2b4"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
[[package]]
name = "web-sys"
version = "0.3.54"
version = "0.3.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a84d70d1ec7d2da2d26a5bd78f4bca1b8c3254805363ce743b7a05bc30d195a"
checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
dependencies = [
"js-sys",
"wasm-bindgen",
@ -3346,9 +3327,9 @@ dependencies = [
[[package]]
name = "whoami"
version = "1.1.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7741161a40200a867c96dfa5574544efa4178cf4c8f770b62dd1cc0362d7ae1"
checksum = "cabfe22aa4936611957e0b5ad9ed0472ac52b2bfb9aedac4a3f3a91a03bd1ff0"
dependencies = [
"wasm-bindgen",
"web-sys",
@ -3420,7 +3401,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d498dbd1fd7beb83c86709ae1c33ca50942889473473d287d56ce4770a18edfb"
dependencies = [
"proc-macro2 1.0.29",
"syn 1.0.76",
"syn 1.0.77",
"synstructure",
]

View File

@ -1,8 +1,7 @@
use std::{
fmt,
io::{Read, Seek, Write},
};
use std::io::{self, Read, Result as IoResult, Seek, Write};
use std::fmt;
use csv::{Reader as CsvReader, StringRecordsIntoIter};
use milli::documents::DocumentBatchBuilder;
use serde_json::{Deserializer, Map, Value};
@ -12,6 +11,7 @@ type Result<T> = std::result::Result<T, DocumentFormatError>;
pub enum PayloadType {
Jsonl,
Json,
Csv,
}
impl fmt::Display for PayloadType {
@ -19,6 +19,7 @@ impl fmt::Display for PayloadType {
match self {
PayloadType::Jsonl => write!(f, "ndjson"),
PayloadType::Json => write!(f, "json"),
PayloadType::Csv => write!(f, "csv"),
}
}
}
@ -34,7 +35,7 @@ pub enum DocumentFormatError {
),
}
internal_error!(DocumentFormatError: milli::documents::Error);
internal_error!(DocumentFormatError: milli::documents::Error, io::Error);
macro_rules! malformed {
($type:path, $e:expr) => {
@ -42,6 +43,20 @@ macro_rules! malformed {
};
}
pub fn read_csv(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer).unwrap();
let iter = CsvDocumentIter::from_reader(input)?;
for doc in iter {
let doc = doc?;
builder.add_documents(doc).unwrap();
}
builder.finish().unwrap();
Ok(())
}
/// read jsonl from input and write an obkv batch to writer.
pub fn read_jsonl(input: impl Read, writer: impl Write + Seek) -> Result<()> {
let mut builder = DocumentBatchBuilder::new(writer)?;
@ -68,3 +83,281 @@ pub fn read_json(input: impl Read, writer: impl Write + Seek) -> Result<()> {
Ok(())
}
enum AllowedType {
String,
Number,
}
fn parse_csv_header(header: &str) -> (String, AllowedType) {
// if there are several separators we only split on the last one.
match header.rsplit_once(':') {
Some((field_name, field_type)) => match field_type {
"string" => (field_name.to_string(), AllowedType::String),
"number" => (field_name.to_string(), AllowedType::Number),
// if the pattern isn't reconized, we keep the whole field.
_otherwise => (header.to_string(), AllowedType::String),
},
None => (header.to_string(), AllowedType::String),
}
}
pub struct CsvDocumentIter<R>
where
R: Read,
{
documents: StringRecordsIntoIter<R>,
headers: Vec<(String, AllowedType)>,
}
impl<R: Read> CsvDocumentIter<R> {
pub fn from_reader(reader: R) -> IoResult<Self> {
let mut records = CsvReader::from_reader(reader);
let headers = records
.headers()?
.into_iter()
.map(parse_csv_header)
.collect();
Ok(Self {
documents: records.into_records(),
headers,
})
}
}
impl<R: Read> Iterator for CsvDocumentIter<R> {
type Item = Result<Map<String, Value>>;
fn next(&mut self) -> Option<Self::Item> {
let csv_document = self.documents.next()?;
match csv_document {
Ok(csv_document) => {
let mut document = Map::new();
for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter())
{
let parsed_value = (|| match field_type {
AllowedType::Number => malformed!(PayloadType::Csv, value
.parse::<f64>()
.map(Value::from)),
AllowedType::String => Ok(Value::String(value.to_string())),
})();
match parsed_value {
Ok(value) => drop(document.insert(field_name.to_string(), value)),
Err(e) => return Some(Err(e)),
}
}
Some(Ok(document))
}
Err(e) => Some(Err(DocumentFormatError::MalformedPayload(Box::new(e), PayloadType::Csv))),
}
}
}
#[cfg(test)]
mod test {
use serde_json::json;
use super::*;
#[test]
fn simple_csv_document() {
let documents = r#"city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn coma_in_field() {
let documents = r#"city,country,pop
"Boston","United, States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United, States",
"pop": "4628910",
})
);
}
#[test]
fn quote_in_field() {
let documents = r#"city,country,pop
"Boston","United"" States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United\" States",
"pop": "4628910",
})
);
}
#[test]
fn integer_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.0,
})
);
}
#[test]
fn float_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910.01""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.01,
})
);
}
#[test]
fn several_colon_in_header() {
let documents = r#"city:love:string,country:state,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:love": "Boston",
"country:state": "United States",
"pop": "4628910",
})
);
}
#[test]
fn ending_by_colon_in_header() {
let documents = r#"city:,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn starting_by_colon_in_header() {
let documents = r#":city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
":city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[ignore]
#[test]
fn starting_by_colon_in_header2() {
let documents = r#":string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(dbg!(csv_iter.next().unwrap()).is_err());
}
#[test]
fn double_colon_in_header() {
let documents = r#"city::string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn bad_type_in_header() {
let documents = r#"city,country:number,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count1() {
let documents = r#"city,country,pop
"Boston","United States","4628910", "too much""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count2() {
let documents = r#"city,country,pop
"Boston","United States""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
}

View File

@ -1,282 +0,0 @@
use super::error::{Result, UpdateLoopError};
use std::io::{Read, Result as IoResult};
use csv::{Reader as CsvReader, StringRecordsIntoIter};
use serde_json::{Map, Value};
enum AllowedType {
String,
Number,
}
fn parse_csv_header(header: &str) -> (String, AllowedType) {
// if there are several separators we only split on the last one.
match header.rsplit_once(':') {
Some((field_name, field_type)) => match field_type {
"string" => (field_name.to_string(), AllowedType::String),
"number" => (field_name.to_string(), AllowedType::Number),
// if the pattern isn't reconized, we keep the whole field.
_otherwise => (header.to_string(), AllowedType::String),
},
None => (header.to_string(), AllowedType::String),
}
}
pub struct CsvDocumentIter<R>
where
R: Read,
{
documents: StringRecordsIntoIter<R>,
headers: Vec<(String, AllowedType)>,
}
impl<R: Read> CsvDocumentIter<R> {
pub fn from_reader(reader: R) -> IoResult<Self> {
let mut records = CsvReader::from_reader(reader);
let headers = records
.headers()?
.into_iter()
.map(parse_csv_header)
.collect();
Ok(Self {
documents: records.into_records(),
headers,
})
}
}
impl<R: Read> Iterator for CsvDocumentIter<R> {
type Item = Result<Map<String, Value>>;
fn next(&mut self) -> Option<Self::Item> {
let csv_document = self.documents.next()?;
match csv_document {
Ok(csv_document) => {
let mut document = Map::new();
for ((field_name, field_type), value) in
self.headers.iter().zip(csv_document.into_iter())
{
let parsed_value = (|| match field_type {
AllowedType::Number => value
.parse::<f64>()
.map(Value::from)
.map_err(|e| UpdateLoopError::MalformedPayload(Box::new(e))),
AllowedType::String => Ok(Value::String(value.to_string())),
})();
match parsed_value {
Ok(value) => drop(document.insert(field_name.to_string(), value)),
Err(e) => return Some(Err(e)),
}
}
Some(Ok(document))
}
Err(e) => Some(Err(UpdateLoopError::MalformedPayload(Box::new(e)))),
}
}
}
#[cfg(test)]
mod test {
use serde_json::json;
use super::*;
#[test]
fn simple_csv_document() {
let documents = r#"city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn coma_in_field() {
let documents = r#"city,country,pop
"Boston","United, States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United, States",
"pop": "4628910",
})
);
}
#[test]
fn quote_in_field() {
let documents = r#"city,country,pop
"Boston","United"" States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United\" States",
"pop": "4628910",
})
);
}
#[test]
fn integer_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.0,
})
);
}
#[test]
fn float_in_field() {
let documents = r#"city,country,pop:number
"Boston","United States","4628910.01""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city": "Boston",
"country": "United States",
"pop": 4628910.01,
})
);
}
#[test]
fn several_double_dot_in_header() {
let documents = r#"city:love:string,country:state,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:love": "Boston",
"country:state": "United States",
"pop": "4628910",
})
);
}
#[test]
fn ending_by_double_dot_in_header() {
let documents = r#"city:,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn starting_by_double_dot_in_header() {
let documents = r#":city,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
":city": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn starting_by_double_dot_in_header2() {
let documents = r#":string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn double_double_dot_in_header() {
let documents = r#"city::string,country,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert_eq!(
Value::Object(csv_iter.next().unwrap().unwrap()),
json!({
"city:": "Boston",
"country": "United States",
"pop": "4628910",
})
);
}
#[test]
fn bad_type_in_header() {
let documents = r#"city,country:number,pop
"Boston","United States","4628910""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count1() {
let documents = r#"city,country,pop
"Boston","United States","4628910", "too much""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
#[test]
fn bad_column_count2() {
let documents = r#"city,country,pop
"Boston","United States""#;
let mut csv_iter = CsvDocumentIter::from_reader(documents.as_bytes()).unwrap();
assert!(csv_iter.next().unwrap().is_err());
}
}

View File

@ -1,10 +1,8 @@
mod csv_documents_iter;
pub mod error;
mod message;
pub mod status;
pub mod store;
use crate::index_controller::updates::csv_documents_iter::CsvDocumentIter;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
@ -15,7 +13,6 @@ use async_stream::stream;
use bytes::Bytes;
use futures::{Stream, StreamExt};
use log::trace;
use milli::documents::DocumentBatchBuilder;
use milli::update::IndexDocumentsMethod;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
@ -24,13 +21,13 @@ use uuid::Uuid;
use self::error::{Result, UpdateLoopError};
pub use self::message::UpdateMsg;
use self::store::{UpdateStore, UpdateStoreInfo};
use crate::document_formats::read_json;
use crate::document_formats::{read_csv, read_json};
use crate::index::{Index, Settings, Unchecked};
use crate::index_controller::update_file_store::UpdateFileStore;
use status::UpdateStatus;
use super::index_resolver::HardStateIndexResolver;
use super::{DocumentAdditionFormat, Payload, Update};
use super::{DocumentAdditionFormat, Update};
pub type UpdateSender = mpsc::Sender<UpdateMsg>;
@ -198,6 +195,7 @@ impl UpdateLoop {
tokio::task::spawn_blocking(move || -> Result<_> {
match format {
DocumentAdditionFormat::Json => read_json(reader, &mut *update_file)?,
DocumentAdditionFormat::Csv => read_csv(reader, &mut *update_file)?,
}
update_file.persist()?;
@ -225,26 +223,6 @@ impl UpdateLoop {
Ok(status.into())
}
async fn documents_from_csv(&self, payload: Payload) -> Result<Uuid> {
let file_store = self.update_file_store.clone();
tokio::task::spawn_blocking(move || {
let (uuid, mut file) = file_store.new_update().unwrap();
let mut builder = DocumentBatchBuilder::new(&mut *file).unwrap();
let iter = CsvDocumentIter::from_reader(StreamReader::new(payload))?;
for doc in iter {
let doc = doc?;
builder.add_documents(doc).unwrap();
}
builder.finish().unwrap();
file.persist();
Ok(uuid)
})
.await?
}
async fn handle_list_updates(&self, uuid: Uuid) -> Result<Vec<UpdateStatus>> {
let update_store = self.store.clone();
tokio::task::spawn_blocking(move || {