Skip to content

Commit

Permalink
feat(upgrade): Moved HTTP upgrades off Body to a new API
Browse files Browse the repository at this point in the history
BREAKING CHANGE: The method `Body::on_upgrade()` is gone. It is
  essentially replaced with `hyper::upgrade::on(msg)`.
  • Loading branch information
seanmonstar committed Nov 19, 2020
1 parent ed2b22a commit 42535ef
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 20 deletions.
4 changes: 2 additions & 2 deletions examples/upgrades.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn server_upgraded_io(mut upgraded: Upgraded) -> Result<()> {
}

/// Our server HTTP handler to initiate HTTP upgrades.
async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
async fn server_upgrade(mut req: Request<Body>) -> Result<Response<Body>> {
let mut res = Response::new(Body::empty());

// Send a 400 to any request that doesn't have
Expand All @@ -52,7 +52,7 @@ async fn server_upgrade(req: Request<Body>) -> Result<Response<Body>> {
// is returned below, so it's better to spawn this future instead
// waiting for it to complete to then return a response.
tokio::task::spawn(async move {
match req.into_body().on_upgrade().await {
match hyper::upgrade::on(&mut req).await {
Ok(upgraded) => {
if let Err(e) = server_upgraded_io(upgraded).await {
eprintln!("server foobar io error: {}", e)
Expand Down
16 changes: 9 additions & 7 deletions src/body/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,13 +187,15 @@ impl Body {
Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
}

/// Converts this `Body` into a `Future` of a pending HTTP upgrade.
///
/// See [the `upgrade` module](crate::upgrade) for more.
pub fn on_upgrade(self) -> OnUpgrade {
self.extra
.map(|ex| ex.on_upgrade)
.unwrap_or_else(OnUpgrade::none)
// TODO: Eventually the pending upgrade should be stored in the
// `Extensions`, and all these pieces can be removed. In v0.14, we made
// the breaking changes, so now this TODO can be done without breakage.
pub(crate) fn take_upgrade(&mut self) -> OnUpgrade {
if let Some(ref mut extra) = self.extra {
std::mem::replace(&mut extra.on_upgrade, OnUpgrade::none())
} else {
OnUpgrade::none()
}
}

fn new(kind: Kind) -> Body {
Expand Down
57 changes: 46 additions & 11 deletions src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,16 @@ pub struct Parts<T> {
_inner: (),
}

/// Gets a pending HTTP upgrade from this message.
pub fn on<T: sealed::CanUpgrade>(msg: T) -> OnUpgrade {
msg.on_upgrade()
}

#[cfg(feature = "http1")]
pub(crate) struct Pending {
tx: oneshot::Sender<crate::Result<Upgraded>>,
}

/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected(());

#[cfg(feature = "http1")]
pub(crate) fn pending() -> (Pending, OnUpgrade) {
let (tx, rx) = oneshot::channel();
Expand Down Expand Up @@ -162,9 +160,7 @@ impl Future for OnUpgrade {
Some(ref mut rx) => Pin::new(rx).poll(cx).map(|res| match res {
Ok(Ok(upgraded)) => Ok(upgraded),
Ok(Err(err)) => Err(err),
Err(_oneshot_canceled) => {
Err(crate::Error::new_canceled().with(UpgradeExpected(())))
}
Err(_oneshot_canceled) => Err(crate::Error::new_canceled().with(UpgradeExpected)),
}),
None => Poll::Ready(Err(crate::Error::new_user_no_upgrade())),
}
Expand Down Expand Up @@ -196,9 +192,16 @@ impl Pending {

// ===== impl UpgradeExpected =====

/// Error cause returned when an upgrade was expected but canceled
/// for whatever reason.
///
/// This likely means the actual `Conn` future wasn't polled and upgraded.
#[derive(Debug)]
struct UpgradeExpected;

impl fmt::Display for UpgradeExpected {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "upgrade expected but not completed")
f.write_str("upgrade expected but not completed")
}
}

Expand Down Expand Up @@ -277,6 +280,38 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> Io for ForwardsWriteBuf<T> {
}
}

mod sealed {
use super::OnUpgrade;

pub trait CanUpgrade {
fn on_upgrade(self) -> OnUpgrade;
}

impl CanUpgrade for http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}

impl CanUpgrade for &'_ mut http::Request<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}

impl CanUpgrade for http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.into_body().take_upgrade()
}
}

impl CanUpgrade for &'_ mut http::Response<crate::Body> {
fn on_upgrade(self) -> OnUpgrade {
self.body_mut().take_upgrade()
}
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 42535ef

Please sign in to comment.