diff options
author | David Pedersen <david.pdrsn@gmail.com> | 2021-07-05 16:18:39 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-05 16:18:39 +0200 |
commit | c4d266e94d14cedb02534a0dcd84cd5326002783 (patch) | |
tree | 1022d4a4f3eccfa70f1714241c0698d484cd58c8 | |
parent | 3fc7f1880f3842bde846c246dbd3b8a3d251aaf4 (diff) |
Allow errors (#26)
This changes error model to actually allow errors. I think if we're going to use this for things like tonic's route we need a more flexible error handling model. The same `handle_error` adaptors are still there but services aren't required to have `Infallible` as their error type. The error type is simply propagated all the way through.
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | examples/key_value_store.rs | 4 | ||||
-rw-r--r-- | src/buffer.rs | 190 | ||||
-rw-r--r-- | src/handler/mod.rs | 11 | ||||
-rw-r--r-- | src/lib.rs | 37 | ||||
-rw-r--r-- | src/routing.rs | 139 | ||||
-rw-r--r-- | src/service/mod.rs | 82 |
7 files changed, 311 insertions, 153 deletions
@@ -31,6 +31,7 @@ serde = "1.0" serde_json = "1.0" serde_urlencoded = "0.7" tokio = { version = "1", features = ["time"] } +tokio-util = "0.6" tower = { version = "0.4", features = ["util", "buffer", "make"] } tower-http = { version = "0.1", features = ["add-extension", "map-response-body"] } diff --git a/examples/key_value_store.rs b/examples/key_value_store.rs index d05446eb..3f685a50 100644 --- a/examples/key_value_store.rs +++ b/examples/key_value_store.rs @@ -11,6 +11,7 @@ use awebframework::{ prelude::*, response::IntoResponse, routing::BoxRoute, + service::ServiceExt, }; use bytes::Bytes; use http::StatusCode; @@ -53,7 +54,8 @@ async fn main() { .into_inner(), ) // Handle errors from middleware - .handle_error(handle_error); + .handle_error(handle_error) + .check_infallible(); // Run our app with hyper let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); diff --git a/src/buffer.rs b/src/buffer.rs new file mode 100644 index 00000000..21697c12 --- /dev/null +++ b/src/buffer.rs @@ -0,0 +1,190 @@ +use futures_util::ready; +use pin_project::pin_project; +use std::{ + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; +use tokio::sync::{mpsc, oneshot, OwnedSemaphorePermit, Semaphore}; +use tokio_util::sync::PollSemaphore; +use tower::{Service, ServiceExt}; + +/// A version of [`tower::buffer::Buffer`] which panicks on channel related errors, thus keeping +/// the error type of the service. +pub(crate) struct MpscBuffer<S, R> +where + S: Service<R>, +{ + tx: mpsc::UnboundedSender<Msg<S, R>>, + semaphore: PollSemaphore, + permit: Option<OwnedSemaphorePermit>, +} + +impl<S, R> Clone for MpscBuffer<S, R> +where + S: Service<R>, +{ + fn clone(&self) -> Self { + Self { + tx: self.tx.clone(), + semaphore: self.semaphore.clone(), + permit: None, + } + } +} + +impl<S, R> MpscBuffer<S, R> +where + S: Service<R>, +{ + pub(crate) fn new(svc: S) -> Self + where + S: Send + 'static, + R: Send + 'static, + S::Error: Send + 'static, + S::Future: Send + 'static, + { + let (tx, rx) = mpsc::unbounded_channel::<Msg<S, R>>(); + let semaphore = PollSemaphore::new(Arc::new(Semaphore::new(1024))); + + tokio::spawn(run_worker(svc, rx)); + + Self { + tx, + semaphore, + permit: None, + } + } +} + +async fn run_worker<S, R>(mut svc: S, mut rx: mpsc::UnboundedReceiver<Msg<S, R>>) +where + S: Service<R>, +{ + while let Some((req, reply_tx)) = rx.recv().await { + match svc.ready().await { + Ok(svc) => { + let future = svc.call(req); + let _ = reply_tx.send(WorkerReply::Future(future)); + } + Err(err) => { + let _ = reply_tx.send(WorkerReply::Error(err)); + } + } + } +} + +type Msg<S, R> = ( + R, + oneshot::Sender<WorkerReply<<S as Service<R>>::Future, <S as Service<R>>::Error>>, +); + +enum WorkerReply<F, E> { + Future(F), + Error(E), +} + +impl<S, R> Service<R> for MpscBuffer<S, R> +where + S: Service<R>, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture<S::Future, S::Error>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { + if self.permit.is_some() { + return Poll::Ready(Ok(())); + } + + let permit = ready!(self.semaphore.poll_acquire(cx)) + .expect("buffer semaphore closed. This is a bug in awebframework and should never happen. Please file an issue"); + + self.permit = Some(permit); + + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: R) -> Self::Future { + let permit = self + .permit + .take() + .expect("semaphore permit missing. Did you forget to call `poll_ready`?"); + + let (reply_tx, reply_rx) = oneshot::channel::<WorkerReply<S::Future, S::Error>>(); + + self.tx.send((req, reply_tx)).unwrap_or_else(|_| { + panic!("buffer worker not running. This is a bug in awebframework and should never happen. Please file an issue") + }); + + ResponseFuture { + state: State::Channel(reply_rx), + permit, + } + } +} + +#[pin_project] +pub(crate) struct ResponseFuture<F, E> { + #[pin] + state: State<F, E>, + permit: OwnedSemaphorePermit, +} + +#[pin_project(project = StateProj)] +enum State<F, E> { + Channel(oneshot::Receiver<WorkerReply<F, E>>), + Future(#[pin] F), +} + +impl<F, E, T> Future for ResponseFuture<F, E> +where + F: Future<Output = Result<T, E>>, +{ + type Output = Result<T, E>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { + loop { + let mut this = self.as_mut().project(); + + let new_state = match this.state.as_mut().project() { + StateProj::Channel(reply_rx) => { + let msg = ready!(Pin::new(reply_rx).poll(cx)) + .expect("buffer worker not running. This is a bug in awebframework and should never happen. Please file an issue"); + + match msg { + WorkerReply::Future(future) => State::Future(future), + WorkerReply::Error(err) => return Poll::Ready(Err(err)), + } + } + StateProj::Future(future) => { + return future.poll(cx); + } + }; + + this.state.set(new_state); + } + } +} + +#[cfg(test)] +mod tests { + #[allow(unused_imports)] + use super::*; + use tower::ServiceExt; + + #[tokio::test] + async fn test_buffer() { + let mut svc = MpscBuffer::new(tower::service_fn(handle)); + + let res = svc.ready().await.unwrap().call(42).await.unwrap(); + + assert_eq!(res, "foo"); + } + + async fn handle(req: i32) -> Result<&'static str, std::convert::Infallible> { + assert_eq!(req, 42); + Ok("foo") + } +} diff --git a/src/handler/mod.rs b/src/handler/mod.rs index e1ea5d41..c015672a 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -263,7 +263,7 @@ pub trait Handler<B, In>: Sized { /// # }; /// ``` /// - /// When adding middleware that might fail its required to handle those + /// When adding middleware that might fail its recommended to handle those /// errors. See [`Layered::handle_error`] for more details. fn layer<L>(self, layer: L) -> Layered<L::Service, In> where @@ -397,11 +397,10 @@ impl<S, T> Layered<S, T> { /// Create a new [`Layered`] handler where errors will be handled using the /// given closure. /// - /// awebframework requires that services gracefully handles all errors. That - /// means when you apply a Tower middleware that adds a new failure - /// condition you have to handle that as well. + /// This is used to convert errors to responses rather than simply + /// terminating the connection. /// - /// That can be done using `handle_error` like so: + /// `handle_error` can be used like so: /// /// ```rust /// use awebframework::prelude::*; @@ -415,7 +414,7 @@ impl<S, T> Layered<S, T> { /// let layered_handler = handler /// .layer(TimeoutLayer::new(Duration::from_secs(30))); /// - /// // ...so we must handle that error + /// // ...so we should handle that error /// let layered_handler = layered_handler.handle_error(|error: BoxError| { /// if error.is::<tower::timeout::error::Elapsed>() { /// ( @@ -304,17 +304,17 @@ //! //! ## Error handling //! -//! awebframework requires all errors to be handled. That is done by using -//! [`std::convert::Infallible`] as the error type in all its [`Service`] -//! implementations. -//! -//! For handlers created from async functions this is works automatically since -//! handlers must return something that implements -//! [`IntoResponse`](response::IntoResponse), even if its a `Result`. -//! -//! However middleware might add new failure cases that has to be handled. For -//! that awebframework provides a [`handle_error`](handler::Layered::handle_error) -//! combinator: +//! Handlers created from async functions must always produce a response, even +//! when returning a `Result<T, E>` the error type must implement +//! [`IntoResponse`]. In practice this makes error handling very perdictable and +//! easier to reason about. +//! +//! However when applying middleware, or embedding other tower services, errors +//! might happen. For example [`Timeout`] will return an error if the timeout +//! elapses. By default these errors will be propagated all the way up to hyper +//! where the connection will be closed. If that isn't desireable you can call +//! [`handle_error`](handler::Layered::handle_error) to handle errors from +//! adding a middleware to a handler: //! //! ```rust,no_run //! use awebframework::prelude::*; @@ -488,7 +488,6 @@ //! "/static/Cargo.toml", //! service::get( //! ServeFile::new("Cargo.toml") -//! // Errors must be handled //! .handle_error(|error: std::io::Error| { /* ... */ }) //! ) //! ); @@ -508,7 +507,6 @@ //! use awebframework::{prelude::*, routing::BoxRoute, body::{Body, BoxBody}}; //! use tower_http::services::ServeFile; //! use http::Response; -//! use std::convert::Infallible; //! //! fn api_routes() -> BoxRoute<Body> { //! route("/users", get(|_: Request<Body>| async { /* ... */ })).boxed() @@ -527,7 +525,6 @@ //! use awebframework::{prelude::*, service::ServiceExt, routing::nest}; //! use tower_http::services::ServeDir; //! use http::Response; -//! use std::convert::Infallible; //! use tower::{service_fn, BoxError}; //! //! let app = nest( @@ -558,6 +555,8 @@ //! [tokio]: http://crates.io/crates/tokio //! [hyper]: http://crates.io/crates/hyper //! [feature flags]: https://doc.rust-lang.org/cargo/reference/features.html#the-features-section +//! [`IntoResponse`]: crate::response::IntoResponse +//! [`Timeout`]: tower::timeout::Timeout #![doc(html_root_url = "https://docs.rs/tower-http/0.1.0")] #![warn( @@ -609,12 +608,12 @@ use self::body::Body; use http::Request; use routing::{EmptyRouter, Route}; -use std::convert::Infallible; use tower::Service; #[macro_use] pub(crate) mod macros; +mod buffer; mod util; pub mod body; @@ -661,12 +660,6 @@ pub mod prelude { /// `service` is the [`Service`] that should receive the request if the path /// matches `description`. /// -/// Note that `service`'s error type must be [`Infallible`] meaning you must -/// handle all errors. If you're creating handlers from async functions that is -/// handled automatically but if you're routing to some other [`Service`] you -/// might need to use [`handle_error`](service::ServiceExt::handle_error) to map -/// errors into responses. -/// /// # Examples /// /// ```rust @@ -688,7 +681,7 @@ pub mod prelude { /// Panics if `description` doesn't start with `/`. pub fn route<S, B>(description: &str, service: S) -> Route<S, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { use routing::RoutingDsl; diff --git a/src/routing.rs b/src/routing.rs index 288b8500..fee6274f 100644 --- a/src/routing.rs +++ b/src/routing.rs @@ -1,11 +1,10 @@ //! Routing between [`Service`]s. -use crate::{body::BoxBody, response::IntoResponse, util::ByteStr}; +use crate::{body::BoxBody, buffer::MpscBuffer, response::IntoResponse, util::ByteStr}; use async_trait::async_trait; use bytes::Bytes; -use futures_util::{future, ready}; +use futures_util::future; use http::{Method, Request, Response, StatusCode, Uri}; -use http_body::Full; use pin_project::pin_project; use regex::Regex; use std::{ @@ -18,7 +17,6 @@ use std::{ task::{Context, Poll}, }; use tower::{ - buffer::Buffer, util::{BoxService, Oneshot, ServiceExt}, BoxError, Layer, Service, ServiceBuilder, }; @@ -106,7 +104,7 @@ pub trait RoutingDsl: crate::sealed::Sealed + Sized { /// ``` fn route<T, B>(self, description: &str, svc: T) -> Route<T, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { Route { pattern: PathPattern::new(description), @@ -120,7 +118,7 @@ pub trait RoutingDsl: crate::sealed::Sealed + Sized { /// See [`nest`] for more details. fn nest<T, B>(self, description: &str, svc: T) -> Nested<T, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { Nested { pattern: PathPattern::new(description), @@ -152,11 +150,10 @@ pub trait RoutingDsl: crate::sealed::Sealed + Sized { /// /// It also helps with compile times when you have a very large number of /// routes. - fn boxed<ReqBody, ResBody>(self) -> BoxRoute<ReqBody> + fn boxed<ReqBody, ResBody>(self) -> BoxRoute<ReqBody, Self::Error> where - Self: Service<Request<ReqBody>, Response = Response<ResBody>, Error = Infallible> - + Send - + 'static, + Self: Service<Request<ReqBody>, Response = Response<ResBody>> + Send + 'static, + <Self as Service<Request<ReqBody>>>::Error: Into<BoxError> + Send + Sync, <Self as Service<Request<ReqBody>>>::Future: Send, ReqBody: http_body::Body<Data = Bytes> + Send + Sync + 'static, ReqBody::Error: Into<BoxError> + Send + Sync + 'static, @@ -165,7 +162,7 @@ pub trait RoutingDsl: crate::sealed::Sealed + Sized { { ServiceBuilder::new() .layer_fn(BoxRoute) - .buffer(1024) + .layer_fn(MpscBuffer::new) .layer(BoxService::layer()) .layer(MapResponseBodyLayer::new(BoxBody::new)) .service(self) @@ -231,9 +228,6 @@ pub trait RoutingDsl: crate::sealed::Sealed + Sized { /// # hyper::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap(); /// # }; /// ``` - /// - /// When adding middleware that might fail its required to handle those - /// errors. See [`Layered::handle_error`] for more details. fn layer<L>(self, layer: L) -> Layered<L::Service> where L: Layer<Self>, @@ -275,11 +269,11 @@ impl<S, F> crate::sealed::Sealed for Route<S, F> {} impl<S, F, B> Service<Request<B>> for Route<S, F> where - S: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, - F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, + S: Service<Request<B>, Response = Response<BoxBody>> + Clone, + F: Service<Request<B>, Response = Response<BoxBody>, Error = S::Error> + Clone, { type Response = Response<BoxBody>; - type Error = Infallible; + type Error = S::Error; type Future = RouteFuture<S, F, B>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { @@ -333,10 +327,10 @@ where impl<S, F, B> Future for RouteFuture<S, F, B> where - S: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible>, - F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible>, + S: Service<Request<B>, Response = Response<BoxBody>>, + F: Service<Request<B>, Response = Response<BoxBody>, Error = S::Error>, { - type Output = Result<Response<BoxBody>, Infallible>; + type Output = Result<Response<BoxBody>, S::Error>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { match self.project().0.project() { @@ -491,28 +485,33 @@ type Captures = Vec<(String, String)>; /// A boxed route trait object. /// /// See [`RoutingDsl::boxed`] for more details. -pub struct BoxRoute<B>(Buffer<BoxService<Request<B>, Response<BoxBody>, Infallible>, Request<B>>); +pub struct BoxRoute<B, E = Infallible>( + MpscBuffer<BoxService<Request<B>, Response<BoxBody>, E>, Request<B>>, +); -impl<B> Clone for BoxRoute<B> { +impl<B, E> Clone for BoxRoute<B, E> { fn clone(&self) -> Self { Self(self.0.clone()) } } -impl<B> fmt::Debug for BoxRoute<B> { +impl<B, E> fmt::Debug for BoxRoute<B, E> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BoxRoute").finish() } } -impl<B> RoutingDsl for BoxRoute<B> {} +impl<B, E> RoutingDsl for BoxRoute<B, E> {} -impl<B> crate::sealed::Sealed for BoxRoute<B> {} +impl<B, E> crate::sealed::Sealed for BoxRoute<B, E> {} -impl<B> Service<Request<B>> for BoxRoute<B> { +impl<B, E> Service<Request<B>> for BoxRoute<B, E> +where + E: Into<BoxError>, +{ type Response = Response<BoxBody>; - type Error = Infallible; - type Future = BoxRouteFuture<B>; + type Error = E; + type Future = BoxRouteFuture<B, E>; #[inline] fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { @@ -521,64 +520,41 @@ impl<B> Service<Request<B>> for BoxRoute<B> { #[inline] fn call(&mut self, req: Request<B>) -> Self::Future { - BoxRouteFuture(self.0.clone().oneshot(req)) + BoxRouteFuture { + inner: self.0.clone().oneshot(req), + } } } /// The response future for [`BoxRoute`]. #[pin_project] -pub struct BoxRouteFuture<B>(#[pin] InnerFuture<B>); - -type InnerFuture<B> = - Oneshot<Buffer<BoxService<Request<B>, Response<BoxBody>, Infallible>, Request<B>>, Request<B>>; - -impl<B> fmt::Debug for BoxRouteFuture<B> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("BoxRouteFuture").finish() - } +pub struct BoxRouteFuture<B, E> +where + E: Into<BoxError>, +{ + #[pin] + inner: + Oneshot<MpscBuffer<BoxService<Request<B>, Response<BoxBody>, E>, Request<B>>, Request<B>>, } -impl<B> Future for BoxRouteFuture<B> { - type Output = Result<Response<BoxBody>, Infallible>; +impl<B, E> Future for BoxRouteFuture<B, E> +where + E: Into<BoxError>, +{ + type Output = Result<Response<BoxBody>, E>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { - match ready!(self.project().0.poll(cx)) { - Ok(res) => Poll::Ready(Ok(res)), - Err(err) => Poll::Ready(Ok(handle_buffer_error(err))), - } + self.project().inner.poll(cx) } } -fn handle_buffer_error(error: BoxError) -> Response<BoxBody> { - use tower::buffer::error::{Closed, ServiceError}; - - let error = match error.downcast::<Closed>() { - Ok(closed) => { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(BoxBody::new(Full::from(closed.to_string()))) - .unwrap(); - } - Err(e) => e, - }; - - let error = match error.downcast::<ServiceError>() { - Ok(service_error) => { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(BoxBody::new(Full::from(format!("Service error: {}. This is a bug in awebframework. All inner services should be infallible. Please file an issue", service_error)))) - .unwrap(); - } - Err(e) => e, - }; - - Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body(BoxBody::new(Full::from(format!( - "Uncountered an unknown error: {}. This should never happen. Please file an issue", - error - )))) - .unwrap() +impl<B, E> fmt::Debug for BoxRouteFuture<B, E> +where + E: Into<BoxError>, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BoxRouteFuture").finish() + } } /// A [`Service`] created from a router by applying a Tower middleware. @@ -622,9 +598,8 @@ impl<S> Layered<S> { /// Create a new [`Layered`] service where errors will be handled using the /// given closure. /// - /// awebframework requires that services gracefully handles all errors. That - /// means when you apply a Tower middleware that adds a new failure - /// condition you have to handle that as well. + /// This is used to convert errors to responses rather than simply + /// terminating the connection. /// /// That can be done using `handle_error` like so: /// @@ -640,7 +615,7 @@ impl<S> Layered<S> { /// let layered_app = route("/", get(handler)) /// .layer(TimeoutLayer::new(Duration::from_secs(30))); /// - /// // ...so we must handle that error + /// // ...so we should handle that error /// let with_errors_handled = layered_app.handle_error(|error: BoxError| { /// if error.is::<tower::timeout::error::Elapsed>() { /// ( @@ -771,7 +746,7 @@ where /// `nest`. pub fn nest<S, B>(description: &str, svc: S) -> Nested<S, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { Nested { pattern: PathPattern::new(description), @@ -796,11 +771,11 @@ impl<S, F> crate::sealed::Sealed for Nested<S, F> {} impl<S, F, B> Service<Request<B>> for Nested<S, F> where - S: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, - F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, + S: Service<Request<B>, Response = Response<BoxBody>> + Clone, + F: Service<Request<B>, Response = Response<BoxBody>, Error = S::Error> + Clone, { type Response = Response<BoxBody>; - type Error = Infallible; + type Error = S::Error; type Future = RouteFuture<S, F, B>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { diff --git a/src/service/mod.rs b/src/service/mod.rs index 4e9da348..52c9c4d1 100644 --- a/src/service/mod.rs +++ b/src/service/mod.rs @@ -111,7 +111,7 @@ pub mod future; /// See [`get`] for an example. pub fn any<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Any, svc) } @@ -121,7 +121,7 @@ where /// See [`get`] for an example. pub fn connect<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Connect, svc) } @@ -131,7 +131,7 @@ where /// See [`get`] for an example. pub fn delete<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Delete, svc) } @@ -156,12 +156,9 @@ where /// # hyper::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap(); /// # }; /// ``` -/// -/// You can only add services who cannot fail (their error type must be -/// [`Infallible`]). To gracefully handle errors see [`ServiceExt::handle_error`]. pub fn get<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Get, svc) } @@ -171,7 +168,7 @@ where /// See [`get`] for an example. pub fn head<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Head, svc) } @@ -181,7 +178,7 @@ where /// See [`get`] for an example. pub fn options<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Options, svc) } @@ -191,7 +188,7 @@ where /// See [`get`] for an example. pub fn patch<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Patch, svc) } @@ -201,7 +198,7 @@ where /// See [`get`] for an example. pub fn post<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Post, svc) } @@ -211,7 +208,7 @@ where /// See [`get`] for an example. pub fn put<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Put, svc) } @@ -221,7 +218,7 @@ where /// See [`get`] for an example. pub fn trace<S, B>(svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { on(MethodFilter::Trace, svc) } @@ -248,7 +245,7 @@ where /// ``` pub fn on<S, B>(method: MethodFilter, svc: S) -> OnMethod<BoxResponseBody<S, B>, EmptyRouter> where - S: Service<Request<B>, Error = Infallible> + Clone, + S: Service<Request<B>> + Clone, { OnMethod { method, @@ -276,7 +273,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn any<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Any, svc) } @@ -286,7 +283,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn connect<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Connect, svc) } @@ -296,7 +293,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn delete<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Delete, svc) } @@ -326,13 +323,9 @@ impl<S, F> OnMethod<S, F> { /// # hyper::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap(); /// # }; /// ``` - /// - /// You can only add services who cannot fail (their error type must be - /// [`Infallible`]). To gracefully handle errors see - /// [`ServiceExt::handle_error`]. pub fn get<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Get, svc) } @@ -342,7 +335,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn head<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Head, svc) } @@ -352,7 +345,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn options<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Options, svc) } @@ -362,7 +355,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn patch<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Patch, svc) } @@ -372,7 +365,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn post<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Post, svc) } @@ -382,7 +375,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn put<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Put, svc) } @@ -392,7 +385,7 @@ impl<S, F> OnMethod<S, F> { /// See [`OnMethod::get`] for an example. pub fn trace<T, B>(self, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { self.on(MethodFilter::Trace, svc) } @@ -424,7 +417,7 @@ impl<S, F> OnMethod<S, F> { /// ``` pub fn on<T, B>(self, method: MethodFilter, svc: T) -> OnMethod<BoxResponseBody<T, B>, Self> where - T: Service<Request<B>, Error = Infallible> + Clone, + T: Service<Request<B>> + Clone, { OnMethod { method, @@ -441,11 +434,11 @@ impl<S, F> OnMethod<S, F> { // that up, but not sure its possible. impl<S, F, B> Service<Request<B>> for OnMethod<S, F> where - S: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, - F: Service<Request<B>, Response = Response<BoxBody>, Error = Infallible> + Clone, + S: Service<Request<B>, Response = Response<BoxBody>> + Clone, + F: Service<Request<B>, Response = Response<BoxBody>, Error = S::Error> + Clone, { type Response = Response<BoxBody>; - type Error = Infallible; + type Error = S::Error; type Future = RouteFuture<S, F, B>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { @@ -541,11 +534,6 @@ pub trait ServiceExt<ReqBody, ResBody>: { /// Handle errors from a service. /// - /// awebframework requires all handlers and services, that are part of the - /// router, to never return errors. If you route to [`Service`], not created - /// by awebframework, who's error isn't `Infallible` you can use this combinator - /// to handle the error. - /// /// `handle_error` takes a closure that will map errors from the service /// into responses. The closure's return type must implement /// [`IntoResponse`]. @@ -584,6 +572,16 @@ pub trait ServiceExt<ReqBody, ResBody>: { HandleError::new(self, f) } + + /// Check that your service cannot fail. + /// + /// That is its error type is [`Infallible`]. + fn check_infallible(self) -> Self + where + Self: Service<Request<ReqBody>, Response = Response<ResBody>, Error = Infallible> + Sized, + { + self + } } impl<S, ReqBody, ResBody> ServiceExt<ReqBody, ResBody> for S where @@ -622,12 +620,12 @@ where impl<S, ReqBody, ResBody> Service<Request<ReqBody>> for BoxResponseBody<S, ReqBody> where - S: Service<Request<ReqBody>, Response = Response<ResBody>, Error = Infallible> + Clone, + S: Service<Request<ReqBody>, Response = Response<ResBody>> + Clone, ResBody: http_body::Body<Data = Bytes> + Send + Sync + 'static, ResBody::Error: Into<BoxError> + Send + Sync + 'static, { type Response = Response<BoxBody>; - type Error = Infallible; + type Error = S::Error; type Future = BoxResponseBodyFuture<Oneshot<S, Request<ReqBody>>>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { @@ -645,13 +643,13 @@ where #[derive(Debug)] pub struct BoxResponseBodyFuture<F>(#[pin] F); -impl<F, B> Future for BoxResponseBodyFuture<F> +impl<F, B, E> Future for BoxResponseBodyFuture<F> where - F: Future<Output = Result<Response<B>, Infallible>>, + F: Future<Output = Result<Response<B>, E>>, B: http_body::Body<Data = Bytes> + Send + Sync + 'static, B::Error: Into<BoxError> + Send + Sync + 'static, { - type Output = Result<Response<BoxBody>, Infallible>; + type Output = Result<Response<BoxBody>, E>; fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { let res = ready!(self.project().0.poll(cx))?; |