Send a basic progressing status to the updates front page

This commit is contained in:
Clément Renault 2020-10-21 15:38:28 +02:00
parent 4eeeccb9cd
commit f6eecb855e
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
7 changed files with 127 additions and 47 deletions

View File

@ -49,22 +49,16 @@ $(window).on('load', function () {
prependChild(results, elem); prependChild(results, elem);
} }
if (status.type == "Processing") {
const id = 'update-' + status.update_id;
const content = $(`#${id} .updateStatus.content`);
content.html('processing...');
}
if (status.type == "Progressing") { if (status.type == "Progressing") {
const id = 'update-' + status.update_id; const id = 'update-' + status.update_id;
const content = $(`#${id} .updateStatus.content`); const content = $(`#${id} .updateStatus.content`);
content.html('progressing...'); content.html('progressing... ' + JSON.stringify(status.meta));
} }
if (status.type == "Processed") { if (status.type == "Processed") {
const id = 'update-' + status.update_id; const id = 'update-' + status.update_id;
const content = $(`#${id} .updateStatus.content`); const content = $(`#${id} .updateStatus.content`);
content.html('processed'); content.html('processed ' + JSON.stringify(status.meta));
} }
} }
}); });

View File

@ -230,26 +230,30 @@ fn csv_bytes_readers<'a>(
readers readers
} }
pub fn run<'a>( pub fn run<'a, F>(
env: &heed::Env, env: &heed::Env,
index: &Index, index: &Index,
opt: &IndexerOpt, opt: &IndexerOpt,
content: &'a [u8], content: &'a [u8],
gzipped: bool, gzipped: bool,
progress_callback: F,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where F: Fn(u32) + Sync + Send,
{ {
let jobs = opt.indexing_jobs.unwrap_or(0); let jobs = opt.indexing_jobs.unwrap_or(0);
let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?; let pool = rayon::ThreadPoolBuilder::new().num_threads(jobs).build()?;
pool.install(|| run_intern(env, index, opt, content, gzipped)) pool.install(|| run_intern(env, index, opt, content, gzipped, progress_callback))
} }
fn run_intern<'a>( fn run_intern<'a, F>(
env: &heed::Env, env: &heed::Env,
index: &Index, index: &Index,
opt: &IndexerOpt, opt: &IndexerOpt,
content: &'a [u8], content: &'a [u8],
gzipped: bool, gzipped: bool,
progress_callback: F,
) -> anyhow::Result<()> ) -> anyhow::Result<()>
where F: Fn(u32) + Sync + Send,
{ {
let before_indexing = Instant::now(); let before_indexing = Instant::now();
let num_threads = rayon::current_num_threads(); let num_threads = rayon::current_num_threads();
@ -283,7 +287,7 @@ fn run_intern<'a>(
chunk_fusing_shrink_size, chunk_fusing_shrink_size,
)?; )?;
let base_document_id = number_of_documents; let base_document_id = number_of_documents;
store.index_csv(rdr, base_document_id, i, num_threads, log_every_n) store.index_csv(rdr, base_document_id, i, num_threads, log_every_n, &progress_callback)
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;

View File

@ -301,14 +301,16 @@ impl Store {
Ok(()) Ok(())
} }
pub fn index_csv<'a>( pub fn index_csv<'a, F>(
mut self, mut self,
mut rdr: csv::Reader<Box<dyn Read + Send + 'a>>, mut rdr: csv::Reader<Box<dyn Read + Send + 'a>>,
base_document_id: usize, base_document_id: usize,
thread_index: usize, thread_index: usize,
num_threads: usize, num_threads: usize,
log_every_n: usize, log_every_n: usize,
mut progress_callback: F,
) -> anyhow::Result<Readers> ) -> anyhow::Result<Readers>
where F: FnMut(u32),
{ {
debug!("{:?}: Indexing in a Store...", thread_index); debug!("{:?}: Indexing in a Store...", thread_index);
@ -328,6 +330,7 @@ impl Store {
if document_id % log_every_n == 0 { if document_id % log_every_n == 0 {
let count = format_count(document_id); let count = format_count(document_id);
info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed()); info!("We have seen {} documents so far ({:.02?}).", count, before.elapsed());
progress_callback((document_id - base_document_id) as u32);
before = Instant::now(); before = Instant::now();
} }
@ -349,6 +352,8 @@ impl Store {
document_id = document_id + 1; document_id = document_id + 1;
} }
progress_callback((document_id - base_document_id) as u32);
let readers = self.finish()?; let readers = self.finish()?;
debug!("{:?}: Store created!", thread_index); debug!("{:?}: Store created!", thread_index);
Ok(readers) Ok(readers)

View File

@ -63,5 +63,5 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let file = File::open(file_path)?; let file = File::open(file_path)?;
let content = unsafe { memmap::Mmap::map(&file)? }; let content = unsafe { memmap::Mmap::map(&file)? };
indexing::run(&env, &index, &opt.indexer, &content, gzipped) indexing::run(&env, &index, &opt.indexer, &content, gzipped, |_docid| { })
} }

View File

@ -87,20 +87,39 @@ struct IndexTemplate {
#[derive(Template)] #[derive(Template)]
#[template(path = "updates.html")] #[template(path = "updates.html")]
struct UpdatesTemplate<M: Serialize + Send> { struct UpdatesTemplate<M: Serialize + Send, P: Serialize + Send, N: Serialize + Send> {
db_name: String, db_name: String,
db_size: usize, db_size: usize,
docs_count: usize, docs_count: usize,
updates: Vec<UpdateStatus<M>>, updates: Vec<UpdateStatus<M, P, N>>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
enum UpdateStatus<M> { enum UpdateStatus<M, P, N> {
Pending { update_id: u64, meta: M }, Pending { update_id: u64, meta: M },
Processing { update_id: u64, meta: M }, Progressing { update_id: u64, meta: P },
Progressing { update_id: u64, meta: M }, Processed { update_id: u64, meta: N },
Processed { update_id: u64, meta: M }, }
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMeta {
DocumentsAddition {
total_number_of_documents: Option<usize>,
},
DocumentsAdditionFromPath {
path: PathBuf,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum UpdateMetaProgress {
DocumentsAddition {
processed_number_of_documents: usize,
total_number_of_documents: Option<usize>,
},
} }
pub fn run(opt: Opt) -> anyhow::Result<()> { pub fn run(opt: Opt) -> anyhow::Result<()> {
@ -134,21 +153,62 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let update_store = UpdateStore::open( let update_store = UpdateStore::open(
update_store_options, update_store_options,
update_store_path, update_store_path,
move |update_id, meta: String, content| { move |update_id, meta, content| {
let processing = UpdateStatus::Processing { update_id, meta: meta.clone() }; let result = match meta {
let _ = update_status_sender_cloned.send(processing); UpdateMeta::DocumentsAddition { total_number_of_documents } => {
let _progress = UpdateStatus::Progressing { update_id, meta: meta.clone() };
// let _ = update_status_sender_cloned.send(progress);
let gzipped = false; let gzipped = false;
let result = indexing::run( indexing::run(
&env_cloned, &env_cloned,
&index_cloned, &index_cloned,
&indexer_opt_cloned, &indexer_opt_cloned,
content, content,
gzipped, gzipped,
); |count| {
// We send progress status...
let meta = UpdateMetaProgress::DocumentsAddition {
processed_number_of_documents: count as usize,
total_number_of_documents,
};
let progress = UpdateStatus::Progressing { update_id, meta };
let _ = update_status_sender_cloned.send(progress);
},
)
},
UpdateMeta::DocumentsAdditionFromPath { path } => {
let file = match File::open(&path) {
Ok(file) => file,
Err(e) => {
let meta = format!("documents addition file ({}) error: {}", path.display(), e);
return Ok(meta);
}
};
let content = match unsafe { memmap::Mmap::map(&file) } {
Ok(mmap) => mmap,
Err(e) => {
let meta = format!("documents addition file ({}) mmap error: {}", path.display(), e);
return Ok(meta);
},
};
let gzipped = path.extension().map_or(false, |e| e == "gz" || e == "gzip");
indexing::run(
&env_cloned,
&index_cloned,
&indexer_opt_cloned,
&content,
gzipped,
|count| {
// We send progress status...
let meta = UpdateMetaProgress::DocumentsAddition {
processed_number_of_documents: count as usize,
total_number_of_documents: None,
};
let progress = UpdateStatus::Progressing { update_id, meta };
let _ = update_status_sender_cloned.send(progress);
},
)
}
};
let meta = match result { let meta = match result {
Ok(()) => format!("valid update content"), Ok(()) => format!("valid update content"),
@ -201,7 +261,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
.map(move |header: String| { .map(move |header: String| {
let update_store = update_store_cloned.clone(); let update_store = update_store_cloned.clone();
let mut updates = update_store.iter_metas(|processed, pending| { let mut updates = update_store.iter_metas(|processed, pending| {
let mut updates = Vec::new(); let mut updates = Vec::<UpdateStatus<_, UpdateMetaProgress, _>>::new();
for result in processed { for result in processed {
let (uid, meta) = result?; let (uid, meta) = result?;
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta }); updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
@ -359,8 +419,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
}); });
async fn buf_stream( async fn buf_stream(
update_store: Arc<UpdateStore<String, String>>, update_store: Arc<UpdateStore<UpdateMeta, String>>,
update_status_sender: broadcast::Sender<UpdateStatus<String>>, update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin, mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
) -> Result<impl warp::Reply, warp::Rejection> ) -> Result<impl warp::Reply, warp::Rejection>
{ {
@ -375,7 +435,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let file = file.into_std().await; let file = file.into_std().await;
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let meta = String::from("I am the metadata"); let meta = UpdateMeta::DocumentsAddition { total_number_of_documents: None };
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
eprintln!("update {} registered", update_id); eprintln!("update {} registered", update_id);
@ -385,13 +445,29 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
let indexing_route = warp::filters::method::post() let indexing_route_csv = warp::filters::method::post()
.and(warp::path!("documents")) .and(warp::path!("documents"))
.and(warp::header::exact_ignore_case("content-type", "text/csv"))
.and(warp::body::stream()) .and(warp::body::stream())
.and_then(move |stream| { .and_then(move |stream| {
buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream) buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream)
}); });
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let indexing_route_filepath = warp::filters::method::post()
.and(warp::path!("documents"))
.and(warp::header::exact_ignore_case("content-type", "text/x-filepath"))
.and(warp::body::bytes())
.map(move |bytes: bytes::Bytes| {
let string = std::str::from_utf8(&bytes).unwrap().trim();
let meta = UpdateMeta::DocumentsAdditionFromPath { path: PathBuf::from(string) };
let update_id = update_store_cloned.register_update(&meta, &[]).unwrap();
let _ = update_status_sender_cloned.send(UpdateStatus::Pending { update_id, meta });
eprintln!("update {} registered", update_id);
Ok(warp::reply())
});
let update_ws_route = warp::ws() let update_ws_route = warp::ws()
.and(warp::path!("updates" / "ws")) .and(warp::path!("updates" / "ws"))
.map(move |ws: warp::ws::Ws| { .map(move |ws: warp::ws::Ws| {
@ -435,7 +511,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
.or(dash_logo_white_route) .or(dash_logo_white_route)
.or(dash_logo_black_route) .or(dash_logo_black_route)
.or(query_route) .or(query_route)
.or(indexing_route) .or(indexing_route_csv)
.or(indexing_route_filepath)
.or(update_ws_route); .or(update_ws_route);
let addr = SocketAddr::from_str(&opt.http_listen_addr)?; let addr = SocketAddr::from_str(&opt.http_listen_addr)?;

View File

@ -2,7 +2,7 @@ use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use crossbeam_channel::Sender; use crossbeam_channel::Sender;
use heed::types::{OwnedType, DecodeIgnore, SerdeBincode, ByteSlice}; use heed::types::{OwnedType, DecodeIgnore, SerdeJson, ByteSlice};
use heed::{EnvOpenOptions, Env, Database}; use heed::{EnvOpenOptions, Env, Database};
use serde::{Serialize, Deserialize}; use serde::{Serialize, Deserialize};
@ -11,9 +11,9 @@ use crate::BEU64;
#[derive(Clone)] #[derive(Clone)]
pub struct UpdateStore<M, N> { pub struct UpdateStore<M, N> {
env: Env, env: Env,
pending_meta: Database<OwnedType<BEU64>, SerdeBincode<M>>, pending_meta: Database<OwnedType<BEU64>, SerdeJson<M>>,
pending: Database<OwnedType<BEU64>, ByteSlice>, pending: Database<OwnedType<BEU64>, ByteSlice>,
processed_meta: Database<OwnedType<BEU64>, SerdeBincode<N>>, processed_meta: Database<OwnedType<BEU64>, SerdeJson<N>>,
notification_sender: Sender<()>, notification_sender: Sender<()>,
} }
@ -156,8 +156,8 @@ impl<M: 'static, N: 'static> UpdateStore<M, N> {
M: for<'a> Deserialize<'a>, M: for<'a> Deserialize<'a>,
N: for<'a> Deserialize<'a>, N: for<'a> Deserialize<'a>,
F: for<'a> FnMut( F: for<'a> FnMut(
heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<N>>, heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<N>>,
heed::RoIter<'a, OwnedType<BEU64>, SerdeBincode<M>>, heed::RoIter<'a, OwnedType<BEU64>, SerdeJson<M>>,
) -> heed::Result<T>, ) -> heed::Result<T>,
{ {
let rtxn = self.env.read_txn()?; let rtxn = self.env.read_txn()?;