From f6eecb855e5686021120062c268ec3f1d9424fc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= Date: Wed, 21 Oct 2020 15:38:28 +0200 Subject: [PATCH] Send a basic progressing status to the updates front page --- public/updates-script.js | 10 +-- src/indexing/mod.rs | 12 ++-- src/indexing/store.rs | 7 ++- src/subcommand/indexer.rs | 2 +- src/subcommand/serve.rs | 129 ++++++++++++++++++++++++++++++-------- src/update_store.rs | 10 +-- templates/updates.html | 4 +- 7 files changed, 127 insertions(+), 47 deletions(-) diff --git a/public/updates-script.js b/public/updates-script.js index 09e9c25e1..8005c554c 100644 --- a/public/updates-script.js +++ b/public/updates-script.js @@ -49,22 +49,16 @@ $(window).on('load', function () { prependChild(results, elem); } - if (status.type == "Processing") { - const id = 'update-' + status.update_id; - const content = $(`#${id} .updateStatus.content`); - content.html('processing...'); - } - if (status.type == "Progressing") { const id = 'update-' + status.update_id; const content = $(`#${id} .updateStatus.content`); - content.html('progressing...'); + content.html('progressing... ' + JSON.stringify(status.meta)); } if (status.type == "Processed") { const id = 'update-' + status.update_id; const content = $(`#${id} .updateStatus.content`); - content.html('processed'); + content.html('processed ' + JSON.stringify(status.meta)); } } }); diff --git a/src/indexing/mod.rs b/src/indexing/mod.rs index 5e7293bcd..ef7a6f108 100644 --- a/src/indexing/mod.rs +++ b/src/indexing/mod.rs @@ -230,26 +230,30 @@ fn csv_bytes_readers<'a>( readers } -pub fn run<'a>( +pub fn run<'a, F>( env: &heed::Env, index: &Index, opt: &IndexerOpt, content: &'a [u8], gzipped: bool, + progress_callback: F, ) -> anyhow::Result<()> +where F: Fn(u32) + Sync + Send, { let jobs = opt.indexing_jobs.unwrap_or(0); let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; - pool.install(|| run_intern(env, index, opt, content, gzipped)) + pool.install(|| run_intern(env, index, opt, content, gzipped, progress_callback)) } -fn run_intern<'a>( +fn run_intern<'a, F>( env: &heed::Env, index: &Index, opt: &IndexerOpt, content: &'a [u8], gzipped: bool, + progress_callback: F, ) -> anyhow::Result<()> +where F: Fn(u32) + Sync + Send, { let before_indexing = Instant::now(); let num_threads = rayon::current_num_threads(); @@ -283,7 +287,7 @@ fn run_intern<'a>( chunk_fusing_shrink_size, )?; let base_document_id = number_of_documents; - store.index_csv(rdr, base_document_id, i, num_threads, log_every_n) + store.index_csv(rdr, base_document_id, i, num_threads, log_every_n, &progress_callback) }) .collect::, _>>()?; diff --git a/src/indexing/store.rs b/src/indexing/store.rs index 904286ae2..3f96880a3 100644 --- a/src/indexing/store.rs +++ b/src/indexing/store.rs @@ -301,14 +301,16 @@ impl Store { Ok(()) } - pub fn index_csv<'a>( + pub fn index_csv<'a, F>( mut self, mut rdr: csv::Reader>, base_document_id: usize, thread_index: usize, num_threads: usize, log_every_n: usize, + mut progress_callback: F, ) -> anyhow::Result + where F: FnMut(u32), { debug!("{:?}: Indexing in a Store...", thread_index); @@ -328,6 +330,7 @@ impl Store { if document_id % log_every_n == 0 { let count = format_count(document_id); info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed()); + progress_callback((document_id - base_document_id) as u32); before = Instant::now(); } @@ -349,6 +352,8 @@ impl Store { document_id = document_id + 1; } + progress_callback((document_id - base_document_id) as u32); + let readers = self.finish()?; debug!("{:?}: Store created!", thread_index); Ok(readers) diff --git a/src/subcommand/indexer.rs b/src/subcommand/indexer.rs index bfa9b2bc7..ba0a03c41 100644 --- a/src/subcommand/indexer.rs +++ b/src/subcommand/indexer.rs @@ -63,5 +63,5 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let file = File::open(file_path)?; let content = unsafe { memmap::Mmap::map(&file)? }; - indexing::run(&env, &index, &opt.indexer, &content, gzipped) + indexing::run(&env, &index, &opt.indexer, &content, gzipped, |_docid| { }) } diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index a6970431c..094ce3059 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -87,20 +87,39 @@ struct IndexTemplate { #[derive(Template)] #[template(path = "updates.html")] -struct UpdatesTemplate { +struct UpdatesTemplate { db_name: String, db_size: usize, docs_count: usize, - updates: Vec>, + updates: Vec>, } #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] -enum UpdateStatus { +enum UpdateStatus { Pending { update_id: u64, meta: M }, - Processing { update_id: u64, meta: M }, - Progressing { update_id: u64, meta: M }, - Processed { update_id: u64, meta: M }, + Progressing { update_id: u64, meta: P }, + Processed { update_id: u64, meta: N }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +enum UpdateMeta { + DocumentsAddition { + total_number_of_documents: Option, + }, + DocumentsAdditionFromPath { + path: PathBuf, + }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +enum UpdateMetaProgress { + DocumentsAddition { + processed_number_of_documents: usize, + total_number_of_documents: Option, + }, } pub fn run(opt: Opt) -> anyhow::Result<()> { @@ -134,21 +153,62 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let update_store = UpdateStore::open( update_store_options, update_store_path, - move |update_id, meta: String, content| { - let processing = UpdateStatus::Processing { update_id, meta: meta.clone() }; - let _ = update_status_sender_cloned.send(processing); + move |update_id, meta, content| { + let result = match meta { + UpdateMeta::DocumentsAddition { total_number_of_documents } => { + let gzipped = false; + indexing::run( + &env_cloned, + &index_cloned, + &indexer_opt_cloned, + content, + gzipped, + |count| { + // We send progress status... + let meta = UpdateMetaProgress::DocumentsAddition { + processed_number_of_documents: count as usize, + total_number_of_documents, + }; + let progress = UpdateStatus::Progressing { update_id, meta }; + let _ = update_status_sender_cloned.send(progress); + }, + ) + }, + UpdateMeta::DocumentsAdditionFromPath { path } => { + let file = match File::open(&path) { + Ok(file) => file, + Err(e) => { + let meta = format!("documents addition file ({}) error: {}", path.display(), e); + return Ok(meta); + } + }; + let content = match unsafe { memmap::Mmap::map(&file) } { + Ok(mmap) => mmap, + Err(e) => { + let meta = format!("documents addition file ({}) mmap error: {}", path.display(), e); + return Ok(meta); + }, + }; - let _progress = UpdateStatus::Progressing { update_id, meta: meta.clone() }; - // let _ = update_status_sender_cloned.send(progress); - - let gzipped = false; - let result = indexing::run( - &env_cloned, - &index_cloned, - &indexer_opt_cloned, - content, - gzipped, - ); + let gzipped = path.extension().map_or(false, |e| e == "gz" || e == "gzip"); + indexing::run( + &env_cloned, + &index_cloned, + &indexer_opt_cloned, + &content, + gzipped, + |count| { + // We send progress status... + let meta = UpdateMetaProgress::DocumentsAddition { + processed_number_of_documents: count as usize, + total_number_of_documents: None, + }; + let progress = UpdateStatus::Progressing { update_id, meta }; + let _ = update_status_sender_cloned.send(progress); + }, + ) + } + }; let meta = match result { Ok(()) => format!("valid update content"), @@ -201,7 +261,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .map(move |header: String| { let update_store = update_store_cloned.clone(); let mut updates = update_store.iter_metas(|processed, pending| { - let mut updates = Vec::new(); + let mut updates = Vec::>::new(); for result in processed { let (uid, meta) = result?; updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); @@ -359,8 +419,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { }); async fn buf_stream( - update_store: Arc>, - update_status_sender: broadcast::Sender>, + update_store: Arc>, + update_status_sender: broadcast::Sender>, mut stream: impl futures::Stream> + Unpin, ) -> Result { @@ -375,7 +435,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; - let meta = String::from("I am the metadata"); + let meta = UpdateMeta::DocumentsAddition { total_number_of_documents: None }; let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); @@ -385,13 +445,29 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let update_store_cloned = update_store.clone(); let update_status_sender_cloned = update_status_sender.clone(); - let indexing_route = warp::filters::method::post() + let indexing_route_csv = warp::filters::method::post() .and(warp::path!("documents")) + .and(warp::header::exact_ignore_case("content-type", "text/csv")) .and(warp::body::stream()) .and_then(move |stream| { buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream) }); + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let indexing_route_filepath = warp::filters::method::post() + .and(warp::path!("documents")) + .and(warp::header::exact_ignore_case("content-type", "text/x-filepath")) + .and(warp::body::bytes()) + .map(move |bytes: bytes::Bytes| { + let string = std::str::from_utf8(&bytes).unwrap().trim(); + let meta = UpdateMeta::DocumentsAdditionFromPath { path: PathBuf::from(string) }; + let update_id = update_store_cloned.register_update(&meta, &[]).unwrap(); + let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta }); + eprintln!("update {} registered", update_id); + Ok(warp::reply()) + }); + let update_ws_route = warp::ws() .and(warp::path!("updates" / "ws")) .map(move |ws: warp::ws::Ws| { @@ -435,7 +511,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .or(dash_logo_white_route) .or(dash_logo_black_route) .or(query_route) - .or(indexing_route) + .or(indexing_route_csv) + .or(indexing_route_filepath) .or(update_ws_route); let addr = SocketAddr::from_str(&opt.http_listen_addr)?; diff --git a/src/update_store.rs b/src/update_store.rs index 304d1a1e6..448a0ed95 100644 --- a/src/update_store.rs +++ b/src/update_store.rs @@ -2,7 +2,7 @@ use std::path::Path; use std::sync::Arc; use crossbeam_channel::Sender; -use heed::types::{OwnedType, DecodeIgnore, SerdeBincode, ByteSlice}; +use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice}; use heed::{EnvOpenOptions, Env, Database}; use serde::{Serialize, Deserialize}; @@ -11,9 +11,9 @@ use crate::BEU64; #[derive(Clone)] pub struct UpdateStore { env: Env, - pending_meta: Database, SerdeBincode>, + pending_meta: Database, SerdeJson>, pending: Database, ByteSlice>, - processed_meta: Database, SerdeBincode>, + processed_meta: Database, SerdeJson>, notification_sender: Sender<()>, } @@ -156,8 +156,8 @@ impl UpdateStore { M: for<'a> Deserialize<'a>, N: for<'a> Deserialize<'a>, F: for<'a> FnMut( - heed::RoIter<'a, OwnedType, SerdeBincode>, - heed::RoIter<'a, OwnedType, SerdeBincode>, + heed::RoIter<'a, OwnedType, SerdeJson>, + heed::RoIter<'a, OwnedType, SerdeJson>, ) -> heed::Result, { let rtxn = self.env.read_txn()?; diff --git a/templates/updates.html b/templates/updates.html index 909df222e..8ffdae390 100644 --- a/templates/updates.html +++ b/templates/updates.html @@ -55,7 +55,7 @@ {% for update in updates %} {% match update %} - {% when UpdateStatus::Pending with { update_id , meta } %} + {% when UpdateStatus::Pending with { update_id, meta } %}
    1. @@ -64,7 +64,7 @@
  • - {% when UpdateStatus::Processed with { update_id , meta } %} + {% when UpdateStatus::Processed with { update_id, meta } %}