Accept and mirror compression of documents additions

Kerollmops 2020-12-20 23:10:09 +01:00
parent cd158d4cde
commit 9fcbc83ebc
3 changed files with 24 additions and 71 deletions

http-ui/Cargo.lock (generated)

@@ -80,19 +80,6 @@ dependencies = [
  "warp",
 ]
 
-[[package]]
-name = "async-compression"
-version = "0.3.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb1ff21a63d3262af46b9f33a826a8d134e2d0d9b2179c86034948b732ea8b2a"
-dependencies = [
- "flate2",
- "futures-core",
- "memchr",
- "pin-project-lite",
- "tokio",
-]
-
 [[package]]
 name = "atty"
 version = "0.2.14"

@@ -767,7 +754,6 @@ dependencies = [
  "anyhow",
  "askama",
  "askama_warp",
- "async-compression",
  "byte-unit",
  "bytes",
  "flate2",

http-ui/Cargo.toml

@@ -7,7 +7,6 @@ edition = "2018"
 
 [dependencies]
 anyhow = "1.0.28"
-async-compression = { version = "0.3.6", features = ["gzip", "tokio-02"] }
 byte-unit = { version = "4.0.9", default-features = false, features = ["std"] }
 grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
 heed = "0.10.5"

http-ui/src/main.rs

@@ -10,7 +10,6 @@ use std::time::Instant;
 use std::{mem, io};
 
 use askama_warp::Template;
-use async_compression::tokio_02::write::GzipEncoder;
 use byte_unit::Byte;
 use flate2::read::GzDecoder;
 use futures::stream;
@@ -208,7 +207,7 @@ impl<M, P, N> UpdateStatus<M, P, N> {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(tag = "type")]
 enum UpdateMeta {
-    DocumentsAddition { method: String, format: String },
+    DocumentsAddition { method: String, format: String, encoding: Option<String> },
     ClearDocuments,
     Settings(Settings),
     Facets(Facets),
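
The new encoding field rides along in the serialized update metadata. A minimal sketch of how this internally tagged enum round-trips through serde_json, with the variant list trimmed to the one this commit touches:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    #[serde(tag = "type")]
    enum UpdateMeta {
        // Only the variant changed by this commit; the real enum has more.
        DocumentsAddition { method: String, format: String, encoding: Option<String> },
    }

    fn main() -> serde_json::Result<()> {
        let meta = UpdateMeta::DocumentsAddition {
            method: "replace".into(),
            format: "csv".into(),
            encoding: Some("gzip".into()),
        };
        // With #[serde(tag = "type")] the variant name is inlined as a field:
        // {"type":"DocumentsAddition","method":"replace","format":"csv","encoding":"gzip"}
        println!("{}", serde_json::to_string(&meta)?);
        Ok(())
    }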
@@ -325,7 +324,7 @@ async fn main() -> anyhow::Result<()> {
         // we extract the update type and execute the update itself.
         let result: anyhow::Result<()> = match meta {
-            UpdateMeta::DocumentsAddition { method, format } => {
+            UpdateMeta::DocumentsAddition { method, format, encoding } => {
                 // We must use the write transaction of the update here.
                 let mut wtxn = index_cloned.write_txn()?;
                 let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
@@ -343,11 +342,10 @@ async fn main() -> anyhow::Result<()> {
                     otherwise => panic!("invalid indexing method {:?}", otherwise),
                 };
 
-                let gzipped = true;
-                let reader = if gzipped {
-                    Box::new(GzDecoder::new(content))
-                } else {
-                    Box::new(content) as Box<dyn io::Read>
+                let reader = match encoding.as_deref() {
+                    Some("gzip") => Box::new(GzDecoder::new(content)),
+                    None => Box::new(content) as Box<dyn io::Read>,
+                    otherwise => panic!("invalid encoding format {:?}", otherwise),
                 };
 
                 let result = builder.execute(reader, |indexing_step| {
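
Before this change the stored payload was unconditionally gzip-decoded; now the recorded Content-Encoding decides. A standalone sketch of that selection using flate2's GzDecoder, where decoding_reader is a hypothetical helper and the io::Error is an illustrative substitute for the panic above:

    use std::io::{self, Read};

    use flate2::read::GzDecoder;

    // Wrap the raw update body in a decompressor only when the update was
    // registered with a gzip Content-Encoding; otherwise read it verbatim.
    fn decoding_reader<'a, R: Read + 'a>(
        encoding: Option<&str>,
        content: R,
    ) -> io::Result<Box<dyn Read + 'a>> {
        match encoding {
            Some("gzip") => Ok(Box::new(GzDecoder::new(content))),
            None => Ok(Box::new(content)),
            Some(other) => Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                format!("unsupported encoding {:?}", other),
            )),
        }
    }

    fn main() -> io::Result<()> {
        let mut body = String::new();
        decoding_reader(None, &b"id,name\n1,kero\n"[..])?.read_to_string(&mut body)?;
        assert_eq!(body.lines().count(), 2);
        Ok(())
    }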
@@ -703,21 +701,18 @@ async fn main() -> anyhow::Result<()> {
         update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
         update_method: Option<String>,
         update_format: UpdateFormat,
+        encoding: Option<String>,
         mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
     ) -> Result<impl warp::Reply, warp::Rejection>
     {
         let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
-        let file = TFile::from_std(file);
-        let mut encoder = GzipEncoder::new(file);
+        let mut file = TFile::from_std(file);
 
         while let Some(result) = stream.next().await {
             let bytes = result.unwrap().to_bytes();
-            encoder.write_all(&bytes[..]).await.unwrap();
+            file.write_all(&bytes[..]).await.unwrap();
         }
 
-        encoder.shutdown().await.unwrap();
-        let mut file = encoder.into_inner();
         file.sync_all().await.unwrap();
         let file = file.into_std().await;
         let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
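
With the encoder gone, buf_stream now persists the body exactly as received and memory-maps the result. A minimal sketch of that write-then-mmap pattern, assuming tokio 0.2 (per the tokio-02 feature in the lockfile) plus the tempfile and memmap crates; persist_chunks is a hypothetical helper:

    use std::io;

    use tokio::fs::File as TFile;
    use tokio::io::AsyncWriteExt;

    // Write raw body chunks to an anonymous temp file, flush to disk, then
    // hand back a read-only memory map of the stored bytes.
    async fn persist_chunks(chunks: &[&[u8]]) -> io::Result<memmap::Mmap> {
        // The real code wraps tempfile() in tokio::task::block_in_place.
        let file = tempfile::tempfile()?;
        let mut file = TFile::from_std(file);
        for chunk in chunks {
            file.write_all(chunk).await?; // stored verbatim, compressed or not
        }
        file.sync_all().await?;
        let file = file.into_std().await;
        // Safe here because the anonymous file is never truncated or shared.
        unsafe { memmap::Mmap::map(&file) }
    }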
@@ -734,7 +729,7 @@ async fn main() -> anyhow::Result<()> {
             _ => panic!("Unknown update format"),
         };
 
-        let meta = UpdateMeta::DocumentsAddition { method, format };
+        let meta = UpdateMeta::DocumentsAddition { method, format, encoding };
         let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
         let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
         eprintln!("update {} registered", update_id);
@@ -749,51 +744,26 @@ async fn main() -> anyhow::Result<()> {
     let update_store_cloned = update_store.clone();
     let update_status_sender_cloned = update_status_sender.clone();
-    let indexing_csv_route = warp::filters::method::post()
+    let indexing_route = warp::filters::method::post()
         .and(warp::path!("documents"))
-        .and(warp::header::exact_ignore_case("content-type", "text/csv"))
-        .and(warp::filters::query::query())
+        .and(warp::header::header("content-type"))
+        .and(warp::header::optional::<String>("content-encoding"))
+        .and(warp::query::query())
         .and(warp::body::stream())
-        .and_then(move |params: QueryUpdate, stream| {
+        .and_then(move |content_type: String, content_encoding, params: QueryUpdate, stream| {
+            let format = match content_type.as_str() {
+                "text/csv" => UpdateFormat::Csv,
+                "application/json" => UpdateFormat::Json,
+                "application/x-ndjson" => UpdateFormat::JsonStream,
+                otherwise => panic!("invalid update format: {}", otherwise),
+            };
+
             buf_stream(
                 update_store_cloned.clone(),
                 update_status_sender_cloned.clone(),
                 params.method,
-                UpdateFormat::Csv,
+                format,
+                content_encoding,
                 stream,
             )
         });
-
-    let update_store_cloned = update_store.clone();
-    let update_status_sender_cloned = update_status_sender.clone();
-    let indexing_json_route = warp::filters::method::post()
-        .and(warp::path!("documents"))
-        .and(warp::header::exact_ignore_case("content-type", "application/json"))
-        .and(warp::filters::query::query())
-        .and(warp::body::stream())
-        .and_then(move |params: QueryUpdate, stream| {
-            buf_stream(
-                update_store_cloned.clone(),
-                update_status_sender_cloned.clone(),
-                params.method,
-                UpdateFormat::Json,
-                stream,
-            )
-        });
-
-    let update_store_cloned = update_store.clone();
-    let update_status_sender_cloned = update_status_sender.clone();
-    let indexing_json_stream_route = warp::filters::method::post()
-        .and(warp::path!("documents"))
-        .and(warp::header::exact_ignore_case("content-type", "application/x-ndjson"))
-        .and(warp::filters::query::query())
-        .and(warp::body::stream())
-        .and_then(move |params: QueryUpdate, stream| {
-            buf_stream(
-                update_store_cloned.clone(),
-                update_status_sender_cloned.clone(),
-                params.method,
-                UpdateFormat::JsonStream,
-                stream,
-            )
-        });
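
The three per-content-type routes collapse into one that extracts both headers itself. A minimal sketch of that filter shape under warp 0.2, where the handler body is a stand-in for the real buf_stream call:

    use warp::Filter;

    // One POST /documents route: the body format comes from Content-Type and
    // the compression, if any, from an optional Content-Encoding header.
    fn indexing_filter() -> impl Filter<Extract = (String,), Error = warp::Rejection> + Clone {
        warp::post()
            .and(warp::path!("documents"))
            .and(warp::header::header::<String>("content-type"))
            .and(warp::header::optional::<String>("content-encoding"))
            .map(|content_type: String, encoding: Option<String>| {
                format!("format: {}, encoding: {:?}", content_type, encoding)
            })
    }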
@@ -904,9 +874,7 @@ async fn main() -> anyhow::Result<()> {
         .or(dash_logo_black_route)
         .or(query_route)
         .or(document_route)
-        .or(indexing_csv_route)
-        .or(indexing_json_route)
-        .or(indexing_json_stream_route)
+        .or(indexing_route)
         .or(abort_update_id_route)
         .or(abort_pending_updates_route)
         .or(clearing_route)
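
End to end, a client can now send a gzip-compressed documents addition and the server stores it compressed and decodes it at indexing time. A hypothetical client call, assuming reqwest with its blocking feature and a locally running http-ui (the address and port are assumptions):

    use std::io::Write;

    use flate2::{write::GzEncoder, Compression};

    fn main() -> anyhow::Result<()> {
        // Gzip a small CSV payload client-side.
        let csv = "id,name\n1,kero\n";
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(csv.as_bytes())?;
        let body = encoder.finish()?;

        // Declare both the format and the encoding; the server mirrors them
        // into the registered update's metadata.
        reqwest::blocking::Client::new()
            .post("http://127.0.0.1:9700/documents") // address is an assumption
            .header("content-type", "text/csv")
            .header("content-encoding", "gzip")
            .body(body)
            .send()?
            .error_for_status()?;
        Ok(())
    }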