mirror of
https://github.com/meilisearch/MeiliSearch
synced 2025-07-04 20:37:15 +02:00
Introduce some new routes to handle live indexing
This commit is contained in:
parent
d3145be744
commit
871222aebd
5 changed files with 215 additions and 125 deletions
|
@ -1,18 +1,25 @@
|
|||
use std::collections::HashSet;
|
||||
use std::fs::File;
|
||||
use std::fs::{File, create_dir_all};
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use askama_warp::Template;
|
||||
use futures::FutureExt;
|
||||
use futures::StreamExt;
|
||||
use heed::EnvOpenOptions;
|
||||
use serde::Deserialize;
|
||||
use structopt::StructOpt;
|
||||
use tokio::fs::File as TFile;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use warp::filters::ws::Message;
|
||||
use warp::{Filter, http::Response};
|
||||
|
||||
use crate::tokenizer::{simple_tokenizer, TokenType};
|
||||
use crate::{Index, SearchResult};
|
||||
use crate::{Index, UpdateStore, SearchResult};
|
||||
|
||||
#[derive(Debug, StructOpt)]
|
||||
/// The HTTP main server of the milli project.
|
||||
|
@ -27,6 +34,11 @@ pub struct Opt {
|
|||
#[structopt(long = "db-size", default_value = "107374182400")] // 100 GB
|
||||
database_size: usize,
|
||||
|
||||
/// The maximum size the database that stores the updates can take on disk. It is recommended
|
||||
/// to specify the whole disk space (value must be a multiple of a page size).
|
||||
#[structopt(long = "udb-size", default_value = "10737418240")] // 10 GB
|
||||
update_database_size: usize,
|
||||
|
||||
/// Disable document highlighting on the dashboard.
|
||||
#[structopt(long)]
|
||||
disable_highlighting: bool,
|
||||
|
@ -84,6 +96,25 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
// Open the LMDB database.
|
||||
let index = Index::new(&env)?;
|
||||
|
||||
// Setup the LMDB based update database.
|
||||
let mut update_store_options = EnvOpenOptions::new();
|
||||
update_store_options.map_size(opt.update_database_size);
|
||||
|
||||
let update_store_path = opt.database.join("updates.mdb");
|
||||
create_dir_all(&update_store_path)?;
|
||||
|
||||
let (update_status_sender, update_status_receiver) = async_channel::unbounded();
|
||||
let update_status_sender_cloned = update_status_sender.clone();
|
||||
let update_store = UpdateStore::open(
|
||||
update_store_options,
|
||||
update_store_path,
|
||||
move |_uid, meta: String, _content| {
|
||||
let _ = update_status_sender_cloned.try_send("processing update");
|
||||
std::thread::sleep(Duration::from_secs(3));
|
||||
let _ = update_status_sender_cloned.try_send("update processed");
|
||||
Ok(meta)
|
||||
})?;
|
||||
|
||||
// Retrieve the database the file stem (w/o the extension),
|
||||
// the disk file size and the number of documents in the database.
|
||||
let db_name = opt.database.file_stem().and_then(|s| s.to_str()).unwrap_or("").to_string();
|
||||
|
@ -212,6 +243,77 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
.body(String::from_utf8(body).unwrap())
|
||||
});
|
||||
|
||||
async fn buf_stream(
|
||||
update_store: Arc<UpdateStore<String>>,
|
||||
update_status_sender: async_channel::Sender<&'static str>,
|
||||
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
|
||||
) -> Result<impl warp::Reply, warp::Rejection>
|
||||
{
|
||||
let file = tokio::task::block_in_place(tempfile::tempfile).unwrap();
|
||||
let mut file = TFile::from_std(file);
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let bytes = result.unwrap().to_bytes();
|
||||
file.write_all(&bytes[..]).await.unwrap();
|
||||
}
|
||||
|
||||
let file = file.into_std().await;
|
||||
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
|
||||
|
||||
let meta = String::from("I am the metadata");
|
||||
let uid = update_store.register_update(&meta, &mmap[..]).unwrap();
|
||||
update_status_sender.try_send("registering update").unwrap();
|
||||
eprintln!("Registering update {}", uid);
|
||||
|
||||
Ok(warp::reply())
|
||||
}
|
||||
|
||||
let update_store_cloned = update_store.clone();
|
||||
let indexing_route = warp::filters::method::post()
|
||||
.and(warp::path!("documents"))
|
||||
.and(warp::body::stream())
|
||||
.and_then(move |stream| {
|
||||
buf_stream(update_store_cloned.clone(), update_status_sender.clone(), stream)
|
||||
});
|
||||
|
||||
let update_ws_route = warp::ws()
|
||||
.and(warp::path!("updates" / "ws"))
|
||||
.map(move |ws: warp::ws::Ws| {
|
||||
// And then our closure will be called when it completes...
|
||||
let update_status_receiver_cloned = update_status_receiver.clone();
|
||||
ws.on_upgrade(|websocket| {
|
||||
// Just echo all updates messages...
|
||||
update_status_receiver_cloned
|
||||
.map(|msg| Ok(Message::text(msg)))
|
||||
.forward(websocket)
|
||||
.map(|result| {
|
||||
if let Err(e) = result {
|
||||
eprintln!("websocket error: {:?}", e);
|
||||
}
|
||||
})
|
||||
})
|
||||
});
|
||||
|
||||
let update_store_cloned = update_store.clone();
|
||||
let list_updates_route = warp::filters::method::get()
|
||||
.and(warp::path!("updates"))
|
||||
.map(move || {
|
||||
let update_store = update_store_cloned.clone();
|
||||
let updates = update_store.iter_metas(|processed, pending| {
|
||||
let mut updates = Vec::new();
|
||||
for result in processed {
|
||||
let (id, _) = result?;
|
||||
updates.push(format!("update {} processed", id.get()));
|
||||
}
|
||||
for result in pending {
|
||||
let (id, _) = result?;
|
||||
updates.push(format!("update {} pending", id.get()));
|
||||
}
|
||||
Ok(updates)
|
||||
}).unwrap();
|
||||
Ok(warp::reply::json(&updates))
|
||||
});
|
||||
|
||||
let routes = dash_html_route
|
||||
.or(dash_bulma_route)
|
||||
.or(dash_bulma_dark_route)
|
||||
|
@ -222,10 +324,14 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
|
|||
.or(dash_script_route)
|
||||
.or(dash_logo_white_route)
|
||||
.or(dash_logo_black_route)
|
||||
.or(query_route);
|
||||
.or(query_route)
|
||||
.or(indexing_route)
|
||||
.or(update_ws_route)
|
||||
.or(list_updates_route);
|
||||
|
||||
let addr = SocketAddr::from_str(&opt.http_listen_addr)?;
|
||||
tokio::runtime::Builder::new_multi_thread()
|
||||
tokio::runtime::Builder::new()
|
||||
.threaded_scheduler()
|
||||
.enable_all()
|
||||
.build()?
|
||||
.block_on(async {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue