mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Make the updates page interactive
This commit is contained in:
parent
35c9a3c558
commit
03ca1ff634
3 changed files with 102 additions and 29 deletions
|
@ -11,7 +11,7 @@ use askama_warp::Template;
|
|||
use futures::{FutureExt, StreamExt};
|
||||
use futures::stream;
|
||||
use heed::EnvOpenOptions;
|
||||
use serde::Deserialize;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use structopt::StructOpt;
|
||||
use tokio::fs::File as TFile;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
@ -84,9 +84,19 @@ struct IndexTemplate {
|
|||
|
||||
#[derive(Template)]
|
||||
#[template(path = "updates.html")]
|
||||
struct UpdatesTemplate {
|
||||
struct UpdatesTemplate<M: Serialize + Send> {
|
||||
db_name: String,
|
||||
updates: Vec<String>,
|
||||
db_size: usize,
|
||||
docs_count: usize,
|
||||
updates: Vec<UpdateStatus<M>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
enum UpdateStatus<M> {
|
||||
Pending { update_id: u64, meta: M },
|
||||
Processing { update_id: u64, meta: M },
|
||||
Processed { update_id: u64, meta: M },
|
||||
}
|
||||
|
||||
pub fn run(opt: Opt) -> anyhow::Result<()> {
|
||||
|
@ -116,10 +126,14 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
let update_store = UpdateStore::open(
|
||||
update_store_options,
|
||||
update_store_path,
|
||||
move |uid, meta: String, _content| {
|
||||
let _ = update_status_sender_cloned.send(format!("processing update {}", uid));
|
||||
move |update_id, meta: String, _content| {
|
||||
let processing = UpdateStatus::Processing { update_id, meta: meta.clone() };
|
||||
let _ = update_status_sender_cloned.send(processing);
|
||||
|
||||
std::thread::sleep(Duration::from_secs(3));
|
||||
let _ = update_status_sender_cloned.send(format!("update {} processed", uid));
|
||||
|
||||
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
|
||||
let _ = update_status_sender_cloned.send(processed);
|
||||
Ok(meta)
|
||||
})?;
|
||||
|
||||
|
@ -149,19 +163,24 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
let mut updates = update_store.iter_metas(|processed, pending| {
|
||||
let mut updates = Vec::new();
|
||||
for result in processed {
|
||||
let (id, _) = result?;
|
||||
updates.push(format!("update {} processed", id.get()));
|
||||
let (uid, meta) = result?;
|
||||
updates.push(UpdateStatus::Processed { update_id: uid.get(), meta });
|
||||
}
|
||||
for result in pending {
|
||||
let (id, _) = result?;
|
||||
updates.push(format!("update {} pending", id.get()));
|
||||
let (uid, meta) = result?;
|
||||
updates.push(UpdateStatus::Pending { update_id: uid.get(), meta });
|
||||
}
|
||||
Ok(updates)
|
||||
}).unwrap();
|
||||
|
||||
if header.contains("text/html") {
|
||||
updates.reverse();
|
||||
let template = UpdatesTemplate { db_name: db_name.clone(), updates };
|
||||
let template = UpdatesTemplate {
|
||||
db_name: db_name.clone(),
|
||||
db_size,
|
||||
docs_count,
|
||||
updates,
|
||||
};
|
||||
Box::new(template) as Box<dyn warp::Reply>
|
||||
} else {
|
||||
Box::new(warp::reply::json(&updates))
|
||||
|
@ -289,7 +308,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
|
||||
async fn buf_stream(
|
||||
update_store: Arc<UpdateStore<String>>,
|
||||
update_status_sender: broadcast::Sender<String>,
|
||||
update_status_sender: broadcast::Sender<UpdateStatus<String>>,
|
||||
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
||||
) -> Result<impl warp::Reply, warp::Rejection>
|
||||
{
|
||||
|
@ -305,9 +324,9 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
|
||||
|
||||
let meta = String::from("I am the metadata");
|
||||
let uid = update_store.register_update(&meta, &mmap[..]).unwrap();
|
||||
update_status_sender.send(format!("update {} pending", uid)).unwrap();
|
||||
eprintln!("Registering update {}", uid);
|
||||
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
|
||||
update_status_sender.send(UpdateStatus::Pending { update_id, meta }).unwrap();
|
||||
eprintln!("update {} registered", update_id);
|
||||
|
||||
Ok(warp::reply())
|
||||
}
|
||||
|
@ -331,8 +350,11 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
update_status_receiver
|
||||
.into_stream()
|
||||
.flat_map(|result| {
|
||||
match result{
|
||||
Ok(msg) => stream::iter(Some(Ok(Message::text(msg)))),
|
||||
match result {
|
||||
Ok(status) => {
|
||||
let msg = serde_json::to_string(&status).unwrap();
|
||||
stream::iter(Some(Ok(Message::text(msg))))
|
||||
},
|
||||
Err(e) => {
|
||||
eprintln!("channel error: {:?}", e);
|
||||
stream::iter(None)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue