diff --git a/src/subcommand/serve.rs b/src/subcommand/serve.rs index 6da45f8b2..9b2a17ed0 100644 --- a/src/subcommand/serve.rs +++ b/src/subcommand/serve.rs @@ -23,7 +23,7 @@ use warp::filters::ws::Message; use warp::{Filter, http::Response}; use crate::tokenizer::{simple_tokenizer, TokenType}; -use crate::update::{UpdateBuilder, IndexDocumentsMethod}; +use crate::update::{UpdateBuilder, IndexDocumentsMethod, UpdateFormat}; use crate::{Index, UpdateStore, SearchResult}; #[derive(Debug, StructOpt)] @@ -157,7 +157,7 @@ enum UpdateStatus { #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] enum UpdateMeta { - DocumentsAddition, + DocumentsAddition { method: String, format: String }, ClearDocuments, } @@ -218,17 +218,22 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { // we extract the update type and execute the update itself. let result: anyhow::Result<()> = match meta { - UpdateMeta::DocumentsAddition => { + UpdateMeta::DocumentsAddition { method, format } => { // We must use the write transaction of the update here. let mut wtxn = index_cloned.write_txn()?; let mut builder = update_builder.index_documents(&mut wtxn, &index_cloned); - let replace_documents = true; - if replace_documents { - builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments); - } else { - builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments); - } + match format.as_str() { + "json" => builder.update_format(UpdateFormat::Json), + "csv" => builder.update_format(UpdateFormat::Csv), + otherwise => panic!("invalid update format {:?}", otherwise), + }; + + match method.as_str() { + "replace" => builder.index_documents_method(IndexDocumentsMethod::ReplaceDocuments), + "update" => builder.index_documents_method(IndexDocumentsMethod::UpdateDocuments), + otherwise => panic!("invalid indexing method {:?}", otherwise), + }; let gzipped = false; let reader = if gzipped { @@ -461,6 +466,8 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { async fn buf_stream( update_store: Arc>, update_status_sender: broadcast::Sender>, + update_method: Option, + update_format: UpdateFormat, mut stream: impl futures::Stream> + Unpin, ) -> Result { @@ -475,7 +482,18 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { let file = file.into_std().await; let mmap = unsafe { memmap::Mmap::map(&file).unwrap() }; - let meta = UpdateMeta::DocumentsAddition; + let method = match update_method.as_deref() { + Some("replace") => String::from("replace"), + Some("update") => String::from("update"), + _ => String::from("replace"), + }; + + let format = match update_format { + UpdateFormat::Csv => String::from("csv"), + UpdateFormat::Json => String::from("json"), + }; + + let meta = UpdateMeta::DocumentsAddition { method, format }; let update_id = update_store.register_update(&meta, &mmap[..]).unwrap(); let _ = update_status_sender.send(UpdateStatus::Pending { update_id, meta }); eprintln!("update {} registered", update_id); @@ -483,14 +501,43 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { Ok(warp::reply()) } + #[derive(Deserialize)] + struct QueryUpdate { + method: Option, + } + let update_store_cloned = update_store.clone(); let update_status_sender_cloned = update_status_sender.clone(); let indexing_route_csv = warp::filters::method::post() .and(warp::path!("documents")) .and(warp::header::exact_ignore_case("content-type", "text/csv")) + .and(warp::filters::query::query()) .and(warp::body::stream()) - .and_then(move |stream| { - buf_stream(update_store_cloned.clone(), update_status_sender_cloned.clone(), stream) + .and_then(move |params: QueryUpdate, stream| { + buf_stream( + update_store_cloned.clone(), + update_status_sender_cloned.clone(), + params.method, + UpdateFormat::Csv, + stream, + ) + }); + + let update_store_cloned = update_store.clone(); + let update_status_sender_cloned = update_status_sender.clone(); + let indexing_route_json = warp::filters::method::post() + .and(warp::path!("documents")) + .and(warp::header::exact_ignore_case("content-type", "application/json")) + .and(warp::filters::query::query()) + .and(warp::body::stream()) + .and_then(move |params: QueryUpdate, stream| { + buf_stream( + update_store_cloned.clone(), + update_status_sender_cloned.clone(), + params.method, + UpdateFormat::Json, + stream, + ) }); let update_status_sender_cloned = update_status_sender.clone(); @@ -547,6 +594,7 @@ pub fn run(opt: Opt) -> anyhow::Result<()> { .or(dash_logo_black_route) .or(query_route) .or(indexing_route_csv) + .or(indexing_route_json) .or(clearing_route) .or(update_ws_route); diff --git a/src/update/mod.rs b/src/update/mod.rs index fcaffe9c0..949724bcd 100644 --- a/src/update/mod.rs +++ b/src/update/mod.rs @@ -8,6 +8,6 @@ mod update_store; pub use self::available_documents_ids::AvailableDocumentsIds; pub use self::clear_documents::ClearDocuments; pub use self::delete_documents::DeleteDocuments; -pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod}; +pub use self::index_documents::{IndexDocuments, IndexDocumentsMethod, UpdateFormat}; pub use self::update_builder::UpdateBuilder; pub use self::update_store::UpdateStore;