Skip to content

Commit

Permalink
Update Axum example to use axum 0.7 and hyper 1. (#154)
Browse files Browse the repository at this point in the history
* Update Axum example to use axum 0.7 and hyper 1.

* update hyper-util dependency
  • Loading branch information
TannerRogalsky authored Jan 10, 2024
1 parent 79556aa commit b46f60e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 50 deletions.
8 changes: 5 additions & 3 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ edition = "2021"
publish = false

[dependencies]
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }
async-stream = "0.3"
axum = "0.7"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1.2", features = ["full"] }
http-body-util = "0.1"
turmoil = { path = "../.." }
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.4"
pin-project-lite = "0.2"
114 changes: 67 additions & 47 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use axum::extract::Path;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
use axum::{body::Body, http::Request};
use hyper::server::accept::from_stream;
use hyper::{Client, Server, Uri};
use axum::{body::Body, extract::Path, http::Request, routing::get, Router};
use http_body_util::BodyExt as _;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use std::net::{IpAddr, Ipv4Addr};
use tower::make::Shared;
use tracing::{info_span, Instrument};
use turmoil::{net, Builder};

Expand All @@ -29,15 +24,27 @@ fn main() {
sim.host("server", move || {
let router = router.clone();
async move {
Server::builder(from_stream(async_stream::stream! {
let listener = net::TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| s);
let listener = net::TcpListener::bind(addr).await?;
loop {
let (tcp_stream, _remote_addr) = listener.accept().await?;
let tcp_stream = hyper_util::rt::TokioIo::new(tcp_stream);

let hyper_service = hyper_util::service::TowerToHyperService::new(router.clone());

let result = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection_with_upgrades(tcp_stream, hyper_service)
.await;
if result.is_err() {
// This error only appears when the client doesn't send a request and
// terminate the connection.
//
// If client sends one request then terminate connection whenever, it doesn't
// appear.
break;
}
}))
.serve(Shared::new(router))
.await
.unwrap();
}

Ok(())
}
Expand All @@ -47,15 +54,15 @@ fn main() {
sim.client(
"client",
async move {
let client = Client::builder().build(connector::connector());
let client = Client::builder(TokioExecutor::new()).build(connector::connector());

let mut request = Request::new(Body::empty());
*request.uri_mut() = Uri::from_static("http://server:9999/greet/foo");
*request.uri_mut() = hyper::Uri::from_static("http://server:9999/greet/foo");
let res = client.request(request).await?;

let (parts, body) = res.into_parts();
let body = hyper::body::to_bytes(body).await?;
let res = Response::from_parts(parts, body);
let body = body.collect().await?.to_bytes();
let res = hyper::Response::from_parts(parts, body);

tracing::info!("Got response: {:?}", res);

Expand All @@ -68,68 +75,81 @@ fn main() {
}

mod connector {
use std::{future::Future, pin::Pin};

use hyper::{
client::connect::{Connected, Connection},
Uri,
};
use tokio::io::{AsyncRead, AsyncWrite};
use hyper::Uri;
use pin_project_lite::pin_project;
use std::{future::Future, io::Error, pin::Pin};
use tokio::io::AsyncWrite;
use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, std::io::Error>> + Send>>;
type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TurmoilConnection, Error = std::io::Error, Future = Fut> + Clone
{
) -> impl Service<Uri, Response = TurmoilConnection, Error = Error, Future = Fut> + Clone {
tower::service_fn(|uri: Uri| {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TurmoilConnection(conn))
Ok::<_, Error>(TurmoilConnection { fut: conn })
}) as Fut
})
}

pub struct TurmoilConnection(turmoil::net::TcpStream);
pin_project! {
pub struct TurmoilConnection{
#[pin]
fut: turmoil::net::TcpStream
}
}

impl AsyncRead for TurmoilConnection {
impl hyper::rt::Read for TurmoilConnection {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> std::task::Poll<Result<(), Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
let result = tokio::io::AsyncRead::poll_read(self.project().fut, cx, &mut tbuf);
match result {
std::task::Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};

unsafe {
buf.advance(n);
}
std::task::Poll::Ready(Ok(()))
}
}

impl AsyncWrite for TurmoilConnection {
impl hyper::rt::Write for TurmoilConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
) -> std::task::Poll<Result<usize, Error>> {
Pin::new(&mut self.fut).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_shutdown(cx)
}
}

impl Connection for TurmoilConnection {
fn connected(&self) -> hyper::client::connect::Connected {
Connected::new()
impl hyper_util::client::legacy::connect::Connection for TurmoilConnection {
fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
hyper_util::client::legacy::connect::Connected::new()
}
}
}

0 comments on commit b46f60e

Please sign in to comment.