From b685e5cc87cd9914aa374b5e7c41e4733e10713c Mon Sep 17 00:00:00 2001 From: Dylan McKay Date: Mon, 30 Jul 2018 13:18:18 +1200 Subject: [PATCH] Update to Hyper 0.12 This updates hyperlocal to the new Hyper 0.12 API. --- Cargo.toml | 5 +-- README.md | 66 ++++++++++++++++----------------- examples/client.rs | 32 ++++++++-------- examples/server.rs | 35 +++++++----------- src/client/mod.rs | 72 ++++++++++-------------------------- src/lib.rs | 31 ++++++++++------ src/server/mod.rs | 92 ++++++++++++++++++++-------------------------- 7 files changed, 140 insertions(+), 193 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dc952ab..ab9a6d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,8 +12,7 @@ readme = "README.md" [dependencies] futures = "0.1" hex = "0.2" -hyper = "0.11" +hyper = "0.12" tokio-core = "0.1" tokio-io = "0.1" -tokio-service = "0.1" -tokio-uds = "0.1" \ No newline at end of file +tokio-uds = "0.2" diff --git a/README.md b/README.md index f95ea9e..e1c8516 100644 --- a/README.md +++ b/README.md @@ -33,28 +33,26 @@ note the example below uses a crate called `service_fn` which exists [here](http extern crate hyper; extern crate hyperlocal; extern crate futures; -extern crate tokio_service; -extern crate service_fn; -use hyper::{Result, Response}; -use hyper::header::{ContentType, ContentLength}; -use service_fn::service_fn; +use hyper::{header, Body, Request, Response}; +use hyper::service::service_fn; +use std::io; const PHRASE: &'static str = "It's a Unix system. I know this."; -fn run() -> Result<()> { +fn hello(_: Request) -> impl futures::Future, Error = io::Error> + Send { + futures::future::ok( + Response::builder() + .header(header::CONTENT_TYPE, "text/plain") + .header(header::CONTENT_LENGTH, PHRASE.len() as u64) + .body(PHRASE.into()) + .expect("failed to create response") + ) +} + +fn run() -> io::Result<()> { let path = "test.sock"; - let hello = || { - Ok(service_fn(|_| { - Ok( - Response::::new() - .with_header(ContentLength(PHRASE.len() as u64)) - .with_header(ContentType::plaintext()) - .with_body(PHRASE), - ) - })) - }; - let svr = hyperlocal::server::Http::new().bind(path, hello)?; + let svr = hyperlocal::server::Http::new().bind(path, || service_fn(hello))?; println!("Listening on unix://{path} with 1 thread.", path = path); svr.run()?; Ok(()) @@ -87,35 +85,33 @@ use std::io::{self, Write}; use futures::Stream; use futures::Future; -use hyper::{Client, Result}; +use hyper::{Client, rt}; use hyperlocal::{Uri, UnixConnector}; -use tokio_core::reactor::Core; - -fn run() -> Result<()> { - let mut core = Core::new()?; - let handle = core.handle(); - let client = Client::configure() - .connector(UnixConnector::new(handle)) - .build(&core.handle()); + +fn main() { + let client = Client::builder(). + build::<_, ::hyper::Body>(UnixConnector::new()); + let url = Uri::new("test.sock", "/").into(); + let work = client - .get(Uri::new("test.sock", "/").into()) + .get(url) .and_then(|res| { println!("Response: {}", res.status()); - println!("Headers: \n{}", res.headers()); + println!("Headers: {:#?}", res.headers()); - res.body().for_each(|chunk| { - io::stdout().write_all(&chunk).map_err(From::from) + res.into_body().for_each(|chunk| { + io::stdout().write_all(&chunk) + .map_err(|e| panic!("example expects stdout is open, error={}", e)) }) }) .map(|_| { println!("\n\nDone."); + }) + .map_err(|err| { + eprintln!("Error {}", err); }); - core.run(work) -} - -fn main() { - run().unwrap() + rt::run(work); } ``` diff --git a/examples/client.rs b/examples/client.rs index 2ebda9a..6c59e16 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -7,33 +7,31 @@ use std::io::{self, Write}; use futures::Stream; use futures::Future; -use hyper::{Client, Result}; +use hyper::{Client, rt}; use hyperlocal::{Uri, UnixConnector}; -use tokio_core::reactor::Core; -fn run() -> Result<()> { - let mut core = Core::new()?; - let handle = core.handle(); - let client = Client::configure() - .connector(UnixConnector::new(handle)) - .build(&core.handle()); +fn main() { + let client = Client::builder(). + build::<_, ::hyper::Body>(UnixConnector::new()); + let url = Uri::new("test.sock", "/").into(); + let work = client - .get(Uri::new("test.sock", "/").into()) + .get(url) .and_then(|res| { println!("Response: {}", res.status()); - println!("Headers: \n{}", res.headers()); + println!("Headers: {:#?}", res.headers()); - res.body().for_each(|chunk| { - io::stdout().write_all(&chunk).map_err(From::from) + res.into_body().for_each(|chunk| { + io::stdout().write_all(&chunk) + .map_err(|e| panic!("example expects stdout is open, error={}", e)) }) }) .map(|_| { println!("\n\nDone."); + }) + .map_err(|err| { + eprintln!("Error {}", err); }); - core.run(work) -} - -fn main() { - run().unwrap() + rt::run(work); } diff --git a/examples/server.rs b/examples/server.rs index 1e619a2..964db9c 100644 --- a/examples/server.rs +++ b/examples/server.rs @@ -1,35 +1,26 @@ extern crate hyper; extern crate hyperlocal; extern crate futures; -extern crate tokio_service; -use futures::future::FutureResult; -use hyper::{Result, Request, Response}; -use hyper::header::{ContentType, ContentLength}; -use tokio_service::Service; +use hyper::{header, Body, Request, Response}; +use hyper::service::service_fn; +use std::io; const PHRASE: &'static str = "It's a Unix system. I know this."; -struct Hello; - -impl Service for Hello { - type Request = Request; - type Response = Response; - type Error = hyper::Error; - type Future = FutureResult; - fn call(&self, _req: Request) -> Self::Future { - futures::future::ok( - Response::new() - .with_header(ContentLength(PHRASE.len() as u64)) - .with_header(ContentType::plaintext()) - .with_body(PHRASE), - ) - } +fn hello(_: Request) -> impl futures::Future, Error = io::Error> + Send { + futures::future::ok( + Response::builder() + .header(header::CONTENT_TYPE, "text/plain") + .header(header::CONTENT_LENGTH, PHRASE.len() as u64) + .body(PHRASE.into()) + .expect("failed to create response") + ) } -fn run() -> Result<()> { +fn run() -> io::Result<()> { let path = "test.sock"; - let svr = hyperlocal::server::Http::new().bind(path, || Ok(Hello))?; + let svr = hyperlocal::server::Http::new().bind(path, || service_fn(hello))?; println!("Listening on unix://{path} with 1 thread.", path = path); svr.run()?; Ok(()) diff --git a/src/client/mod.rs b/src/client/mod.rs index 25ca961..5b08507 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,13 +1,10 @@ //! Hyper client bindings for unix domain sockets use std::io; -use std::clone::Clone; -use futures::IntoFuture; +use futures::{Future, IntoFuture}; use futures::future::{self, FutureResult}; -use hyper::Uri as HyperUri; -use tokio_core::reactor::Handle; -use tokio_service::Service; +use hyper::client::connect::{Connect, Connected, Destination}; use tokio_uds::UnixStream; use super::Uri; @@ -26,70 +23,41 @@ const UNIX_SCHEME: &str = "unix"; /// ```no_run /// extern crate hyper; /// extern crate hyperlocal; -/// extern crate tokio_core; /// -/// let core = tokio_core::reactor::Core::new().unwrap(); -/// let client = hyper::Client::configure() -/// .connector( -/// hyperlocal::UnixConnector::new(core.handle()) -/// ) -/// .build(&core.handle()); +/// let client = hyper::Client::builder() +/// .build::<_, hyper::Body>(hyperlocal::UnixConnector::new()); /// ``` -pub struct UnixConnector(Handle); +#[derive(Clone)] +pub struct UnixConnector; impl UnixConnector { - pub fn new(handle: Handle) -> Self { - UnixConnector(handle) + pub fn new() -> Self { + UnixConnector } } -impl Service for UnixConnector { - type Request = HyperUri; - type Response = UnixStream; +impl Connect for UnixConnector { + type Transport = UnixStream; type Error = io::Error; - type Future = FutureResult; + type Future = FutureResult<(UnixStream, Connected), io::Error>; - fn call(&self, uri: HyperUri) -> Self::Future { - if uri.scheme() != Some(UNIX_SCHEME) { + fn connect(&self, destination: Destination) -> Self::Future { + if destination.scheme() != UNIX_SCHEME { return future::err(io::Error::new( io::ErrorKind::InvalidInput, - format!("Invalid uri {}", uri), + format!("Invalid uri {:?}", destination), )); } - match Uri::socket_path(&uri) { - Some(path) => UnixStream::connect(path, &self.0).into_future(), + match Uri::socket_path_dest(&destination) { + Some(ref path) => UnixStream::connect(path) + .wait() // We have to block because we + .map(|s| (s, Connected::new())) + .into_future(), _ => future::err(io::Error::new( io::ErrorKind::InvalidInput, - format!("Invalid uri {}", uri), + format!("Invalid uri {:?}", destination), )), } } } -impl Clone for UnixConnector { - fn clone(&self) -> Self { - UnixConnector(self.0.clone()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use tokio_core::reactor::Core; - - #[test] - fn connector_rejects_non_unix_uris() { - let mut core = Core::new().unwrap(); - let connector = UnixConnector::new(core.handle()); - let work = connector.call("http://google.com".parse().unwrap()); - assert!(core.run(work).is_err()) - } - - #[test] - fn connector_rejects_hand_crafted_unix_uris() { - let mut core = Core::new().unwrap(); - let connector = UnixConnector::new(core.handle()); - let work = connector.call("unix://google.com".parse().unwrap()); - assert!(core.run(work).is_err()) - } -} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index d3f1614..22de7a9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,7 +8,6 @@ extern crate hyper; extern crate tokio_uds; extern crate tokio_core; extern crate tokio_io; -extern crate tokio_service; extern crate hex; use std::borrow::Cow; @@ -32,14 +31,10 @@ pub use client::UnixConnector; /// extern crate hyper; /// extern crate hyperlocal; /// -/// let url = hyperlocal::Uri::new( +/// let url: hyper::Uri = hyperlocal::Uri::new( /// "/path/to/socket", "/urlpath?key=value" -/// ); -/// let req: hyper::Request = -/// hyper::Request::new( -/// hyper::Get, -/// url.into() -/// ); +/// ).into(); +/// let req = hyper::Request::get(url).body(()).unwrap(); /// ``` #[derive(Debug)] pub struct Uri<'a> { @@ -77,6 +72,10 @@ impl<'a> Uri<'a> { }) .next() } + + fn socket_path_dest(dest: &hyper::client::connect::Destination) -> Option { + format!("unix://{}", dest.host()).parse().ok().and_then(|uri| Self::socket_path(&uri)) + } } @@ -94,9 +93,19 @@ mod tests { #[test] fn unix_uris_resolve_socket_path() { - let unix: HyperUri = "unix://666f6f2e736f636b:0/".parse().unwrap(); - let path = Uri::socket_path(&unix).unwrap(); + let path = Uri::socket_path(&"unix://666f6f2e736f636b:0/".parse().unwrap()).unwrap(); let expected = "foo.sock"; assert_eq!(path, expected); } -} \ No newline at end of file + + #[test] + fn connector_rejects_non_unix_uris() { + assert_eq!(None, Uri::socket_path(&"http://google.com".parse().unwrap())); + } + + #[test] + fn connector_rejects_hand_crafted_unix_uris() { + assert_eq!(None, Uri::socket_path(&"unix://google.com".parse().unwrap())); + } +} + diff --git a/src/server/mod.rs b/src/server/mod.rs index a805b80..04003be 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,63 +1,62 @@ //! Hyper server bindings for unix domain sockets -use std::marker::PhantomData; +use std::io; use std::path::Path; use futures::future::Future; use futures::stream::Stream; -use hyper::{Request, Response}; -use hyper::server::Http as HyperHttp; +use hyper::server::conn::Http as HyperHttp; +use hyper::service::NewService; use tokio_core::reactor::Core; -use tokio_service::NewService; use tokio_uds::UnixListener; /// An instance of a server created through `Http::bind`. // /// This structure is used to create instances of Servers to spawn off tasks /// which handle a connection to an HTTP server. -pub struct Server +pub struct Server where - B: Stream, - B::Item: AsRef<[u8]>, + S: NewService + Send + 'static { - protocol: HyperHttp, new_service: S, core: Core, listener: UnixListener, } -impl Server +impl Server where - S: NewService, Error = ::hyper::Error> + S: NewService + Send + Sync + 'static, - B: Stream + 'static, - B::Item: AsRef<[u8]>, + S::InitError: ::std::fmt::Display, + ::Future: Send, { - pub fn run(self) -> ::hyper::Result<()> { + pub fn run(self) -> io::Result<()> { let Server { - protocol, new_service, mut core, listener, .. } = self; - let handle = core.handle(); + let server = listener .incoming() - .for_each(move |(sock, _)| { - protocol.bind_connection( - &handle, - sock, - ([127, 0, 0, 1], 0).into(), - new_service.new_service()?, - ); - Ok(()) - }) - .map_err(|_| ()); - core.run(server); - Ok(()) + .for_each(move |sock| { + new_service.new_service() + .map_err(|e| io::Error::new(io::ErrorKind::Other, format!("failed to create service: {}", e))) + .and_then(|service| { + HyperHttp::new() + .serve_connection( + sock, + service, + ).map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("failed to serve connection: {}", e)) + }) + }) + }); + + core.run(server) } } @@ -76,42 +75,29 @@ where /// //) /// /// ``` -pub struct Http { - _marker: PhantomData, -} - -impl Clone for Http { - fn clone(&self) -> Http { - Http { ..*self } - } -} +#[derive(Clone)] +pub struct Http; -impl + 'static> Http { +impl Http { /// Creates a new instance of the HTTP protocol, ready to spawn a server or /// start accepting connections. - pub fn new() -> Http { - Http { _marker: PhantomData } + pub fn new() -> Self { + Http } /// binds a new server instance to a unix domain socket path - pub fn bind(&self, path: P, new_service: S) -> ::hyper::Result> + pub fn bind(&self, path: P, new_service: S) -> ::std::io::Result> where P: AsRef, - S: NewService, Error = ::hyper::Error> - + Send - + Sync - + 'static, - Bd: Stream + 'static, + S: NewService + Send + 'static, + S::Error: Into>, + S::Service: Send, + ::Future: Send + 'static, + B: ::hyper::body::Payload, { let core = Core::new()?; - let handle = core.handle(); - let listener = UnixListener::bind(path.as_ref(), &handle)?; + let listener = UnixListener::bind(path.as_ref())?; - Ok(Server { - protocol: HyperHttp::new(), - new_service: new_service, - core: core, - listener: listener, - }) + Ok(Server { core, listener, new_service }) } }