Skip to content

Commit

Permalink
Migrate Gotham to std Futures to support Async/Await
Browse files Browse the repository at this point in the history
With the anticipated release of async-await in stable rust this week, I took an effort to migrate Gotham to run on std futures using the pre-release versions of the dependencies (tokio, hyper, etc).

This doesn't attempt to introduce `await` or `async fn` into the codebase (there was a single unavoidable case due to an `tokio::File::metadata` API change).  That is designed as a future activity.

This migration involved a few key efforts:

1. Convert from Futures 0.1 to Futures-preview 0.3 (mostly `Future<Item=>` to `Future<Output=Result<>>`).
2. Update dependencies to pre-release versions (`tokio-0.2` and `hyper-0.13`).  There's still other dependencies that are outstanding and blocking the full release.
3. Migrate Handler trait to a pinned box HandlerFuture.

This is a work-in-progress with a few blockers before this would be ready:

Gotham Dependencies:

[ ] Update Futures from `futures-preview` to `futures = 0.3` when the other dependencies (hyper, tokio, etc) update in concert.
[ ] Update Tokio to `0.2` from alpha pre-releases
[ ] Update Hyper to `0.13` from alpha pre-releases
[ ] Update Tower-Service to `0.3` from alpha pre-releases.  Hyper is migrating many of its traits to `tower-service::Service` and so is now a direct dependency.
[ ] Released version of `futures_rustls` which is currently a branch of `tokio-rustls` ported to Futures-preview
[ ] Released version of `futures-tokio-compat` or suggested `tokio::compat` library for bridging `futures::AsyncRead` and `tokio::AsyncRead`.  See tokio-rs/tokio#1297
    and async-rs/async-std#54

Middleware Dependencies:

[ ] Diesel - Requires updated release of `tokio-threadpool`
[ ] JWT - Requires updated release of `jsonwebtoken`
  • Loading branch information
kevinastone committed Nov 8, 2019
1 parent 1446956 commit 935e356
Show file tree
Hide file tree
Showing 37 changed files with 719 additions and 585 deletions.
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ members = [

## Middleware
"middleware/template",
"middleware/diesel",
"middleware/jwt",
# TODO: Re-enable middleware when their dependencies are updated
# "middleware/diesel",
# "middleware/jwt",

## Examples (these crates are not published)
"examples/hello_world",
"examples/hello_world_tls",
"examples/hello_world_until",
# TODO: Re-enable when the tokio-signal dependency is updated
# "examples/hello_world_until",
"examples/shared_state",

# Tera template
Expand Down Expand Up @@ -65,15 +67,18 @@ members = [
"examples/static_assets",

# diesel
"examples/diesel",
# TODO: Re-enable when the middleware is updated
# "examples/diesel",

# openssl
"examples/openssl",
# TODO: Re-enable when this example is updated
# "examples/openssl",

# example_contribution_template
"examples/example_contribution_template/name",

"examples/websocket",
# TODO: Re-enable when tokio-tungstenite is updated
# "examples/websocket",
]

[patch.crates-io]
Expand Down
13 changes: 7 additions & 6 deletions gotham/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,20 @@ edition = "2018"

[features]
default = ["rustls"]
rustls = ["tokio-rustls"]
rustls = ["futures-rustls"]

[dependencies]
log = "0.4"
hyper = "0.12"
hyper = { version = "0.13.0-alpha.4", features = ["unstable-stream"] }
serde = "1.0"
serde_derive = "1.0"
bincode = "1.0"
mime = "0.3"
# Using alpha version of mime_guess until mime crate stabilizes (releases 1.0).
# see https://github.com/hyperium/mime/issues/52
mime_guess = "2.0.1"
futures = "0.1"
tokio = "0.1"
futures-preview = { version = "0.3.0-alpha.19", features = ["io-compat"] }
tokio = "=0.2.0-alpha.6"
bytes = "0.4"
mio = "0.6"
borrow-bag = "1.0"
Expand All @@ -47,8 +47,9 @@ cookie = "0.12"
http = "0.1"
httpdate = "0.3"
failure = "0.1"
tokio-rustls = {version = "0.9", optional = true }
tokio-io = "0.1"
futures-rustls = { git = "https://github.com/quininer/tokio-rustls", branch = "futures-rustls", optional = true }
tower-service = "=0.3.0-alpha.2"
futures-tokio-compat = { git = "https://github.com/nemo157/futures-tokio-compat/" }

[dev-dependencies]
gotham_derive = "0.5.0-dev"
Expand Down
115 changes: 63 additions & 52 deletions gotham/src/handler/assets/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ mod accepted_encoding;

use crate::error::Result;
use bytes::{BufMut, BytesMut};
use futures::{stream, try_ready, Future, Stream};
use futures::prelude::*;
use futures::ready;
use futures::task::Poll;
use http;
use httpdate::parse_http_date;
use hyper::header::*;
Expand All @@ -31,6 +33,7 @@ use std::fs::Metadata;
use std::io;
use std::iter::FromIterator;
use std::path::{Component, Path, PathBuf};
use std::pin::Pin;
use std::time::UNIX_EPOCH;

/// Represents a handler for any files under a directory.
Expand Down Expand Up @@ -176,7 +179,7 @@ impl NewHandler for DirHandler {
}

impl Handler for DirHandler {
fn handle(self, state: State) -> Box<HandlerFuture> {
fn handle(self, state: State) -> Pin<Box<HandlerFuture>> {
let path = {
let mut base_path = self.options.path;
let file_path = PathBuf::from_iter(&FilePathExtractor::borrow_from(&state).parts);
Expand All @@ -194,59 +197,62 @@ impl Handler for DirHandler {
}

impl Handler for FileHandler {
fn handle(self, state: State) -> Box<HandlerFuture> {
fn handle(self, state: State) -> Pin<Box<HandlerFuture>> {
create_file_response(self.options, state)
}
}

// Creates the `HandlerFuture` response based on the given `FileOptions`.
fn create_file_response(options: FileOptions, state: State) -> Box<HandlerFuture> {
fn create_file_response(options: FileOptions, state: State) -> Pin<Box<HandlerFuture>> {
let mime_type = mime_for_path(&options.path);
let headers = HeaderMap::borrow_from(&state).clone();

let (path, encoding) = check_compressed_options(&options, &headers);

let response_future =
File::open(path)
.and_then(File::metadata)
.and_then(move |(file, meta)| {
if not_modified(&meta, &headers) {
return Ok(http::Response::builder()
.status(StatusCode::NOT_MODIFIED)
.body(Body::empty())
.unwrap());
}
let len = meta.len();
let buf_size = optimal_buf_size(&meta);

let stream = file_stream(file, buf_size, len);
let body = Body::wrap_stream(stream);
let mut response = http::Response::builder();
response.status(StatusCode::OK);
response.header(CONTENT_LENGTH, len);
response.header(CONTENT_TYPE, mime_type.as_ref());
response.header(CACHE_CONTROL, options.cache_control);

if let Some(etag) = entity_tag(&meta) {
response.header(ETAG, etag);
}
if let Some(content_encoding) = encoding {
response.header(CONTENT_ENCODING, content_encoding);
}

Ok(response.body(body).unwrap())
});
Box::new(response_future.then(|result| match result {
Ok(response) => Ok((state, response)),
Err(err) => {
let status = match err.kind() {
io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
Err((state, err.into_handler_error().with_status(status)))
let response_future = File::open(path).and_then(|file| {
async move {
let meta = file.metadata().await?;
if not_modified(&meta, &headers) {
return Ok(http::Response::builder()
.status(StatusCode::NOT_MODIFIED)
.body(Body::empty())
.unwrap());
}
let len = meta.len();
let buf_size = optimal_buf_size(&meta);

let stream = file_stream(file, buf_size, len);
let body = Body::wrap_stream(stream.into_stream());
let mut response = http::Response::builder();
response.status(StatusCode::OK);
response.header(CONTENT_LENGTH, len);
response.header(CONTENT_TYPE, mime_type.as_ref());
response.header(CACHE_CONTROL, options.cache_control);

if let Some(etag) = entity_tag(&meta) {
response.header(ETAG, etag);
}
if let Some(content_encoding) = encoding {
response.header(CONTENT_ENCODING, content_encoding);
}

Ok(response.body(body).unwrap())
}
}))
});

response_future
.map(|result| match result {
Ok(response) => Ok((state, response)),
Err(err) => {
let status = match err.kind() {
io::ErrorKind::NotFound => StatusCode::NOT_FOUND,
io::ErrorKind::PermissionDenied => StatusCode::FORBIDDEN,
_ => StatusCode::INTERNAL_SERVER_ERROR,
};
Err((state, err.into_handler_error().with_status(status)))
}
})
.boxed()
}

// Checks for existence of compressed files if `FileOptions` and
Expand Down Expand Up @@ -370,23 +376,28 @@ fn file_stream(
mut f: File,
buf_size: usize,
mut len: u64,
) -> impl Stream<Item = Chunk, Error = io::Error> + Send {
let mut buf = BytesMut::new();
stream::poll_fn(move || {
) -> impl TryStream<Ok = Chunk, Error = io::Error> + Send {
let mut buf = BytesMut::with_capacity(buf_size);
stream::poll_fn(move |cx| {
if len == 0 {
return Ok(None.into());
return Poll::Ready(None);
}
if buf.remaining_mut() < buf_size {
buf.reserve(buf_size);
}
let n = try_ready!(f.read_buf(&mut buf).map_err(|err| {

let read = Pin::new(&mut f).poll_read_buf(cx, &mut buf);
let n = ready!(read).map_err(|err| {
debug!("file read error: {}", err);
err
})) as u64;
})? as u64;

if n == 0 {
debug!("file read found EOF before expected length");
return Ok(None.into());
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"file read found EOF before expected length",
))));
}

let mut chunk = buf.take().freeze();
Expand All @@ -397,7 +408,7 @@ fn file_stream(
len -= n;
}

Ok(Some(Chunk::from(chunk)).into())
Poll::Ready(Some(Ok(Chunk::from(chunk))))
})
}

Expand Down
15 changes: 9 additions & 6 deletions gotham/src/handler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,15 @@ pub struct HandlerError {
/// # extern crate futures;
/// #
/// # use std::fs::File;
/// # use std::pin::Pin;
/// # use gotham::state::State;
/// # use gotham::handler::{IntoHandlerError, HandlerFuture};
/// # use futures::future;
/// # use futures::prelude::*;
/// #
/// # #[allow(dead_code)]
/// fn my_handler(state: State) -> Box<HandlerFuture> {
/// fn my_handler(state: State) -> Pin<Box<HandlerFuture>> {
/// match File::open("config.toml") {
/// Err(e) => Box::new(future::err((state, e.into_handler_error()))),
/// Err(e) => future::err((state, e.into_handler_error())).boxed(),
/// Ok(_) => // Create and return a response
/// # unimplemented!(),
/// }
Expand Down Expand Up @@ -95,21 +96,23 @@ impl HandlerError {
/// # extern crate hyper;
/// # extern crate futures;
/// #
/// # use futures::future;
/// # use std::pin::Pin;
/// #
/// # use futures::prelude::*;
/// # use hyper::StatusCode;
/// # use gotham::state::State;
/// # use gotham::handler::{IntoHandlerError, HandlerFuture};
/// # use gotham::test::TestServer;
/// #
/// fn handler(state: State) -> Box<HandlerFuture> {
/// fn handler(state: State) -> Pin<Box<HandlerFuture>> {
/// // It's OK if this is bogus, we just need something to convert into a `HandlerError`.
/// let io_error = std::io::Error::last_os_error();
///
/// let handler_error = io_error
/// .into_handler_error()
/// .with_status(StatusCode::IM_A_TEAPOT);
///
/// Box::new(future::err((state, handler_error)))
/// future::err((state, handler_error)).boxed()
/// }
///
/// # fn main() {
Expand Down
Loading

0 comments on commit 935e356

Please sign in to comment.