232: Fix payload size limit r=MarinPostma a=MarinPostma

Fix #223

This was due to the fact that Payload ignores the limit payload size limit. I fixed it by implementing my own `Payload` extractor that checks that the size of the payload is not too large.

I also refactored the `create_app` a bit.

Co-authored-by: marin postma <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2021-06-23 16:06:08 +00:00 committed by GitHub
commit 8638c9ab77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 231 additions and 110 deletions

View File

@ -1,9 +1,9 @@
use actix_web::web::Payload;
use milli::update::{IndexDocumentsMethod, UpdateFormat};
use super::Data;
use crate::extractors::payload::Payload;
use crate::index::{Checked, Settings};
use crate::index_controller::{error::Result, IndexMetadata, IndexSettings, UpdateStatus};
use crate::Data;
impl Data {
pub async fn add_documents(

View File

@ -103,7 +103,7 @@ impl ErrorCode for MilliError<'_> {
milli::Error::UserError(ref error) => {
match error {
// TODO: wait for spec for new error codes.
| UserError::Csv(_)
UserError::Csv(_)
| UserError::SerdeJson(_)
| UserError::MaxDatabaseSizeReached
| UserError::InvalidCriterionName { .. }
@ -148,9 +148,10 @@ impl ErrorCode for PayloadError {
PayloadError::Json(err) => match err {
JsonPayloadError::Overflow => Code::PayloadTooLarge,
JsonPayloadError::ContentType => Code::UnsupportedMediaType,
JsonPayloadError::Payload(aweb::error::PayloadError::Overflow) => Code::PayloadTooLarge,
JsonPayloadError::Deserialize(_)
| JsonPayloadError::Payload(_) => Code::BadRequest,
JsonPayloadError::Payload(aweb::error::PayloadError::Overflow) => {
Code::PayloadTooLarge
}
JsonPayloadError::Deserialize(_) | JsonPayloadError::Payload(_) => Code::BadRequest,
JsonPayloadError::Serialize(_) => Code::Internal,
_ => Code::Internal,
},

View File

@ -0,0 +1 @@
pub mod payload;

View File

@ -0,0 +1,69 @@
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_http::error::PayloadError;
use actix_web::{dev, web, FromRequest, HttpRequest};
use futures::future::{ready, Ready};
use futures::Stream;
pub struct Payload {
payload: dev::Payload,
limit: usize,
}
pub struct PayloadConfig {
limit: usize,
}
impl PayloadConfig {
pub fn new(limit: usize) -> Self {
Self { limit }
}
}
impl Default for PayloadConfig {
fn default() -> Self {
Self { limit: 256 * 1024 }
}
}
impl FromRequest for Payload {
type Config = PayloadConfig;
type Error = PayloadError;
type Future = Ready<Result<Payload, Self::Error>>;
#[inline]
fn from_request(req: &HttpRequest, payload: &mut dev::Payload) -> Self::Future {
let limit = req
.app_data::<PayloadConfig>()
.map(|c| c.limit)
.unwrap_or(Self::Config::default().limit);
ready(Ok(Payload {
payload: payload.take(),
limit,
}))
}
}
impl Stream for Payload {
type Item = Result<web::Bytes, PayloadError>;
#[inline]
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.payload).poll_next(cx) {
Poll::Ready(Some(result)) => match result {
Ok(bytes) => match self.limit.checked_sub(bytes.len()) {
Some(new_limit) => {
self.limit = new_limit;
Poll::Ready(Some(Ok(bytes)))
}
None => Poll::Ready(Some(Err(PayloadError::Overflow))),
},
x => Poll::Ready(Some(x)),
},
otherwise => otherwise,
}
}
}

View File

@ -3,7 +3,7 @@ use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::sync::Arc;
use anyhow::{Context, bail};
use anyhow::{bail, Context};
use heed::RoTxn;
use indexmap::IndexMap;
use milli::update::{IndexDocumentsMethod, UpdateFormat::JsonStream};

View File

@ -13,7 +13,7 @@ use serde_json::{Map, Value};
use crate::helpers::EnvSizer;
use error::Result;
pub use search::{SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT, default_crop_length};
pub use search::{default_crop_length, SearchQuery, SearchResult, DEFAULT_SEARCH_LIMIT};
pub use updates::{Checked, Facets, Settings, Unchecked};
use self::error::IndexError;

View File

@ -233,8 +233,8 @@ impl Index {
fn compute_matches<A: AsRef<[u8]>>(
matcher: &impl Matcher,
document: &Document,
analyzer: &Analyzer<A>
) -> MatchesInfo {
analyzer: &Analyzer<A>,
) -> MatchesInfo {
let mut matches = BTreeMap::new();
for (key, value) in document {
@ -1174,6 +1174,9 @@ mod test {
let analyzer = Analyzer::new(config);
let matches = compute_matches(&matcher, &value, &analyzer);
assert_eq!(format!("{:?}", matches), r##"{"about": [MatchInfo { start: 0, length: 6 }, MatchInfo { start: 31, length: 7 }, MatchInfo { start: 191, length: 7 }, MatchInfo { start: 225, length: 7 }, MatchInfo { start: 233, length: 6 }], "color": [MatchInfo { start: 0, length: 3 }]}"##);
assert_eq!(
format!("{:?}", matches),
r##"{"about": [MatchInfo { start: 0, length: 6 }, MatchInfo { start: 31, length: 7 }, MatchInfo { start: 191, length: 7 }, MatchInfo { start: 225, length: 7 }, MatchInfo { start: 233, length: 6 }], "color": [MatchInfo { start: 0, length: 3 }]}"##
);
}
}

View File

@ -3,7 +3,7 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use actix_web::web::{Bytes, Payload};
use actix_web::web::Bytes;
use chrono::{DateTime, Utc};
use futures::stream::StreamExt;
use log::error;
@ -22,6 +22,7 @@ use update_actor::UpdateActorHandle;
pub use updates::*;
use uuid_resolver::{error::UuidResolverError, UuidResolverHandle};
use crate::extractors::payload::Payload;
use crate::index::{Checked, Document, SearchQuery, SearchResult, Settings};
use crate::option::Opt;
use error::Result;

View File

@ -122,7 +122,7 @@ where
&self,
uuid: Uuid,
meta: UpdateMeta,
mut payload: mpsc::Receiver<PayloadData<D>>,
payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<UpdateStatus> {
let file_path = match meta {
UpdateMeta::DocumentsAddition { .. } => {
@ -137,21 +137,41 @@ where
.open(&path)
.await?;
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()).await?;
async fn write_to_file<D>(
file: &mut fs::File,
mut payload: mpsc::Receiver<PayloadData<D>>,
) -> Result<usize>
where
D: AsRef<[u8]> + Sized + 'static,
{
let mut file_len = 0;
while let Some(bytes) = payload.recv().await {
let bytes = bytes?;
file_len += bytes.as_ref().len();
file.write_all(bytes.as_ref()).await?;
}
file.flush().await?;
Ok(file_len)
}
if file_len != 0 {
file.flush().await?;
let file = file.into_std().await;
Some((file, update_file_id))
} else {
// empty update, delete the empty file.
fs::remove_file(&path).await?;
None
let file_len = write_to_file(&mut file, payload).await;
match file_len {
Ok(len) if len > 0 => {
let file = file.into_std().await;
Some((file, update_file_id))
}
Err(e) => {
fs::remove_file(&path).await?;
return Err(e);
}
_ => {
fs::remove_file(&path).await?;
None
}
}
}
_ => None,

View File

@ -21,6 +21,8 @@ pub enum UpdateActorError {
FatalUpdateStoreError,
#[error("invalid payload: {0}")]
InvalidPayload(Box<dyn Error + Send + Sync + 'static>),
#[error("payload error: {0}")]
PayloadError(#[from] actix_web::error::PayloadError),
}
impl<T> From<tokio::sync::mpsc::error::SendError<T>> for UpdateActorError {
@ -39,7 +41,6 @@ internal_error!(
UpdateActorError: heed::Error,
std::io::Error,
serde_json::Error,
actix_http::error::PayloadError,
tokio::task::JoinError
);
@ -51,6 +52,10 @@ impl ErrorCode for UpdateActorError {
UpdateActorError::IndexActor(e) => e.error_code(),
UpdateActorError::FatalUpdateStoreError => Code::Internal,
UpdateActorError::InvalidPayload(_) => Code::BadRequest,
UpdateActorError::PayloadError(error) => match error {
actix_http::error::PayloadError::Overflow => Code::PayloadTooLarge,
_ => Code::Internal,
},
}
}
}

View File

@ -572,7 +572,10 @@ fn update_uuid_to_file_path(root: impl AsRef<Path>, uuid: Uuid) -> PathBuf {
#[cfg(test)]
mod test {
use super::*;
use crate::index_controller::{UpdateResult, index_actor::{MockIndexActorHandle, error::IndexActorError}};
use crate::index_controller::{
index_actor::{error::IndexActorError, MockIndexActorHandle},
UpdateResult,
};
use futures::future::ok;
@ -651,7 +654,9 @@ mod test {
if processing.id() == 0 {
Box::pin(ok(Ok(processing.process(UpdateResult::Other))))
} else {
Box::pin(ok(Err(processing.fail(IndexActorError::ExistingPrimaryKey.into()))))
Box::pin(ok(Err(
processing.fail(IndexActorError::ExistingPrimaryKey.into())
)))
}
});

View File

@ -3,7 +3,10 @@ use milli::update::{DocumentAdditionResult, IndexDocumentsMethod, UpdateFormat};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{error::ResponseError, index::{Settings, Unchecked}};
use crate::{
error::ResponseError,
index::{Settings, Unchecked},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateResult {

View File

@ -1,6 +1,7 @@
pub mod data;
#[macro_use]
pub mod error;
pub mod extractors;
pub mod helpers;
mod index;
mod index_controller;
@ -13,85 +14,97 @@ pub mod analytics;
pub use self::data::Data;
pub use option::Opt;
use actix_web::web;
use extractors::payload::PayloadConfig;
pub fn configure_data(config: &mut web::ServiceConfig, data: Data) {
let http_payload_size_limit = data.http_payload_size_limit();
config
.data(data)
.app_data(
web::JsonConfig::default()
.limit(http_payload_size_limit)
.content_type(|_mime| true) // Accept all mime types
.error_handler(|err, _req| error::payload_error_handler(err).into()),
)
.app_data(PayloadConfig::new(http_payload_size_limit))
.app_data(
web::QueryConfig::default()
.error_handler(|err, _req| error::payload_error_handler(err).into()),
);
}
#[cfg(feature = "mini-dashboard")]
pub fn dashboard(config: &mut web::ServiceConfig, enable_frontend: bool) {
use actix_web_static_files::Resource;
use actix_web::HttpResponse;
mod generated {
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
}
if enable_frontend {
let generated = generated::generate();
let mut scope = web::scope("/");
// Generate routes for mini-dashboard assets
for (path, resource) in generated.into_iter() {
let Resource {mime_type, data, ..} = resource;
// Redirect index.html to /
if path == "index.html" {
config.service(web::resource("/").route(web::get().to(move || {
HttpResponse::Ok().content_type(mime_type).body(data)
})));
} else {
scope = scope.service(web::resource(path).route(web::get().to(move || {
HttpResponse::Ok().content_type(mime_type).body(data)
})));
}
}
config.service(scope);
} else {
config.service(routes::running);
}
}
#[cfg(not(feature = "mini-dashboard"))]
pub fn dashboard(config: &mut web::ServiceConfig, _enable_frontend: bool) {
config.service(routes::running);
}
#[macro_export]
macro_rules! create_app {
($data:expr, $enable_frontend:expr) => {
{
use actix_cors::Cors;
use actix_web::middleware::TrailingSlash;
use actix_web::{App, HttpResponse};
use actix_web::{middleware, web};
use meilisearch_http::error::payload_error_handler;
use meilisearch_http::routes::*;
($data:expr, $enable_frontend:expr) => {{
use actix_cors::Cors;
use actix_web::middleware::TrailingSlash;
use actix_web::App;
use actix_web::{middleware, web};
use meilisearch_http::routes::*;
use meilisearch_http::{configure_data, dashboard};
#[cfg(feature = "mini-dashboard")]
use actix_web_static_files::Resource;
#[cfg(feature = "mini-dashboard")]
mod dashboard {
include!(concat!(env!("OUT_DIR"), "/generated.rs"));
}
let app = App::new()
.data($data.clone())
.app_data(
web::JsonConfig::default()
.limit($data.http_payload_size_limit())
.content_type(|_mime| true) // Accept all mime types
.error_handler(|err, _req| payload_error_handler(err).into()),
)
.app_data(
web::QueryConfig::default()
.error_handler(|err, _req| payload_error_handler(err).into()),
)
.configure(document::services)
.configure(index::services)
.configure(search::services)
.configure(settings::services)
.configure(health::services)
.configure(stats::services)
.configure(key::services)
.configure(dump::services);
#[cfg(feature = "mini-dashboard")]
let app = if $enable_frontend {
let mut app = app;
let generated = dashboard::generate();
let mut scope = web::scope("/");
// Generate routes for mini-dashboard assets
for (path, resource) in generated.into_iter() {
let Resource {mime_type, data, ..} = resource;
// Redirect index.html to /
if path == "index.html" {
app = app.service(web::resource("/").route(web::get().to(move || {
HttpResponse::Ok().content_type(mime_type).body(data)
})));
} else {
scope = scope.service(web::resource(path).route(web::get().to(move || {
HttpResponse::Ok().content_type(mime_type).body(data)
})));
}
}
app.service(scope)
} else {
app.service(running)
};
#[cfg(not(feature = "mini-dashboard"))]
let app = app.service(running);
app.wrap(
App::new()
.configure(|s| configure_data(s, $data.clone()))
.configure(document::services)
.configure(index::services)
.configure(search::services)
.configure(settings::services)
.configure(health::services)
.configure(stats::services)
.configure(key::services)
.configure(dump::services)
.configure(|s| dashboard(s, $enable_frontend))
.wrap(
Cors::default()
.send_wildcard()
.allowed_headers(vec!["content-type", "x-meili-api-key"])
.allow_any_origin()
.allow_any_method()
.max_age(86_400), // 24h
.send_wildcard()
.allowed_headers(vec!["content-type", "x-meili-api-key"])
.allow_any_origin()
.allow_any_method()
.max_age(86_400), // 24h
)
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())
.wrap(middleware::NormalizePath::new(TrailingSlash::Trim))
.default_service(
web::route().to(|| HttpResponse::NotFound()))
}
};
.wrap(middleware::Logger::default())
.wrap(middleware::Compress::default())
.wrap(middleware::NormalizePath::new(
middleware::TrailingSlash::Trim,
))
}};
}

View File

@ -74,7 +74,7 @@ async fn main() -> Result<(), MainError> {
async fn run_http(data: Data, opt: Opt) -> Result<(), Box<dyn std::error::Error>> {
let _enable_dashboard = &opt.env == "development";
let http_server = HttpServer::new(move || create_app!(&data, _enable_dashboard))
let http_server = HttpServer::new(move || create_app!(data, _enable_dashboard))
// Disable signals allows the server to terminate immediately when a user enter CTRL-C
.disable_signals();

View File

@ -1,4 +1,3 @@
use actix_web::web::Payload;
use actix_web::{delete, get, post, put};
use actix_web::{web, HttpResponse};
use indexmap::IndexMap;
@ -8,6 +7,7 @@ use serde::Deserialize;
use serde_json::Value;
use crate::error::ResponseError;
use crate::extractors::payload::Payload;
use crate::helpers::Authentication;
use crate::routes::IndexParam;
use crate::Data;
@ -174,7 +174,7 @@ async fn update_documents(
data: web::Data<Data>,
path: web::Path<IndexParam>,
params: web::Query<UpdateDocumentsQuery>,
body: web::Payload,
body: Payload,
) -> Result<HttpResponse, ResponseError> {
let update = data
.add_documents(

View File

@ -6,7 +6,7 @@ use serde_json::Value;
use crate::error::ResponseError;
use crate::helpers::Authentication;
use crate::index::{SearchQuery, default_crop_length, DEFAULT_SEARCH_LIMIT};
use crate::index::{default_crop_length, SearchQuery, DEFAULT_SEARCH_LIMIT};
use crate::routes::IndexParam;
use crate::Data;