2204: Fix blocking auth r=Kerollmops a=MarinPostma

Fix auth blocking runtime

I have decided to remove async code from `meilisearch-auth` and let `meilisearch-http` handle that.

Because Actix polls the extractor futures concurrently, I have made a wrapper extractor that forces the errors from the futures to be returned sequentially (though is still polls them sequentially).
close #2201

Co-authored-by: ad hoc <postma.marin@protonmail.com>
This commit is contained in:
bors[bot] 2022-03-07 15:39:24 +00:00 committed by GitHub
commit 833f7fbdbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 316 additions and 95 deletions

2
Cargo.lock generated
View File

@ -1780,7 +1780,7 @@ dependencies = [
"once_cell",
"parking_lot 0.11.2",
"paste",
"pin-project",
"pin-project-lite",
"platform-dirs",
"rand",
"rayon",

View File

@ -40,18 +40,18 @@ impl AuthController {
})
}
pub async fn create_key(&self, value: Value) -> Result<Key> {
pub fn create_key(&self, value: Value) -> Result<Key> {
let key = Key::create_from_value(value)?;
self.store.put_api_key(key)
}
pub async fn update_key(&self, key: impl AsRef<str>, value: Value) -> Result<Key> {
let mut key = self.get_key(key).await?;
pub fn update_key(&self, key: impl AsRef<str>, value: Value) -> Result<Key> {
let mut key = self.get_key(key)?;
key.update_from_value(value)?;
self.store.put_api_key(key)
}
pub async fn get_key(&self, key: impl AsRef<str>) -> Result<Key> {
pub fn get_key(&self, key: impl AsRef<str>) -> Result<Key> {
self.store
.get_api_key(&key)?
.ok_or_else(|| AuthControllerError::ApiKeyNotFound(key.as_ref().to_string()))
@ -101,11 +101,11 @@ impl AuthController {
Ok(filters)
}
pub async fn list_keys(&self) -> Result<Vec<Key>> {
pub fn list_keys(&self) -> Result<Vec<Key>> {
self.store.list_api_keys()
}
pub async fn delete_key(&self, key: impl AsRef<str>) -> Result<()> {
pub fn delete_key(&self, key: impl AsRef<str>) -> Result<()> {
if self.store.delete_api_key(&key)? {
Ok(())
} else {

View File

@ -54,7 +54,6 @@ num_cpus = "1.13.0"
obkv = "0.2.0"
once_cell = "1.8.0"
parking_lot = "0.11.2"
pin-project = "1.0.8"
platform-dirs = "0.3.0"
rand = "0.8.4"
rayon = "1.5.1"
@ -78,6 +77,7 @@ tokio = { version = "1.11.0", features = ["full"] }
tokio-stream = "0.1.7"
uuid = { version = "0.8.2", features = ["serde"] }
walkdir = "2.3.2"
pin-project-lite = "0.2.8"
[dev-dependencies]
actix-rt = "2.2.0"

View File

@ -5,7 +5,7 @@ pub enum AuthenticationError {
#[error("The Authorization header is missing. It must use the bearer authorization method.")]
MissingAuthorizationHeader,
#[error("The provided API key is invalid.")]
InvalidToken(String),
InvalidToken,
// Triggered on configuration error.
#[error("An internal error has occurred. `Irretrievable state`.")]
IrretrievableState,
@ -15,7 +15,7 @@ impl ErrorCode for AuthenticationError {
fn error_code(&self) -> Code {
match self {
AuthenticationError::MissingAuthorizationHeader => Code::MissingAuthorizationHeader,
AuthenticationError::InvalidToken(_) => Code::InvalidToken,
AuthenticationError::InvalidToken => Code::InvalidToken,
AuthenticationError::IrretrievableState => Code::Internal,
}
}

View File

@ -2,28 +2,83 @@ mod error;
use std::marker::PhantomData;
use std::ops::Deref;
use std::pin::Pin;
use actix_web::FromRequest;
use futures::future::err;
use futures::future::{ok, Ready};
use meilisearch_error::ResponseError;
use futures::Future;
use meilisearch_error::{Code, ResponseError};
use error::AuthenticationError;
use meilisearch_auth::{AuthController, AuthFilter};
pub struct GuardedData<T, D> {
pub struct GuardedData<P, D> {
data: D,
filters: AuthFilter,
_marker: PhantomData<T>,
_marker: PhantomData<P>,
}
impl<T, D> GuardedData<T, D> {
impl<P, D> GuardedData<P, D> {
pub fn filters(&self) -> &AuthFilter {
&self.filters
}
async fn auth_bearer(
auth: AuthController,
token: String,
index: Option<String>,
data: Option<D>,
) -> Result<Self, ResponseError>
where
P: Policy + 'static,
{
match Self::authenticate(auth, token, index).await? {
Some(filters) => match data {
Some(data) => Ok(Self {
data,
filters,
_marker: PhantomData,
}),
None => Err(AuthenticationError::IrretrievableState.into()),
},
None => Err(AuthenticationError::InvalidToken.into()),
}
}
async fn auth_token(auth: AuthController, data: Option<D>) -> Result<Self, ResponseError>
where
P: Policy + 'static,
{
match Self::authenticate(auth, String::new(), None).await? {
Some(filters) => match data {
Some(data) => Ok(Self {
data,
filters,
_marker: PhantomData,
}),
None => Err(AuthenticationError::IrretrievableState.into()),
},
None => Err(AuthenticationError::MissingAuthorizationHeader.into()),
}
}
async fn authenticate(
auth: AuthController,
token: String,
index: Option<String>,
) -> Result<Option<AuthFilter>, ResponseError>
where
P: Policy + 'static,
{
Ok(tokio::task::spawn_blocking(move || {
P::authenticate(auth, token.as_ref(), index.as_deref())
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))?)
}
}
impl<T, D> Deref for GuardedData<T, D> {
impl<P, D> Deref for GuardedData<P, D> {
type Target = D;
fn deref(&self) -> &Self::Target {
@ -34,7 +89,7 @@ impl<T, D> Deref for GuardedData<T, D> {
impl<P: Policy + 'static, D: 'static + Clone> FromRequest for GuardedData<P, D> {
type Error = ResponseError;
type Future = Ready<Result<Self, Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
fn from_request(
req: &actix_web::HttpRequest,
@ -51,40 +106,22 @@ impl<P: Policy + 'static, D: 'static + Clone> FromRequest for GuardedData<P, D>
// TODO: find a less hardcoded way?
let index = req.match_info().get("index_uid");
match type_token.next() {
Some(token) => match P::authenticate(auth, token, index) {
Some(filters) => match req.app_data::<D>().cloned() {
Some(data) => ok(Self {
data,
filters,
_marker: PhantomData,
}),
None => err(AuthenticationError::IrretrievableState.into()),
},
None => {
let token = token.to_string();
err(AuthenticationError::InvalidToken(token).into())
}
},
None => {
err(AuthenticationError::InvalidToken("unknown".to_string()).into())
}
Some(token) => Box::pin(Self::auth_bearer(
auth,
token.to_string(),
index.map(String::from),
req.app_data::<D>().cloned(),
)),
None => Box::pin(err(AuthenticationError::InvalidToken.into())),
}
}
_otherwise => err(AuthenticationError::MissingAuthorizationHeader.into()),
},
None => match P::authenticate(auth, "", None) {
Some(filters) => match req.app_data::<D>().cloned() {
Some(data) => ok(Self {
data,
filters,
_marker: PhantomData,
}),
None => err(AuthenticationError::IrretrievableState.into()),
},
None => err(AuthenticationError::MissingAuthorizationHeader.into()),
_otherwise => {
Box::pin(err(AuthenticationError::MissingAuthorizationHeader.into()))
}
},
None => Box::pin(Self::auth_token(auth, req.app_data::<D>().cloned())),
},
None => err(AuthenticationError::IrretrievableState.into()),
None => Box::pin(err(AuthenticationError::IrretrievableState.into())),
}
}
}

View File

@ -1,3 +1,4 @@
pub mod payload;
#[macro_use]
pub mod authentication;
pub mod sequential_extractor;

View File

@ -0,0 +1,148 @@
#![allow(non_snake_case)]
use std::{future::Future, pin::Pin, task::Poll};
use actix_web::{dev::Payload, FromRequest, Handler, HttpRequest};
use pin_project_lite::pin_project;
/// `SeqHandler` is an actix `Handler` that enforces that extractors errors are returned in the
/// same order as they are defined in the wrapped handler. This is needed because, by default, actix
/// resolves the extractors concurrently, whereas we always need the authentication extractor to
/// throw first.
#[derive(Clone)]
pub struct SeqHandler<H>(pub H);
pub struct SeqFromRequest<T>(T);
/// This macro implements `FromRequest` for arbitrary arity handler, except for one, which is
/// useless anyway.
macro_rules! gen_seq {
($ty:ident; $($T:ident)+) => {
pin_project! {
pub struct $ty<$($T: FromRequest), +> {
$(
#[pin]
$T: ExtractFuture<$T::Future, $T, $T::Error>,
)+
}
}
impl<$($T: FromRequest), +> Future for $ty<$($T),+> {
type Output = Result<SeqFromRequest<($($T),+)>, actix_web::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
let mut count_fut = 0;
let mut count_finished = 0;
$(
count_fut += 1;
match this.$T.as_mut().project() {
ExtractProj::Future { fut } => match fut.poll(cx) {
Poll::Ready(Ok(output)) => {
count_finished += 1;
let _ = this
.$T
.as_mut()
.project_replace(ExtractFuture::Done { output });
}
Poll::Ready(Err(error)) => {
count_finished += 1;
let _ = this
.$T
.as_mut()
.project_replace(ExtractFuture::Error { error });
}
Poll::Pending => (),
},
ExtractProj::Done { .. } => count_finished += 1,
ExtractProj::Error { .. } => {
// short circuit if all previous are finished and we had an error.
if count_finished == count_fut {
match this.$T.project_replace(ExtractFuture::Empty) {
ExtractReplaceProj::Error { error } => {
return Poll::Ready(Err(error.into()))
}
_ => unreachable!("Invalid future state"),
}
} else {
count_finished += 1;
}
}
ExtractProj::Empty => unreachable!("From request polled after being finished. {}", stringify!($T)),
}
)+
if count_fut == count_finished {
let result = (
$(
match this.$T.project_replace(ExtractFuture::Empty) {
ExtractReplaceProj::Done { output } => output,
ExtractReplaceProj::Error { error } => return Poll::Ready(Err(error.into())),
_ => unreachable!("Invalid future state"),
},
)+
);
Poll::Ready(Ok(SeqFromRequest(result)))
} else {
Poll::Pending
}
}
}
impl<$($T: FromRequest,)+> FromRequest for SeqFromRequest<($($T,)+)> {
type Error = actix_web::Error;
type Future = $ty<$($T),+>;
fn from_request(req: &HttpRequest, payload: &mut Payload) -> Self::Future {
$ty {
$(
$T: ExtractFuture::Future {
fut: $T::from_request(req, payload),
},
)+
}
}
}
impl<Han, $($T: FromRequest),+> Handler<SeqFromRequest<($($T),+)>> for SeqHandler<Han>
where
Han: Handler<($($T),+)>,
{
type Output = Han::Output;
type Future = Han::Future;
fn call(&self, args: SeqFromRequest<($($T),+)>) -> Self::Future {
self.0.call(args.0)
}
}
};
}
// Not working for a single argument, but then, it is not really necessary.
// gen_seq! { SeqFromRequestFut1; A }
gen_seq! { SeqFromRequestFut2; A B }
gen_seq! { SeqFromRequestFut3; A B C }
gen_seq! { SeqFromRequestFut4; A B C D }
gen_seq! { SeqFromRequestFut5; A B C D E }
gen_seq! { SeqFromRequestFut6; A B C D E F }
pin_project! {
#[project = ExtractProj]
#[project_replace = ExtractReplaceProj]
enum ExtractFuture<Fut, Res, Err> {
Future {
#[pin]
fut: Fut,
},
Done {
output: Res,
},
Error {
error: Err,
},
Empty,
}
}

View File

@ -2,25 +2,28 @@ use std::str;
use actix_web::{web, HttpRequest, HttpResponse};
use meilisearch_auth::{Action, AuthController, Key};
use meilisearch_auth::{error::AuthControllerError, Action, AuthController, Key};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use time::OffsetDateTime;
use crate::extractors::authentication::{policies::*, GuardedData};
use meilisearch_error::ResponseError;
use crate::extractors::{
authentication::{policies::*, GuardedData},
sequential_extractor::SeqHandler,
};
use meilisearch_error::{Code, ResponseError};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::post().to(create_api_key))
.route(web::get().to(list_api_keys)),
.route(web::post().to(SeqHandler(create_api_key)))
.route(web::get().to(SeqHandler(list_api_keys))),
)
.service(
web::resource("/{api_key}")
.route(web::get().to(get_api_key))
.route(web::patch().to(patch_api_key))
.route(web::delete().to(delete_api_key)),
.route(web::get().to(SeqHandler(get_api_key)))
.route(web::patch().to(SeqHandler(patch_api_key)))
.route(web::delete().to(SeqHandler(delete_api_key))),
);
}
@ -29,8 +32,13 @@ pub async fn create_api_key(
body: web::Json<Value>,
_req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let key = auth_controller.create_key(body.into_inner()).await?;
let res = KeyView::from_key(key, &auth_controller);
let v = body.into_inner();
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let key = auth_controller.create_key(v)?;
Ok(KeyView::from_key(key, &auth_controller))
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::Created().json(res))
}
@ -39,11 +47,16 @@ pub async fn list_api_keys(
auth_controller: GuardedData<MasterPolicy, AuthController>,
_req: HttpRequest,
) -> Result<HttpResponse, ResponseError> {
let keys = auth_controller.list_keys().await?;
let res: Vec<_> = keys
.into_iter()
.map(|k| KeyView::from_key(k, &auth_controller))
.collect();
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let keys = auth_controller.list_keys()?;
let res: Vec<_> = keys
.into_iter()
.map(|k| KeyView::from_key(k, &auth_controller))
.collect();
Ok(res)
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::Ok().json(KeyListView::from(res)))
}
@ -52,8 +65,13 @@ pub async fn get_api_key(
auth_controller: GuardedData<MasterPolicy, AuthController>,
path: web::Path<AuthParam>,
) -> Result<HttpResponse, ResponseError> {
let key = auth_controller.get_key(&path.api_key).await?;
let res = KeyView::from_key(key, &auth_controller);
let api_key = path.into_inner().api_key;
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let key = auth_controller.get_key(&api_key)?;
Ok(KeyView::from_key(key, &auth_controller))
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::Ok().json(res))
}
@ -63,10 +81,14 @@ pub async fn patch_api_key(
body: web::Json<Value>,
path: web::Path<AuthParam>,
) -> Result<HttpResponse, ResponseError> {
let key = auth_controller
.update_key(&path.api_key, body.into_inner())
.await?;
let res = KeyView::from_key(key, &auth_controller);
let api_key = path.into_inner().api_key;
let body = body.into_inner();
let res = tokio::task::spawn_blocking(move || -> Result<_, AuthControllerError> {
let key = auth_controller.update_key(&api_key, body)?;
Ok(KeyView::from_key(key, &auth_controller))
})
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::Ok().json(res))
}
@ -75,7 +97,10 @@ pub async fn delete_api_key(
auth_controller: GuardedData<MasterPolicy, AuthController>,
path: web::Path<AuthParam>,
) -> Result<HttpResponse, ResponseError> {
auth_controller.delete_key(&path.api_key).await?;
let api_key = path.into_inner().api_key;
tokio::task::spawn_blocking(move || auth_controller.delete_key(&api_key))
.await
.map_err(|e| ResponseError::from_msg(e.to_string(), Code::Internal))??;
Ok(HttpResponse::NoContent().finish())
}

View File

@ -7,10 +7,13 @@ use serde_json::json;
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::post().to(create_dump)))
.service(web::resource("/{dump_uid}/status").route(web::get().to(get_dump_status)));
cfg.service(web::resource("").route(web::post().to(SeqHandler(create_dump))))
.service(
web::resource("/{dump_uid}/status").route(web::get().to(SeqHandler(get_dump_status))),
);
}
pub async fn create_dump(

View File

@ -20,6 +20,7 @@ use crate::analytics::Analytics;
use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::payload::Payload;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::SummarizedTaskView;
const DEFAULT_RETRIEVE_DOCUMENTS_OFFSET: usize = 0;
@ -71,17 +72,17 @@ pub struct DocumentParam {
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(get_all_documents))
.route(web::post().to(add_documents))
.route(web::put().to(update_documents))
.route(web::delete().to(clear_all_documents)),
.route(web::get().to(SeqHandler(get_all_documents)))
.route(web::post().to(SeqHandler(add_documents)))
.route(web::put().to(SeqHandler(update_documents)))
.route(web::delete().to(SeqHandler(clear_all_documents))),
)
// this route needs to be before the /documents/{document_id} to match properly
.service(web::resource("/delete-batch").route(web::post().to(delete_documents)))
.service(web::resource("/delete-batch").route(web::post().to(SeqHandler(delete_documents))))
.service(
web::resource("/{document_id}")
.route(web::get().to(get_document))
.route(web::delete().to(delete_document)),
.route(web::get().to(SeqHandler(get_document)))
.route(web::delete().to(SeqHandler(delete_document))),
);
}

View File

@ -9,6 +9,7 @@ use time::OffsetDateTime;
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::SummarizedTaskView;
pub mod documents;
@ -20,17 +21,17 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(list_indexes))
.route(web::post().to(create_index)),
.route(web::post().to(SeqHandler(create_index))),
)
.service(
web::scope("/{index_uid}")
.service(
web::resource("")
.route(web::get().to(get_index))
.route(web::put().to(update_index))
.route(web::delete().to(delete_index)),
.route(web::get().to(SeqHandler(get_index)))
.route(web::put().to(SeqHandler(update_index)))
.route(web::delete().to(SeqHandler(delete_index))),
)
.service(web::resource("/stats").route(web::get().to(get_index_stats)))
.service(web::resource("/stats").route(web::get().to(SeqHandler(get_index_stats))))
.service(web::scope("/documents").configure(documents::configure))
.service(web::scope("/search").configure(search::configure))
.service(web::scope("/tasks").configure(tasks::configure))

View File

@ -9,12 +9,13 @@ use serde_json::Value;
use crate::analytics::{Analytics, SearchAggregator};
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("")
.route(web::get().to(search_with_url_query))
.route(web::post().to(search_with_post)),
.route(web::get().to(SeqHandler(search_with_url_query)))
.route(web::post().to(SeqHandler(search_with_post))),
);
}

View File

@ -23,6 +23,7 @@ macro_rules! make_setting_route {
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::SummarizedTaskView;
use meilisearch_error::ResponseError;
@ -98,9 +99,9 @@ macro_rules! make_setting_route {
pub fn resources() -> Resource {
Resource::new($route)
.route(web::get().to(get))
.route(web::post().to(update))
.route(web::delete().to(delete))
.route(web::get().to(SeqHandler(get)))
.route(web::post().to(SeqHandler(update)))
.route(web::delete().to(SeqHandler(delete)))
}
}
};
@ -226,11 +227,12 @@ make_setting_route!(
macro_rules! generate_configure {
($($mod:ident),*) => {
pub fn configure(cfg: &mut web::ServiceConfig) {
use crate::extractors::sequential_extractor::SeqHandler;
cfg.service(
web::resource("")
.route(web::post().to(update_all))
.route(web::get().to(get_all))
.route(web::delete().to(delete_all)))
.route(web::post().to(SeqHandler(update_all)))
.route(web::get().to(SeqHandler(get_all)))
.route(web::delete().to(SeqHandler(delete_all))))
$(.service($mod::resources()))*;
}
};

View File

@ -8,11 +8,12 @@ use time::OffsetDateTime;
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::{TaskListView, TaskView};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(get_all_tasks_status)))
.service(web::resource("{task_id}").route(web::get().to(get_task_status)));
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_all_tasks_status))))
.service(web::resource("{task_id}").route(web::get().to(SeqHandler(get_task_status))));
}
#[derive(Debug, Serialize)]

View File

@ -7,11 +7,12 @@ use serde_json::json;
use crate::analytics::Analytics;
use crate::extractors::authentication::{policies::*, GuardedData};
use crate::extractors::sequential_extractor::SeqHandler;
use crate::task::{TaskListView, TaskView};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(web::resource("").route(web::get().to(get_tasks)))
.service(web::resource("/{task_id}").route(web::get().to(get_task)));
cfg.service(web::resource("").route(web::get().to(SeqHandler(get_tasks))))
.service(web::resource("/{task_id}").route(web::get().to(SeqHandler(get_task))));
}
async fn get_tasks(