Add replace/update csv/json from the HTTP server

This commit is contained in:
Clément Renault 2020-10-31 17:48:24 +01:00
parent a4f8be7811
commit 21b4d60101
No known key found for this signature in database
GPG Key ID: 92ADA4E935E71FA4
2 changed files with 61 additions and 13 deletions

View File

@ -23,7 +23,7 @@ use warp::filters::ws::Message;
use warp::{Filter, http::Response}; use warp::{Filter, http::Response};
use crate::tokenizer::{simple_tokenizer, TokenType}; use crate::tokenizer::{simple_tokenizer, TokenType};
use crate::update::{UpdateBuilder, IndexDocumentsMethod}; use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat};
use crate::{Index, UpdateStore, SearchResult}; use crate::{Index, UpdateStore, SearchResult};
#[derive(Debug, StructOpt)] #[derive(Debug, StructOpt)]
@ -157,7 +157,7 @@ enum UpdateStatus<M, P, N> {
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")] #[serde(tag = "type")]
enum UpdateMeta { enum UpdateMeta {
DocumentsAddition, DocumentsAddition { method: String, format: String },
ClearDocuments, ClearDocuments,
} }
@ -218,17 +218,22 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
// we extract the update type and execute the update itself. // we extract the update type and execute the update itself.
let result: anyhow::Result<()> = match meta { let result: anyhow::Result<()> = match meta {
UpdateMeta::DocumentsAddition => { UpdateMeta::DocumentsAddition { method, format } => {
// We must use the write transaction of the update here. // We must use the write transaction of the update here.
let mut wtxn = index_cloned.write_txn()?; let mut wtxn = index_cloned.write_txn()?;
let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned);
let replace_documents = true; match format.as_str() {
if replace_documents { "json" => builder.update_format(UpdateFormat::Json),
builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); "csv" => builder.update_format(UpdateFormat::Csv),
} else { otherwise => panic!("invalid update format {:?}", otherwise),
builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); };
}
match method.as_str() {
"replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments),
"update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments),
otherwise => panic!("invalid indexing method {:?}", otherwise),
};
let gzipped = false; let gzipped = false;
let reader = if gzipped { let reader = if gzipped {
@ -461,6 +466,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
async fn buf_stream( async fn buf_stream(
update_store: Arc<UpdateStore<UpdateMeta, String>>, update_store: Arc<UpdateStore<UpdateMeta, String>>,
update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>, update_status_sender: broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>,
update_method: Option<String>,
update_format: UpdateFormat,
mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin, mut stream: impl futures::Stream<Item=Result<impl bytes::Buf, warp::Error>> + Unpin,
) -> Result<impl warp::Reply, warp::Rejection> ) -> Result<impl warp::Reply, warp::Rejection>
{ {
@ -475,7 +482,18 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
let file = file.into_std().await; let file = file.into_std().await;
let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() };
let meta = UpdateMeta::DocumentsAddition; let method = match update_method.as_deref() {
Some("replace") => String::from("replace"),
Some("update") => String::from("update"),
_ => String::from("replace"),
};
let format = match update_format {
UpdateFormat::Csv => String::from("csv"),
UpdateFormat::Json => String::from("json"),
};
let meta = UpdateMeta::DocumentsAddition { method, format };
let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let update_id = update_store.register_update(&meta, &mmap[..]).unwrap();
let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta });
eprintln!("update {} registered", update_id); eprintln!("update {} registered", update_id);
@ -483,14 +501,43 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
Ok(warp::reply()) Ok(warp::reply())
} }
#[derive(Deserialize)]
struct QueryUpdate {
method: Option<String>,
}
let update_store_cloned = update_store.clone(); let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
let indexing_route_csv = warp::filters::method::post() let indexing_route_csv = warp::filters::method::post()
.and(warp::path!("documents")) .and(warp::path!("documents"))
.and(warp::header::exact_ignore_case("content-type", "text/csv")) .and(warp::header::exact_ignore_case("content-type", "text/csv"))
.and(warp::filters::query::query())
.and(warp::body::stream()) .and(warp::body::stream())
.and_then(move |stream| { .and_then(move |params: QueryUpdate, stream| {
buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream) buf_stream(
update_store_cloned.clone(),
update_status_sender_cloned.clone(),
params.method,
UpdateFormat::Csv,
stream,
)
});
let update_store_cloned = update_store.clone();
let update_status_sender_cloned = update_status_sender.clone();
let indexing_route_json = warp::filters::method::post()
.and(warp::path!("documents"))
.and(warp::header::exact_ignore_case("content-type", "application/json"))
.and(warp::filters::query::query())
.and(warp::body::stream())
.and_then(move |params: QueryUpdate, stream| {
buf_stream(
update_store_cloned.clone(),
update_status_sender_cloned.clone(),
params.method,
UpdateFormat::Json,
stream,
)
}); });
let update_status_sender_cloned = update_status_sender.clone(); let update_status_sender_cloned = update_status_sender.clone();
@ -547,6 +594,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> {
.or(dash_logo_black_route) .or(dash_logo_black_route)
.or(query_route) .or(query_route)
.or(indexing_route_csv) .or(indexing_route_csv)
.or(indexing_route_json)
.or(clearing_route) .or(clearing_route)
.or(update_ws_route); .or(update_ws_route);

View File

@ -8,6 +8,6 @@ mod update_store;
pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::available_documents_ids::AvailableDocumentsIds;
pub use self::clear_documents::ClearDocuments; pub use self::clear_documents::ClearDocuments;
pub use self::delete_documents::DeleteDocuments; pub use self::delete_documents::DeleteDocuments;
pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod}; pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat};
pub use self::update_builder::UpdateBuilder; pub use self::update_builder::UpdateBuilder;
pub use self::update_store::UpdateStore; pub use self::update_store::UpdateStore;