From e8694ae991f84266fbb1b9d0f27f3f288b4ee8cc Mon Sep 17 00:00:00 2001 From: tottoto Date: Sat, 24 Feb 2024 08:37:35 +0900 Subject: [PATCH] Update to hyper 1 --- Cargo.toml | 9 ++++++--- src/mock.rs | 9 ++++++--- src/request.rs | 20 ++++++++++--------- src/response.rs | 16 ++++++++++----- src/server.rs | 52 ++++++++++++++++++++++++++++++++++--------------- 5 files changed, 70 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e132f0e..a073486 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,8 +22,11 @@ assert-json-diff = "2.0" bytes = "1" colored = { version = "2.0", optional = true } futures-core = "0.3" -http = "0.2" -hyper = { version = "0.14", features = ["http1", "http2", "server", "stream"] } +http = "1" +http-body = "1" +http-body-util = "0.1" +hyper = "1" +hyper-util = { version = "0.1", features = ["server-auto", "tokio"] } log = "0.4" rand = "0.8" regex = "1.7" @@ -36,7 +39,7 @@ tokio = { version = "1.25", features = ["net", "parking_lot", "rt", "sync"] } env_logger = "0.8" testing_logger = "0.1" futures = { version = "0.3", default-features = false, features = ["alloc", "async-await"] } -reqwest = "0.11" +reqwest = { version = "0.12", default-features = false, features = ["http2"] } tokio = { version = "1.25", features = ["macros", "rt-multi-thread"] } [features] diff --git a/src/mock.rs b/src/mock.rs index 30514ac..ca3e21f 100644 --- a/src/mock.rs +++ b/src/mock.rs @@ -5,6 +5,7 @@ use crate::server::RemoteMock; use crate::server::State; use crate::Request; use crate::{Error, ErrorKind}; +use bytes::Bytes; use http::{HeaderMap, HeaderName, StatusCode}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; @@ -354,7 +355,7 @@ impl Mock { /// ``` /// pub fn with_body>(mut self, body: StrOrBytes) -> Self { - self.inner.response.body = Body::Bytes(body.as_ref().to_owned()); + self.inner.response.body = Body::Bytes(Bytes::from(body.as_ref().to_owned())); self } @@ -423,7 +424,8 @@ impl Mock { mut self, callback: impl Fn(&Request) -> Vec + Send + Sync + 'static, ) -> Self { - self.inner.response.body = Body::FnWithRequest(Arc::new(callback)); + self.inner.response.body = + Body::FnWithRequest(Arc::new(move |req| Bytes::from(callback(req)))); self } @@ -444,7 +446,8 @@ impl Mock { self.inner.response.body = Body::Bytes( std::fs::read(path) .map_err(|_| Error::new(ErrorKind::FileNotFound)) - .unwrap(), + .unwrap() + .into(), ); self } diff --git a/src/request.rs b/src/request.rs index 1f99601..4b1fd48 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,21 +1,20 @@ use crate::{Error, ErrorKind}; -use bytes::Buf; use http::header::{AsHeaderName, HeaderValue}; use http::Request as HttpRequest; -use hyper::body; -use hyper::Body as HyperBody; +use http_body_util::BodyExt; +use hyper::body::Incoming; /// /// Stores a HTTP request /// #[derive(Debug)] pub struct Request { - inner: HttpRequest, + inner: HttpRequest, body: Option>, } impl Request { - pub(crate) fn new(request: HttpRequest) -> Self { + pub(crate) fn new(request: HttpRequest) -> Self { Request { inner: request, body: None, @@ -63,12 +62,15 @@ impl Request { pub(crate) async fn read_body(&mut self) -> &Vec { if self.body.is_none() { let raw_body = self.inner.body_mut(); - let mut buf = body::aggregate(raw_body) + + let bytes = raw_body + .collect() .await .map_err(|err| Error::new_with_context(ErrorKind::RequestBodyFailure, err)) - .unwrap(); - let bytes = buf.copy_to_bytes(buf.remaining()).to_vec(); - self.body = Some(bytes); + .unwrap() + .to_bytes(); + + self.body = Some(bytes.to_vec()); } self.body.as_ref().unwrap() diff --git a/src/response.rs b/src/response.rs index 3ded407..d41e012 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,7 +1,9 @@ use crate::error::Error; use crate::Request; +use bytes::Bytes; use futures_core::stream::Stream; use http::{HeaderMap, StatusCode}; +use http_body::Frame; use std::fmt; use std::io; use std::sync::Arc; @@ -17,11 +19,11 @@ pub(crate) struct Response { } type BodyFnWithWriter = dyn Fn(&mut dyn io::Write) -> io::Result<()> + Send + Sync + 'static; -type BodyFnWithRequest = dyn Fn(&Request) -> Vec + Send + Sync + 'static; +type BodyFnWithRequest = dyn Fn(&Request) -> Bytes + Send + Sync + 'static; #[derive(Clone)] pub(crate) enum Body { - Bytes(Vec), + Bytes(Bytes), FnWithWriter(Arc), FnWithRequest(Arc), } @@ -60,7 +62,7 @@ impl Default for Response { Self { status: StatusCode::OK, headers, - body: Body::Bytes(Vec::new()), + body: Body::Bytes(Bytes::new()), } } } @@ -115,7 +117,7 @@ impl Drop for ChunkedStream { } impl Stream for ChunkedStream { - type Item = io::Result>; + type Item = io::Result>; fn poll_next( mut self: std::pin::Pin<&mut Self>, @@ -123,7 +125,11 @@ impl Stream for ChunkedStream { ) -> Poll> { self.receiver .as_mut() - .map(move |r| r.poll_recv(cx)) + .map(move |receiver| { + receiver.poll_recv(cx).map(|received| { + received.map(|result| result.map(|data| Frame::data(Bytes::from(data)))) + }) + }) .unwrap_or(Poll::Ready(None)) } } diff --git a/src/server.rs b/src/server.rs index 03afd89..43cf848 100644 --- a/src/server.rs +++ b/src/server.rs @@ -4,10 +4,13 @@ use crate::response::{Body as ResponseBody, ChunkedStream}; use crate::ServerGuard; use crate::{Error, ErrorKind, Matcher, Mock}; use http::{Request as HttpRequest, Response, StatusCode}; -use hyper::server::conn::Http; +use http_body_util::{BodyExt, Empty, Full, StreamBody}; +use hyper::body::Incoming; use hyper::service::service_fn; -use hyper::Body; +use hyper_util::rt::{TokioExecutor, TokioIo}; +use hyper_util::server::conn::auto::Builder as ConnectionBuilder; use std::default::Default; +use std::error::Error as StdError; use std::fmt; use std::net::{IpAddr, SocketAddr}; use std::ops::Drop; @@ -353,10 +356,10 @@ impl Server { let mutex = state.clone(); spawn_local(async move { - let _ = Http::new() + let _ = ConnectionBuilder::new(TokioExecutor::new()) .serve_connection( - stream, - service_fn(move |request: HttpRequest| { + TokioIo::new(stream), + service_fn(move |request: HttpRequest| { handle_request(request, mutex.clone()) }), ) @@ -442,10 +445,27 @@ impl fmt::Display for Server { } } +type BoxError = Box; +type BoxBody = http_body_util::combinators::UnsyncBoxBody; + +trait IntoBoxBody { + fn into_box_body(self) -> BoxBody; +} + +impl IntoBoxBody for B +where + B: http_body::Body + Send + 'static, + B::Error: Into, +{ + fn into_box_body(self) -> BoxBody { + self.map_err(Into::into).boxed_unsync() + } +} + async fn handle_request( - hyper_request: HttpRequest, + hyper_request: HttpRequest, state: Arc>, -) -> Result, Error> { +) -> Result, Error> { let mut request = Request::new(hyper_request); request.read_body().await; log::debug!("Request received: {}", request.formatted()); @@ -478,7 +498,7 @@ async fn handle_request( } } -fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result, Error> { +fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result, Error> { let status: StatusCode = mock.inner.response.status; let mut response = Response::builder().status(status); @@ -492,32 +512,32 @@ fn respond_with_mock(request: Request, mock: &RemoteMock) -> Result { let stream = ChunkedStream::new(Arc::clone(body_fn))?; - Body::wrap_stream(stream) + StreamBody::new(stream).into_box_body() } ResponseBody::FnWithRequest(body_fn) => { let bytes = body_fn(&request); - Body::from(bytes) + Full::new(bytes.to_owned()).into_box_body() } } } else { - Body::empty() + Empty::new().into_box_body() }; - let response: Response = response + let response: Response = response .body(body) .map_err(|err| Error::new_with_context(ErrorKind::ResponseFailure, err))?; Ok(response) } -fn respond_with_mock_not_found() -> Result, Error> { - let response: Response = Response::builder() +fn respond_with_mock_not_found() -> Result, Error> { + let response: Response = Response::builder() .status(StatusCode::NOT_IMPLEMENTED) - .body(Body::empty()) + .body(Empty::new().into_box_body()) .map_err(|err| Error::new_with_context(ErrorKind::ResponseFailure, err))?; Ok(response)