From 02ef1d41d74680178c2ba6dc5b9f619fc53032b3 Mon Sep 17 00:00:00 2001 From: mpostma Date: Wed, 23 Dec 2020 16:12:37 +0100 Subject: [PATCH] route document add json --- Cargo.toml | 4 +-- src/data.rs | 15 +++++++---- src/routes/document.rs | 61 ++++++++++++++++++++++++++++++++++++++---- src/routes/mod.rs | 2 +- src/updates/mod.rs | 2 ++ 5 files changed, 71 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c6dd1db76..4395b2694 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -50,7 +50,7 @@ slice-group-by = "0.2.6" structopt = "0.3.20" tar = "0.4.29" tempfile = "3.1.0" -tokio = { version = "*", features = ["full"] } +tokio = { version = "0.2", features = ["full"] } ureq = { version = "1.5.1", default-features = false, features = ["tls"] } walkdir = "2.3.1" whoami = "1.0.0" @@ -68,7 +68,7 @@ version = "0.18.1" serde_url_params = "0.2.0" tempdir = "0.3.7" assert-json-diff = { branch = "master", git = "https://github.com/qdequele/assert-json-diff" } -tokio = { version = "0.2.18", features = ["macros", "time"] } +tokio = { version = "0.2", features = ["macros", "time"] } [features] default = ["sentry"] diff --git a/src/data.rs b/src/data.rs index c53042b8e..0702d7364 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,5 +1,6 @@ use std::ops::Deref; use std::sync::Arc; +use std::fs::create_dir_all; use async_compression::tokio_02::write::GzipEncoder; use futures_util::stream::StreamExt; @@ -27,7 +28,7 @@ impl Deref for Data { #[derive(Clone)] pub struct DataInner { pub indexes: Arc, - pub update_queue: UpdateQueue, + pub update_queue: Arc, api_keys: ApiKeys, options: Opt, } @@ -60,10 +61,11 @@ impl Data { pub fn new(options: Opt) -> anyhow::Result { let db_size = options.max_mdb_size.get_bytes() as usize; let path = options.db_path.join("main"); + create_dir_all(&path)?; let indexes = Index::new(&path, Some(db_size))?; let indexes = Arc::new(indexes); - let update_queue = UpdateQueue::new(&options, indexes.clone())?; + let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?); let mut api_keys = ApiKeys { master: options.clone().master_key, @@ -89,8 +91,8 @@ impl Data { B: Deref, E: std::error::Error + Send + Sync + 'static, { - let file = tokio::task::block_in_place(tempfile::tempfile)?; - let file = tokio::fs::File::from_std(file); + let file = tokio::task::spawn_blocking(tempfile::tempfile).await?; + let file = tokio::fs::File::from_std(file?); let mut encoder = GzipEncoder::new(file); while let Some(result) = stream.next().await { @@ -105,7 +107,10 @@ impl Data { let mmap = unsafe { memmap::Mmap::map(&file)? }; let meta = UpdateMeta::DocumentsAddition { method, format }; - let update_id = tokio::task::block_in_place(|| self.update_queue.register_update(&meta, &mmap[..]))?; + + let queue = self.update_queue.clone(); + let meta_cloned = meta.clone(); + let update_id = tokio::task::spawn_blocking(move || queue.register_update(&meta_cloned, &mmap[..])).await??; Ok(UpdateStatus::Pending { update_id, meta }) } diff --git a/src/routes/document.rs b/src/routes/document.rs index bdd9a8336..c24c01af8 100644 --- a/src/routes/document.rs +++ b/src/routes/document.rs @@ -1,14 +1,31 @@ +use actix_web::web::Payload; use actix_web::{delete, get, post, put}; use actix_web::{web, HttpResponse}; use indexmap::IndexMap; -use serde_json::Value; +use log::error; +use milli::update::{IndexDocumentsMethod, UpdateFormat}; use serde::Deserialize; +use serde_json::Value; use crate::Data; use crate::error::ResponseError; use crate::helpers::Authentication; use crate::routes::IndexParam; +macro_rules! guard_content_type { + ($fn_name:ident, $guard_value:literal) => { + fn $fn_name(head: &actix_web::dev::RequestHead) -> bool { + if let Some(content_type) = head.headers.get("Content-Type") { + content_type.to_str().map(|v| v.contains($guard_value)).unwrap_or(false) + } else { + false + } + } + }; +} + +guard_content_type!(guard_json, "application/json"); + type Document = IndexMap; #[derive(Deserialize)] @@ -21,7 +38,7 @@ pub fn services(cfg: &mut web::ServiceConfig) { cfg.service(get_document) .service(delete_document) .service(get_all_documents) - .service(add_documents) + .service(add_documents_json) .service(update_documents) .service(delete_documents) .service(clear_all_documents); @@ -91,12 +108,46 @@ async fn update_multiple_documents( todo!() } -#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] -async fn add_documents( +/// Route used when the payload type is "application/json" +#[post( + "/indexes/{index_uid}/documents", + wrap = "Authentication::Private", + guard = "guard_json" +)] +async fn add_documents_json( data: web::Data, + path: web::Path, + params: web::Query, + body: Payload, +) -> Result { + let addition_result = data + .add_documents( + IndexDocumentsMethod::UpdateDocuments, + UpdateFormat::Json, + body + ).await; + + match addition_result { + Ok(update) => { + let value = serde_json::to_string(&update).unwrap(); + let response = HttpResponse::Ok().body(value); + Ok(response) + } + Err(e) => { + error!("{}", e); + todo!() + } + } +} + + +/// Default route for addign documents, this should return an error en redirect to the docuentation +#[post("/indexes/{index_uid}/documents", wrap = "Authentication::Private")] +async fn add_documents_default( + _data: web::Data, _path: web::Path, _params: web::Query, - body: web::Json>, + _body: web::Json>, ) -> Result { todo!() } diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 71cb2a8d8..4b0813067 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -14,7 +14,7 @@ pub mod synonym; #[derive(Deserialize)] pub struct IndexParam { - _index_uid: String, + index_uid: String, } #[derive(Serialize)] diff --git a/src/updates/mod.rs b/src/updates/mod.rs index 00faa4d85..f08439c74 100644 --- a/src/updates/mod.rs +++ b/src/updates/mod.rs @@ -5,6 +5,7 @@ pub use settings::{Settings, Facets}; use std::io; use std::sync::Arc; use std::ops::Deref; +use std::fs::create_dir_all; use anyhow::Result; use flate2::read::GzDecoder; @@ -336,6 +337,7 @@ impl UpdateQueue { let handler = UpdateHandler::new(&opt.indexer_options, indexes, sender)?; let size = opt.max_udb_size.get_bytes() as usize; let path = opt.db_path.join("updates.mdb"); + create_dir_all(&path)?; let inner = UpdateStore::open( Some(size), path,