diff --git a/Cargo.toml b/Cargo.toml index c9ae0ff8..c6c62514 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,7 +89,6 @@ features = ["std", "std-future"] [dev-dependencies] env_logger = "0.8" flate2 = "1.0" -futures = "0.3" indicatif = "0.15" rayon = "1" static_assertions = "1.1" diff --git a/examples/async.rs b/examples/async.rs index 3daf55d0..c999b0c6 100644 --- a/examples/async.rs +++ b/examples/async.rs @@ -4,12 +4,12 @@ use isahc::prelude::*; fn main() -> Result<(), isahc::Error> { - futures::executor::block_on(async { + futures_lite::future::block_on(async { let mut response = isahc::get_async("http://example.org").await?; println!("Status: {}", response.status()); println!("Headers:\n{:?}", response.headers()); - println!("Body: {}", response.text_async().await?); + println!("Body: {}", response.text().await?); Ok(()) }) diff --git a/examples/stream_cancellation.rs b/examples/stream_cancellation.rs index 062303d3..a69f0ffc 100644 --- a/examples/stream_cancellation.rs +++ b/examples/stream_cancellation.rs @@ -2,7 +2,7 @@ //! a program that aborts downloading a response if it contains the byte `0x3F` //! (ASCII "?"). -use futures::{executor::block_on, io::AsyncReadExt}; +use futures_lite::{future::block_on, io::AsyncReadExt}; fn main() -> Result<(), isahc::Error> { block_on(async { diff --git a/examples/upload_file.rs b/examples/upload_file.rs new file mode 100644 index 00000000..309db195 --- /dev/null +++ b/examples/upload_file.rs @@ -0,0 +1,26 @@ +//! Sample program that demonstrates how to upload a file using a `PUT` request. +//! Since 1.0, you can use a `File` as the request body directly, or anything +//! implementing `Read`, when using the synchronous APIs. +//! +//! If using the asynchronous APIs, you will need an asynchronous version of +//! `File` such as the one provided by [`async-fs`](https://docs.rs/async-fs). +//! Isahc does not provide an implementation for you. + +use isahc::prelude::*; +use std::fs::File; + +fn main() -> Result<(), isahc::Error> { + // We're opening the source code file you are looking at right now for + // reading. + let file = File::open(file!())?; + + // Perform the upload. + let mut response = isahc::put("https://httpbin.org/put", file)?; + + // Print interesting info from the response. + println!("Status: {}", response.status()); + println!("Headers: {:#?}", response.headers()); + print!("{}", response.text()?); + + Ok(()) +} diff --git a/src/agent.rs b/src/agent.rs index 6f89fa11..64090e68 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -336,7 +336,7 @@ impl AgentContext { let handle = self.requests.remove(token); let mut handle = self.multi.remove2(handle)?; - handle.get_mut().on_result(result); + handle.get_mut().set_result(result.map_err(Error::from)); Ok(()) } diff --git a/src/body.rs b/src/body.rs deleted file mode 100644 index 49c6de6c..00000000 --- a/src/body.rs +++ /dev/null @@ -1,247 +0,0 @@ -//! Provides types for working with request and response bodies. - -use futures_lite::{future::block_on, io::{AsyncRead, AsyncReadExt}}; -use std::{ - borrow::Cow, - fmt, - io::{self, Cursor, Read}, - pin::Pin, - str, - task::{Context, Poll}, -}; - -/// Contains the body of an HTTP request or response. -/// -/// This type is used to encapsulate the underlying stream or region of memory -/// where the contents of the body are stored. A `Body` can be created from many -/// types of sources using the [`Into`](std::convert::Into) trait or one of its -/// constructor functions. -/// -/// Since the entire request life-cycle in Isahc is asynchronous, bodies must -/// also be asynchronous. You can create a body from anything that implements -/// [`AsyncRead`], which [`Body`] itself also implements. -pub struct Body(Inner); - -/// All possible body implementations. -enum Inner { - /// An empty body. - Empty, - - /// A body stored in memory. - Buffer(Cursor>), - - /// An asynchronous reader. - AsyncRead(Pin>, Option), -} - -impl Body { - /// Create a new empty body. - /// - /// An empty body represents the *absence* of a body, which is semantically - /// different than the presence of a body of zero length. - pub const fn empty() -> Self { - Self(Inner::Empty) - } - - /// Create a new body from a potentially static byte buffer. - /// - /// The body will have a known length equal to the number of bytes given. - /// - /// This will try to prevent a copy if the type passed in can be re-used, - /// otherwise the buffer will be copied first. This method guarantees to not - /// require a copy for the following types: - /// - /// - `&'static [u8]` - /// - `&'static str` - /// - /// # Examples - /// - /// ``` - /// use isahc::Body; - /// - /// // Create a body from a static string. - /// let body = Body::from_bytes_static("hello world"); - /// ``` - #[inline] - pub fn from_bytes_static(bytes: B) -> Self - where - B: AsRef<[u8]> + 'static - { - match_type! { - => Self::from_static_impl(bytes), - => Self::from_static_impl(bytes.as_bytes()), - > => Self::from(bytes), - => Self::from(bytes.into_bytes()), - bytes => Self::from(bytes.as_ref().to_vec()), - } - } - - #[inline] - fn from_static_impl(bytes: &'static [u8]) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes)))) - } - - /// Create a streaming body that reads from the given reader. - /// - /// The body will have an unknown length. When used as a request body, - /// chunked transfer encoding might be used to send the request. - pub fn from_reader(read: impl AsyncRead + Send + Sync + 'static) -> Self { - Body(Inner::AsyncRead(Box::pin(read), None)) - } - - /// Create a streaming body with a known length. - /// - /// If the size of the body is known in advance, such as with a file, then - /// this function can be used to create a body that can determine its - /// `Content-Length` while still reading the bytes asynchronously. - /// - /// Giving a value for `length` that doesn't actually match how much data - /// the reader will produce may result in errors when sending the body in a - /// request. - pub fn from_reader_sized(read: impl AsyncRead + Send + Sync + 'static, length: u64) -> Self { - Body(Inner::AsyncRead(Box::pin(read), Some(length))) - } - - /// Report if this body is empty. - pub fn is_empty(&self) -> bool { - match self.0 { - Inner::Empty => true, - _ => false, - } - } - - /// Get the size of the body, if known. - /// - /// The value reported by this method is used to set the `Content-Length` - /// for outgoing requests. - /// - /// When coming from a response, this method will report the value of the - /// `Content-Length` response header if present. If this method returns - /// `None` then there's a good chance that the server used something like - /// chunked transfer encoding to send the response body. - /// - /// Since the length may be determined totally separately from the actual - /// bytes, even if a value is returned it should not be relied on as always - /// being accurate, and should be treated as a "hint". - pub fn len(&self) -> Option { - match &self.0 { - Inner::Empty => Some(0), - Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), - Inner::AsyncRead(_, len) => *len, - } - } - - /// If this body is repeatable, reset the body stream back to the start of - /// the content. Returns `false` if the body cannot be reset. - pub fn reset(&mut self) -> bool { - match &mut self.0 { - Inner::Empty => true, - Inner::Buffer(cursor) => { - cursor.set_position(0); - true - } - Inner::AsyncRead(_, _) => false, - } - } -} - -impl Read for Body { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - match &mut self.0 { - Inner::Empty => Ok(0), - Inner::Buffer(cursor) => cursor.read(buf), - Inner::AsyncRead(reader, _) => block_on(reader.read(buf)), - } - } -} - -impl AsyncRead for Body { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - match &mut self.0 { - Inner::Empty => Poll::Ready(Ok(0)), - Inner::Buffer(cursor) => Poll::Ready(cursor.read(buf)), - Inner::AsyncRead(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), - } - } -} - -impl Default for Body { - fn default() -> Self { - Self::empty() - } -} - -impl From<()> for Body { - fn from(_: ()) -> Self { - Self::empty() - } -} - -impl From> for Body { - fn from(body: Vec) -> Self { - Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) - } -} - -impl From<&'_ [u8]> for Body { - fn from(body: &[u8]) -> Self { - body.to_vec().into() - } -} - -impl From for Body { - fn from(body: String) -> Self { - body.into_bytes().into() - } -} - -impl From<&'_ str> for Body { - fn from(body: &str) -> Self { - body.as_bytes().into() - } -} - -impl> From> for Body { - fn from(body: Option) -> Self { - match body { - Some(body) => body.into(), - None => Self::empty(), - } - } -} - -impl fmt::Debug for Body { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self.len() { - Some(len) => write!(f, "Body({})", len), - None => write!(f, "Body(?)"), - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - static_assertions::assert_impl_all!(Body: Send, Sync); - - #[test] - fn empty_body() { - let body = Body::empty(); - - assert!(body.is_empty()); - assert_eq!(body.len(), Some(0)); - } - - #[test] - fn zero_length_body() { - let body = Body::from(vec![]); - - assert!(!body.is_empty()); - assert_eq!(body.len(), Some(0)); - } -} diff --git a/src/body/mod.rs b/src/body/mod.rs new file mode 100644 index 00000000..09a95e74 --- /dev/null +++ b/src/body/mod.rs @@ -0,0 +1,349 @@ +//! Provides types for working with request and response bodies. + +use futures_lite::io::{AsyncRead, BlockOn}; +use std::{ + borrow::Cow, + fmt, + io::{self, Cursor, Read}, + pin::Pin, + str, + task::{Context, Poll}, +}; + +mod sync; + +#[allow(unreachable_pub)] +pub use sync::Body; + +/// Contains the body of an asynchronous HTTP request or response. +/// +/// This type is used to encapsulate the underlying stream or region of memory +/// where the contents of the body are stored. An [`AsyncBody`] can be created +/// from many types of sources using the [`Into`](std::convert::Into) trait or +/// one of its constructor functions. +/// +/// For asynchronous requests, you must use an asynchronous body, because the +/// entire request lifecycle is also asynchronous. You can create a body from +/// anything that implements [`AsyncRead`], which [`AsyncBody`] itself also +/// implements. +/// +/// For synchronous requests, use [`Body`] instead. +pub struct AsyncBody(Inner); + +/// All possible body implementations. +enum Inner { + /// An empty body. + Empty, + + /// A body stored in memory. + Buffer(Cursor>), + + /// An asynchronous reader. + Reader(Pin>, Option), +} + +impl AsyncBody { + /// Create a new empty body. + /// + /// An empty body represents the *absence* of a body, which is semantically + /// different than the presence of a body of zero length. + pub const fn empty() -> Self { + Self(Inner::Empty) + } + + /// Create a new body from a potentially static byte buffer. + /// + /// The body will have a known length equal to the number of bytes given. + /// + /// This will try to prevent a copy if the type passed in can be re-used, + /// otherwise the buffer will be copied first. This method guarantees to not + /// require a copy for the following types: + /// + /// - `&'static [u8]` + /// - `&'static str` + /// + /// # Examples + /// + /// ``` + /// use isahc::Body; + /// + /// // Create a body from a static string. + /// let body = Body::from_bytes_static("hello world"); + /// ``` + #[inline] + pub fn from_bytes_static(bytes: B) -> Self + where + B: AsRef<[u8]> + 'static, + { + match_type! { + >> => Self(Inner::Buffer(bytes)), + => Self::from_static_impl(bytes), + => Self::from_static_impl(bytes.as_bytes()), + > => Self::from(bytes), + => Self::from(bytes.into_bytes()), + bytes => Self::from(bytes.as_ref().to_vec()), + } + } + + #[inline] + fn from_static_impl(bytes: &'static [u8]) -> Self { + Self(Inner::Buffer(Cursor::new(Cow::Borrowed(bytes)))) + } + + /// Create a streaming body that reads from the given reader. + /// + /// The body will have an unknown length. When used as a request body, + /// [chunked transfer + /// encoding](https://tools.ietf.org/html/rfc7230#section-4.1) might be used + /// to send the request. + pub fn from_reader(read: R) -> Self + where + R: AsyncRead + Send + Sync + 'static, + { + Self(Inner::Reader(Box::pin(read), None)) + } + + /// Create a streaming body with a known length. + /// + /// If the size of the body is known in advance, such as with a file, then + /// this function can be used to create a body that can determine its + /// `Content-Length` while still reading the bytes asynchronously. + /// + /// Giving a value for `length` that doesn't actually match how much data + /// the reader will produce may result in errors when sending the body in a + /// request. + pub fn from_reader_sized(read: R, length: u64) -> Self + where + R: AsyncRead + Send + Sync + 'static, + { + Self(Inner::Reader(Box::pin(read), Some(length))) + } + + /// Report if this body is empty. + /// + /// This is not necessarily the same as checking for `self.len() == + /// Some(0)`. Since HTTP message bodies are optional, there is a semantic + /// difference between the absence of a body and the presence of a + /// zero-length body. This method will only return `true` for the former. + pub fn is_empty(&self) -> bool { + match self.0 { + Inner::Empty => true, + _ => false, + } + } + + /// Get the size of the body, if known. + /// + /// The value reported by this method is used to set the `Content-Length` + /// for outgoing requests. + /// + /// When coming from a response, this method will report the value of the + /// `Content-Length` response header if present. If this method returns + /// `None` then there's a good chance that the server used something like + /// chunked transfer encoding to send the response body. + /// + /// Since the length may be determined totally separately from the actual + /// bytes, even if a value is returned it should not be relied on as always + /// being accurate, and should be treated as a "hint". + pub fn len(&self) -> Option { + match &self.0 { + Inner::Empty => Some(0), + Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Inner::Reader(_, len) => *len, + } + } + + /// If this body is repeatable, reset the body stream back to the start of + /// the content. Returns `false` if the body cannot be reset. + pub fn reset(&mut self) -> bool { + match &mut self.0 { + Inner::Empty => true, + Inner::Buffer(cursor) => { + cursor.set_position(0); + true + } + Inner::Reader(_, _) => false, + } + } + + /// Turn this asynchronous body into a synchronous one. This is how the + /// response body is implemented for the synchronous API. + /// + /// We do not expose this publicly because while we know that this + /// implementation works for the bodies _we_ create, it may not work + /// generally if the underlying reader only supports blocking under a + /// specific runtime. + pub(crate) fn into_sync(self) -> sync::Body { + match self.0 { + Inner::Empty => sync::Body::empty(), + Inner::Buffer(cursor) => sync::Body::from_bytes_static(cursor.into_inner()), + Inner::Reader(reader, Some(len)) => { + sync::Body::from_reader_sized(BlockOn::new(reader), len) + } + Inner::Reader(reader, None) => sync::Body::from_reader(BlockOn::new(reader)), + } + } +} + +impl AsyncRead for AsyncBody { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + match &mut self.0 { + Inner::Empty => Poll::Ready(Ok(0)), + Inner::Buffer(cursor) => Poll::Ready(cursor.read(buf)), + Inner::Reader(read, _) => AsyncRead::poll_read(read.as_mut(), cx, buf), + } + } +} + +impl Default for AsyncBody { + fn default() -> Self { + Self::empty() + } +} + +impl From<()> for AsyncBody { + fn from(_: ()) -> Self { + Self::empty() + } +} + +impl From> for AsyncBody { + fn from(body: Vec) -> Self { + Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + } +} + +impl From<&'_ [u8]> for AsyncBody { + fn from(body: &[u8]) -> Self { + body.to_vec().into() + } +} + +impl From for AsyncBody { + fn from(body: String) -> Self { + body.into_bytes().into() + } +} + +impl From<&'_ str> for AsyncBody { + fn from(body: &str) -> Self { + body.as_bytes().into() + } +} + +impl> From> for AsyncBody { + fn from(body: Option) -> Self { + match body { + Some(body) => body.into(), + None => Self::empty(), + } + } +} + +impl fmt::Debug for AsyncBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.len() { + Some(len) => write!(f, "AsyncBody({})", len), + None => write!(f, "AsyncBody(?)"), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures_lite::{ + future::{block_on, zip}, + io::AsyncReadExt, + }; + + static_assertions::assert_impl_all!(AsyncBody: Send, Sync); + + #[test] + fn empty_body() { + let body = AsyncBody::empty(); + + assert!(body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn zero_length_body() { + let body = AsyncBody::from(vec![]); + + assert!(!body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn reader_with_unknown_length() { + let body = AsyncBody::from_reader(futures_lite::io::empty()); + + assert!(!body.is_empty()); + assert_eq!(body.len(), None); + } + + #[test] + fn reader_with_known_length() { + let body = AsyncBody::from_reader_sized(futures_lite::io::empty(), 0); + + assert!(!body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn reset_memory_body() { + block_on(async { + let mut body = AsyncBody::from("hello world"); + let mut buf = String::new(); + + assert_eq!(body.read_to_string(&mut buf).await.unwrap(), 11); + assert_eq!(buf, "hello world"); + assert!(body.reset()); + buf.clear(); // read_to_string panics if the destination isn't empty + assert_eq!(body.read_to_string(&mut buf).await.unwrap(), 11); + assert_eq!(buf, "hello world"); + }); + } + + #[test] + fn cannot_reset_reader() { + let mut body = AsyncBody::from_reader(futures_lite::io::empty()); + + assert_eq!(body.reset(), false); + } + + #[test] + fn sync_memory_into_async() { + let (body, writer) = Body::from("hello world").into_async(); + + assert!(writer.is_none()); + assert_eq!(body.len(), Some(11)); + } + + #[test] + fn sync_reader_into_async() { + block_on(async { + let (mut body, writer) = Body::from_reader("hello world".as_bytes()).into_async(); + + assert!(writer.is_some()); + + // Write from the writer concurrently as we read from the body. + zip( + async move { + writer.unwrap().write().await.unwrap(); + }, + async move { + let mut buf = String::new(); + body.read_to_string(&mut buf).await.unwrap(); + assert_eq!(buf, "hello world"); + }, + ) + .await; + }); + } +} diff --git a/src/body/sync.rs b/src/body/sync.rs new file mode 100644 index 00000000..d7efce6f --- /dev/null +++ b/src/body/sync.rs @@ -0,0 +1,336 @@ +use super::AsyncBody; +use futures_lite::{future::yield_now, io::AsyncWriteExt}; +use sluice::pipe::{pipe, PipeWriter}; +use std::{ + borrow::Cow, + fmt, + fs::File, + io::{Cursor, ErrorKind, Read, Result}, +}; + +/// Contains the body of a synchronous HTTP request or response. +/// +/// This type is used to encapsulate the underlying stream or region of memory +/// where the contents of the body are stored. A [`Body`] can be created from +/// many types of sources using the [`Into`](std::convert::Into) trait or one of +/// its constructor functions. It can also be created from anything that +/// implements [`Read`], which [`Body`] itself also implements. +/// +/// For asynchronous requests, use [`AsyncBody`] instead. +pub struct Body(Inner); + +enum Inner { + Empty, + Buffer(Cursor>), + Reader(Box, Option), +} + +impl Body { + /// Create a new empty body. + /// + /// An empty body represents the *absence* of a body, which is semantically + /// different than the presence of a body of zero length. + pub const fn empty() -> Self { + Self(Inner::Empty) + } + + /// Create a new body from a potentially static byte buffer. + /// + /// The body will have a known length equal to the number of bytes given. + /// + /// This will try to prevent a copy if the type passed in can be re-used, + /// otherwise the buffer will be copied first. This method guarantees to not + /// require a copy for the following types: + /// + /// - `&'static [u8]` + /// - `&'static str` + /// + /// # Examples + /// + /// ``` + /// use isahc::Body; + /// + /// // Create a body from a static string. + /// let body = Body::from_bytes_static("hello world"); + /// ``` + #[inline] + pub fn from_bytes_static(bytes: B) -> Self + where + B: AsRef<[u8]> + 'static + { + match_type! { + >> => Self(Inner::Buffer(bytes)), + > => Self::from(bytes), + => Self::from(bytes.into_bytes()), + bytes => Self::from(bytes.as_ref().to_vec()), + } + } + + /// Create a streaming body that reads from the given reader. + /// + /// The body will have an unknown length. When used as a request body, + /// [chunked transfer + /// encoding](https://tools.ietf.org/html/rfc7230#section-4.1) might be used + /// to send the request. + pub fn from_reader(reader: R) -> Self + where + R: Read + Send + Sync + 'static, + { + Self(Inner::Reader(Box::new(reader), None)) + } + + /// Create a streaming body with a known length. + /// + /// If the size of the body is known in advance, such as with a file, then + /// this function can be used to create a body that can determine its + /// `Content-Length` while still reading the bytes asynchronously. + /// + /// Giving a value for `length` that doesn't actually match how much data + /// the reader will produce may result in errors when sending the body in a + /// request. + pub fn from_reader_sized(reader: R, length: u64) -> Self + where + R: Read + Send + Sync + 'static, + { + Self(Inner::Reader(Box::new(reader), Some(length))) + } + + /// Report if this body is empty. + /// + /// This is not necessarily the same as checking for `self.len() == + /// Some(0)`. Since HTTP message bodies are optional, there is a semantic + /// difference between the absence of a body and the presence of a + /// zero-length body. This method will only return `true` for the former. + pub fn is_empty(&self) -> bool { + match self.0 { + Inner::Empty => true, + _ => false, + } + } + + /// Get the size of the body, if known. + /// + /// The value reported by this method is used to set the `Content-Length` + /// for outgoing requests. + /// + /// When coming from a response, this method will report the value of the + /// `Content-Length` response header if present. If this method returns + /// `None` then there's a good chance that the server used something like + /// chunked transfer encoding to send the response body. + /// + /// Since the length may be determined totally separately from the actual + /// bytes, even if a value is returned it should not be relied on as always + /// being accurate, and should be treated as a "hint". + pub fn len(&self) -> Option { + match &self.0 { + Inner::Empty => Some(0), + Inner::Buffer(bytes) => Some(bytes.get_ref().len() as u64), + Inner::Reader(_, len) => *len, + } + } + + /// If this body is repeatable, reset the body stream back to the start of + /// the content. Returns `false` if the body cannot be reset. + pub fn reset(&mut self) -> bool { + match &mut self.0 { + Inner::Empty => true, + Inner::Buffer(cursor) => { + cursor.set_position(0); + true + } + _ => false, + } + } + + /// Convert this body into an asynchronous one. + /// + /// Turning a synchronous operation into an asynchronous one can be quite + /// the challenge, so this method is used internally only for limited + /// scenarios in which this can work. If this body is an in-memory buffer, + /// then the translation is trivial. + /// + /// If this body was created from an underlying synchronous reader, then we + /// create a temporary asynchronous pipe and return a [`Writer`] which will + /// copy the bytes from the reader to the writing half of the pipe in a + /// blocking fashion. + pub(crate) fn into_async(self) -> (AsyncBody, Option) { + match self.0 { + Inner::Empty => (AsyncBody::empty(), None), + Inner::Buffer(cursor) => (AsyncBody::from_bytes_static(cursor.into_inner()), None), + Inner::Reader(reader, len) => { + let (pipe_reader, writer) = pipe(); + + ( + if let Some(len) = len { + AsyncBody::from_reader_sized(pipe_reader, len) + } else { + AsyncBody::from_reader(pipe_reader) + }, + Some(Writer { reader, writer }), + ) + } + } + } +} + +impl Read for Body { + fn read(&mut self, buf: &mut [u8]) -> Result { + match &mut self.0 { + Inner::Empty => Ok(0), + Inner::Buffer(cursor) => cursor.read(buf), + Inner::Reader(reader, _) => reader.read(buf), + } + } +} + +impl Default for Body { + fn default() -> Self { + Self::empty() + } +} + +impl From<()> for Body { + fn from(_: ()) -> Self { + Self::empty() + } +} + +impl From> for Body { + fn from(body: Vec) -> Self { + Self(Inner::Buffer(Cursor::new(Cow::Owned(body)))) + } +} + +impl From<&'_ [u8]> for Body { + fn from(body: &[u8]) -> Self { + body.to_vec().into() + } +} + +impl From for Body { + fn from(body: String) -> Self { + body.into_bytes().into() + } +} + +impl From<&'_ str> for Body { + fn from(body: &str) -> Self { + body.as_bytes().into() + } +} + +impl From for Body { + fn from(file: File) -> Self { + if let Ok(metadata) = file.metadata() { + Self::from_reader_sized(file, metadata.len()) + } else { + Self::from_reader(file) + } + } +} + +impl fmt::Debug for Body { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.len() { + Some(len) => write!(f, "Body({})", len), + None => write!(f, "Body(?)"), + } + } +} + +/// Helper struct for writing a synchronous reader into an asynchronous pipe. +pub(crate) struct Writer { + reader: Box, + writer: PipeWriter, +} + +impl Writer { + /// The size of the temporary buffer to use for writing. Larger buffers can + /// improve performance, but at the cost of more memory. + /// + /// Curl's internal buffer size just happens to default to 16 KiB as well, + /// so this is a natural choice. + const BUF_SIZE: usize = 16384; + + /// Write the response body from the synchronous reader. + /// + /// While this function is async, it isn't a well-behaved one as it blocks + /// frequently while reading from the request body reader. As long as this + /// method is invoked in a controlled environment within a thread dedicated + /// to blocking operations, this is OK. + pub(crate) async fn write(&mut self) -> Result<()> { + let mut buf = [0; Self::BUF_SIZE]; + + loop { + let len = match self.reader.read(&mut buf) { + Ok(0) => return Ok(()), + Ok(len) => len, + Err(e) if e.kind() == ErrorKind::Interrupted => { + yield_now().await; + continue; + } + Err(e) => return Err(e), + }; + + self.writer.write_all(&buf[..len]).await?; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + static_assertions::assert_impl_all!(Body: Send, Sync); + + #[test] + fn empty_body() { + let body = Body::empty(); + + assert!(body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn zero_length_body() { + let body = Body::from(vec![]); + + assert!(!body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn reader_with_unknown_length() { + let body = Body::from_reader(std::io::empty()); + + assert!(!body.is_empty()); + assert_eq!(body.len(), None); + } + + #[test] + fn reader_with_known_length() { + let body = Body::from_reader_sized(std::io::empty(), 0); + + assert!(!body.is_empty()); + assert_eq!(body.len(), Some(0)); + } + + #[test] + fn reset_memory_body() { + let mut body = Body::from("hello world"); + let mut buf = String::new(); + + assert_eq!(body.read_to_string(&mut buf).unwrap(), 11); + assert_eq!(buf, "hello world"); + assert!(body.reset()); + assert_eq!(body.read_to_string(&mut buf).unwrap(), 11); + assert_eq!(buf, "hello worldhello world"); + } + + #[test] + fn cannot_reset_reader() { + let mut body = Body::from_reader(std::io::empty()); + + assert_eq!(body.reset(), false); + } +} diff --git a/src/client.rs b/src/client.rs index 312685e9..3b3bbd42 100644 --- a/src/client.rs +++ b/src/client.rs @@ -3,16 +3,21 @@ use crate::{ agent::{self, AgentBuilder}, auth::{Authentication, Credentials}, - body::Body, + body::{AsyncBody, Body}, config::internal::{ConfigurableBase, SetOpt}, config::*, default_headers::DefaultHeadersInterceptor, error::{Error, ErrorKind}, handler::{RequestHandler, ResponseBodyReader}, - headers, + headers::HasHeaders, interceptor::{self, Interceptor, InterceptorObj}, + parsing::header_to_curl_string, +}; +use futures_lite::{ + future::{block_on, try_zip}, + io::AsyncRead, + pin, }; -use futures_lite::{future::block_on, io::AsyncRead, pin}; use http::{ header::{HeaderMap, HeaderName, HeaderValue}, Request, Response, @@ -30,11 +35,13 @@ use std::{ }; use tracing_futures::Instrument; -static USER_AGENT: Lazy = Lazy::new(|| format!( - "curl/{} isahc/{}", - curl::Version::get().version(), - env!("CARGO_PKG_VERSION") -)); +static USER_AGENT: Lazy = Lazy::new(|| { + format!( + "curl/{} isahc/{}", + curl::Version::get().version(), + env!("CARGO_PKG_VERSION") + ) +}); /// An HTTP client builder, capable of creating custom [`HttpClient`] instances /// with customized behavior. @@ -451,14 +458,20 @@ impl HttpClientBuilder { #[cfg(not(feature = "cookies"))] let inner = Inner { - agent: self.agent_builder.spawn().map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?, + agent: self + .agent_builder + .spawn() + .map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?, defaults: self.defaults, interceptors: self.interceptors, }; #[cfg(feature = "cookies")] let inner = Inner { - agent: self.agent_builder.spawn().map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?, + agent: self + .agent_builder + .spawn() + .map_err(|e| Error::new(ErrorKind::ClientInitialization, e))?, defaults: self.defaults, interceptors: self.interceptors, cookie_jar: self.cookie_jar, @@ -614,8 +627,8 @@ impl HttpClient { /// TODO: Stabilize. #[tracing::instrument(level = "debug")] pub(crate) fn shared() -> &'static Self { - static SHARED: Lazy = Lazy::new(|| HttpClient::new() - .expect("shared client failed to initialize")); + static SHARED: Lazy = + Lazy::new(|| HttpClient::new().expect("shared client failed to initialize")); &SHARED } @@ -657,7 +670,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - block_on(self.get_async(uri)) + match http::Request::get(uri).body(()) { + Ok(request) => self.send(request), + Err(e) => Err(Error::from_any(e)), + } } /// Send a GET request to the given URI asynchronously. @@ -669,7 +685,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.send_builder_async(http::Request::get(uri), Body::empty()) + match http::Request::get(uri).body(()) { + Ok(request) => self.send_async(request), + Err(e) => ResponseFuture::error(Error::from_any(e)), + } } /// Send a HEAD request to the given URI. @@ -692,7 +711,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - block_on(self.head_async(uri)) + match http::Request::head(uri).body(()) { + Ok(request) => self.send(request), + Err(e) => Err(Error::from_any(e)), + } } /// Send a HEAD request to the given URI asynchronously. @@ -704,7 +726,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.send_builder_async(http::Request::head(uri), Body::empty()) + match http::Request::head(uri).body(()) { + Ok(request) => self.send_async(request), + Err(e) => ResponseFuture::error(Error::from_any(e)), + } } /// Send a POST request to the given URI with a given request body. @@ -725,12 +750,16 @@ impl HttpClient { /// }"#)?; /// # Ok::<(), isahc::Error>(()) #[inline] - pub fn post(&self, uri: U, body: impl Into) -> Result, Error> + pub fn post(&self, uri: U, body: B) -> Result, Error> where http::Uri: TryFrom, >::Error: Into, + B: Into, { - block_on(self.post_async(uri, body)) + match http::Request::post(uri).body(body) { + Ok(request) => self.send(request), + Err(e) => Err(Error::from_any(e)), + } } /// Send a POST request to the given URI asynchronously with a given request @@ -738,12 +767,16 @@ impl HttpClient { /// /// To customize the request further, see [`HttpClient::send_async`]. To /// execute the request synchronously, see [`HttpClient::post`]. - pub fn post_async(&self, uri: U, body: impl Into) -> ResponseFuture<'_> + pub fn post_async(&self, uri: U, body: B) -> ResponseFuture<'_> where http::Uri: TryFrom, >::Error: Into, + B: Into, { - self.send_builder_async(http::Request::post(uri), body.into()) + match http::Request::post(uri).body(body) { + Ok(request) => self.send_async(request), + Err(e) => ResponseFuture::error(Error::from_any(e)), + } } /// Send a PUT request to the given URI with a given request body. @@ -765,12 +798,16 @@ impl HttpClient { /// # Ok::<(), isahc::Error>(()) /// ``` #[inline] - pub fn put(&self, uri: U, body: impl Into) -> Result, Error> + pub fn put(&self, uri: U, body: B) -> Result, Error> where http::Uri: TryFrom, >::Error: Into, + B: Into, { - block_on(self.put_async(uri, body)) + match http::Request::put(uri).body(body) { + Ok(request) => self.send(request), + Err(e) => Err(Error::from_any(e)), + } } /// Send a PUT request to the given URI asynchronously with a given request @@ -778,12 +815,16 @@ impl HttpClient { /// /// To customize the request further, see [`HttpClient::send_async`]. To /// execute the request synchronously, see [`HttpClient::put`]. - pub fn put_async(&self, uri: U, body: impl Into) -> ResponseFuture<'_> + pub fn put_async(&self, uri: U, body: B) -> ResponseFuture<'_> where http::Uri: TryFrom, >::Error: Into, + B: Into, { - self.send_builder_async(http::Request::put(uri), body.into()) + match http::Request::put(uri).body(body) { + Ok(request) => self.send_async(request), + Err(e) => ResponseFuture::error(Error::from_any(e)), + } } /// Send a DELETE request to the given URI. @@ -796,7 +837,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - block_on(self.delete_async(uri)) + match http::Request::delete(uri).body(()) { + Ok(request) => self.send(request), + Err(e) => Err(Error::from_any(e)), + } } /// Send a DELETE request to the given URI asynchronously. @@ -808,7 +852,10 @@ impl HttpClient { http::Uri: TryFrom, >::Error: Into, { - self.send_builder_async(http::Request::delete(uri), Body::empty()) + match http::Request::delete(uri).body(()) { + Ok(request) => self.send_async(request), + Err(e) => ResponseFuture::error(Error::from_any(e)), + } } /// Send an HTTP request and return the HTTP response. @@ -859,10 +906,42 @@ impl HttpClient { /// assert!(response.status().is_success()); /// # Ok::<(), isahc::Error>(()) /// ``` - #[inline] #[tracing::instrument(level = "debug", skip(self, request), err)] - pub fn send>(&self, request: Request) -> Result, Error> { - block_on(self.send_async(request)) + pub fn send(&self, request: Request) -> Result, Error> + where + B: Into, + { + let mut writer_maybe = None; + + let request = request.map(|body| { + let (async_body, writer) = body.into().into_async(); + writer_maybe = writer; + async_body + }); + + let response = block_on(async move { + // Instead of simply blocking the current thread until the response + // is received, we can use the current thread to read from the + // request body synchronously while concurrently waiting for the + // response. + if let Some(mut writer) = writer_maybe { + // Note that the `send_async` future is given first; this + // ensures that it is polled first and thus the request is + // initiated before we attempt to write the request body. + let (response, _) = try_zip( + self.send_async_inner(request), + async move { + writer.write().await.map_err(Error::from) + }, + ).await?; + + Ok(response) + } else { + self.send_async_inner(request).await + } + })?; + + Ok(response.map(|body| body.into_sync())) } /// Send an HTTP request and return the HTTP response asynchronously. @@ -889,24 +968,15 @@ impl HttpClient { /// # Ok(()) } /// ``` #[inline] - pub fn send_async>(&self, request: Request) -> ResponseFuture<'_> { + pub fn send_async(&self, request: Request) -> ResponseFuture<'_> + where + B: Into, + { ResponseFuture::new(self.send_async_inner(request.map(Into::into))) } - #[inline] - fn send_builder_async( - &self, - builder: http::request::Builder, - body: Body, - ) -> ResponseFuture<'_> { - ResponseFuture::new(async move { - self.send_async_inner(builder.body(body).map_err(Error::from_any)?) - .await - }) - } - /// Actually send the request. All the public methods go through here. - async fn send_async_inner(&self, mut request: Request) -> Result, Error> { + async fn send_async_inner(&self, mut request: Request) -> Result, Error> { let span = tracing::debug_span!( "send_async", method = ?request.method(), @@ -925,14 +995,12 @@ impl HttpClient { interceptors: &self.inner.interceptors, }; - ctx.send(request) - .instrument(span) - .await + ctx.send(request).instrument(span).await } fn create_easy_handle( &self, - mut request: Request, + mut request: Request, ) -> Result< ( curl::easy::Easy2, @@ -1070,7 +1138,7 @@ impl HttpClient { .unwrap_or(false); for (name, value) in request.headers().iter() { - headers.append(&headers::to_curl_string(name, value, title_case))?; + headers.append(&header_to_curl_string(name, value, title_case))?; } easy.http_headers(headers)?; @@ -1080,49 +1148,46 @@ impl HttpClient { } impl crate::interceptor::Invoke for &HttpClient { - fn invoke<'a>(&'a self, mut request: Request) -> crate::interceptor::InterceptorFuture<'a, Error> { - Box::pin( - async move { - // Set default user agent if not specified. - request - .headers_mut() - .entry(http::header::USER_AGENT) - .or_insert(USER_AGENT.parse().unwrap()); - - // Create and configure a curl easy handle to fulfil the request. - let (easy, future) = self.create_easy_handle(request)?; - - // Send the request to the agent to be executed. - self.inner.agent.submit_request(easy)?; - - // Await for the response headers. - let response = future.await?; - - // If a Content-Length header is present, include that information in - // the body as well. - let content_length = response - .headers() - .get(http::header::CONTENT_LENGTH) - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.parse().ok()); - - // Convert the reader into an opaque Body. - Ok(response.map(|reader| { - let body = ResponseBody { - inner: reader, - // Extend the lifetime of the agent by including a reference - // to its handle in the response body. - _client: (*self).clone(), - }; - - if let Some(len) = content_length { - Body::from_reader_sized(body, len) - } else { - Body::from_reader(body) - } - })) - } - ) + fn invoke<'a>( + &'a self, + mut request: Request, + ) -> crate::interceptor::InterceptorFuture<'a, Error> { + Box::pin(async move { + // Set default user agent if not specified. + request + .headers_mut() + .entry(http::header::USER_AGENT) + .or_insert(USER_AGENT.parse().unwrap()); + + // Create and configure a curl easy handle to fulfil the request. + let (easy, future) = self.create_easy_handle(request)?; + + // Send the request to the agent to be executed. + self.inner.agent.submit_request(easy)?; + + // Await for the response headers. + let response = future.await?; + + // If a Content-Length header is present, include that information in + // the body as well. + let content_length = response.content_length(); + + // Convert the reader into an opaque Body. + Ok(response.map(|reader| { + let body = ResponseBody { + inner: reader, + // Extend the lifetime of the agent by including a reference + // to its handle in the response body. + _client: (*self).clone(), + }; + + if let Some(len) = content_length { + AsyncBody::from_reader_sized(body, len) + } else { + AsyncBody::from_reader(body) + } + })) + }) } } @@ -1133,16 +1198,28 @@ impl fmt::Debug for HttpClient { } /// A future for a request being executed. -pub struct ResponseFuture<'c>(Pin, Error>> + 'c + Send>>); +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct ResponseFuture<'c>( + Pin::Output> + 'c + Send>>, +); impl<'c> ResponseFuture<'c> { - fn new(future: impl Future, Error>> + Send + 'c) -> Self { + fn new(future: F) -> Self + where + F: Future::Output> + Send + 'c, + { ResponseFuture(Box::pin(future)) } + + fn error(error: Error) -> Self { + Self::new(async move { + Err(error) + }) + } } impl Future for ResponseFuture<'_> { - type Output = Result, Error>; + type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.0.as_mut().poll(cx) diff --git a/src/cookies/interceptor.rs b/src/cookies/interceptor.rs index beb9f224..b793b04e 100644 --- a/src/cookies/interceptor.rs +++ b/src/cookies/interceptor.rs @@ -3,10 +3,10 @@ use super::{Cookie, CookieJar}; use crate::{ + body::AsyncBody, + error::Error, interceptor::{Context, Interceptor, InterceptorFuture}, response::ResponseExt, - Body, - Error, }; use http::Request; use std::convert::TryInto; @@ -28,7 +28,7 @@ impl CookieInterceptor { impl Interceptor for CookieInterceptor { type Err = Error; - fn intercept<'a>(&'a self, mut request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { + fn intercept<'a>(&'a self, mut request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { Box::pin(async move { // Determine the cookie jar to use for this request. If one is // attached to this specific request, use it, otherwise use the diff --git a/src/default_headers.rs b/src/default_headers.rs index c768740f..f876e46d 100644 --- a/src/default_headers.rs +++ b/src/default_headers.rs @@ -1,6 +1,7 @@ use crate::{ + body::AsyncBody, + error::Error, interceptor::{Context, Interceptor, InterceptorFuture}, - Body, Error, }; use http::{HeaderMap, HeaderValue, Request}; @@ -21,7 +22,7 @@ impl Interceptor for DefaultHeadersInterceptor { fn intercept<'a>( &'a self, - mut request: Request, + mut request: Request, ctx: Context<'a>, ) -> InterceptorFuture<'a, Self::Err> { Box::pin(async move { diff --git a/src/handler.rs b/src/handler.rs index c719622d..2682e43f 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -1,9 +1,11 @@ #![allow(unsafe_code)] use crate::{ - headers, + body::AsyncBody, + error::{Error, ErrorKind}, + parsing::{parse_header, parse_status_line}, + metrics::Metrics, response::{LocalAddr, RemoteAddr}, - Body, Error, Metrics, }; use crossbeam_utils::atomic::AtomicCell; use curl::easy::{InfoType, ReadError, SeekResult, WriteError}; @@ -28,7 +30,7 @@ use std::{ task::{Context, Poll, Waker}, }; -pub(crate) struct RequestBody(pub(crate) Body); +pub(crate) struct RequestBody(pub(crate) AsyncBody); /// Manages the state of a single request/response life cycle. /// @@ -36,14 +38,14 @@ pub(crate) struct RequestBody(pub(crate) Body); /// the progress of the request, and the handler will incrementally build up a /// response struct as the response is received. /// -/// Every request handler has an associated `Future` that can be used to poll +/// Every request handler has an associated [`Future`] that can be used to poll /// the state of the response. The handler will complete the future once the /// final HTTP response headers are received. The body of the response (if any) /// is made available to the consumer of the future, and is also driven by the /// request handler until the response body is fully consumed or discarded. /// /// If dropped before the response is finished, the associated future will be -/// completed with an `Aborted` error. +/// completed with an error. pub(crate) struct RequestHandler { /// A tracing span for grouping log events under. Since a request is /// processed asynchronously inside an agent thread, this span helps @@ -61,7 +63,7 @@ pub(crate) struct RequestHandler { sender: Option>>, /// The body to be sent in the request. - request_body: Body, + request_body: AsyncBody, /// A waker used with reading the request body asynchronously. Populated by /// an agent when the request is initialized. @@ -104,7 +106,7 @@ struct Shared { /// Set to the final result of the transfer received from curl. This is used /// to communicate an error while reading the response body if the handler /// suddenly aborts. - result: OnceCell>, + result: OnceCell>, /// Set to true whenever the response body is dropped. This is used in the /// opposite manner as the above flag; if the response body is dropped, then @@ -116,7 +118,7 @@ struct Shared { impl RequestHandler { /// Create a new request handler and an associated response future. pub(crate) fn new( - request_body: Body, + request_body: AsyncBody, ) -> ( Self, impl Future, Error>>, @@ -146,14 +148,14 @@ impl RequestHandler { // Create a future that resolves when the handler receives the response // headers. let future = async move { - let builder = receiver.recv_async().await.map_err(|e| Error::new(crate::error::ErrorKind::Unknown, e))??; + let builder = receiver.recv_async().await.map_err(|e| Error::new(ErrorKind::Unknown, e))??; let reader = ResponseBodyReader { inner: response_body_reader, shared, }; - builder.body(reader).map_err(|e| Error::new(crate::error::ErrorKind::ProtocolViolation, e)) + builder.body(reader).map_err(|e| Error::new(ErrorKind::ProtocolViolation, e)) }; (handler, future) @@ -208,76 +210,70 @@ impl RequestHandler { self.response_body_waker = Some(response_waker); } - /// Handle a result produced by curl for this handler's current transfer. - pub(crate) fn on_result(&mut self, result: Result<(), curl::Error>) { - let span = tracing::trace_span!(parent: &self.span, "on_result"); - let _enter = span.enter(); - - self.shared.result.set(result.clone()).unwrap(); - - match result { - Ok(()) => self.flush_response_headers(), - Err(e) => { - tracing::debug!("curl error: {}", e); - self.complete(Err(e.into())); - } + /// Set the final result for this transfer. + pub(crate) fn set_result(&mut self, result: Result<(), Error>) { + if self.shared.result.set(result).is_err() { + tracing::debug!("attempted to set error multiple times"); } + + // Complete the response future, if we haven't already. + self.complete_response_future(); } /// Mark the future as completed successfully with the response headers /// received so far. - fn flush_response_headers(&mut self) { - if self.sender.is_some() { - let mut builder = http::Response::builder(); - - if let Some(status) = self.response_status_code.take() { - builder = builder.status(status); - } - - if let Some(version) = self.response_version.take() { - builder = builder.version(version); - } + fn complete_response_future(&mut self) { + // If the sender has been taken already, then the future has already + // been completed. + if let Some(sender) = self.sender.take() { + // If our request has already failed early with an error, return that instead. + let result = if let Some(Err(e)) = self.shared.result.get() { + tracing::warn!("request completed with error: {}", e); + Err(e.clone()) + } else { + Ok(self.build_response()) + }; - if let Some(headers) = builder.headers_mut() { - headers.extend(self.response_headers.drain()); + if sender.send(result).is_err() { + tracing::debug!("request canceled by user"); } + } + } - if let Some(addr) = self.get_local_addr() { - builder = builder.extension(LocalAddr(addr)); - } + fn build_response(&mut self) -> http::response::Builder { + let mut builder = http::Response::builder(); - if let Some(addr) = self.get_primary_addr() { - builder = builder.extension(RemoteAddr(addr)); - } + if let Some(status) = self.response_status_code.take() { + builder = builder.status(status); + } - // Keep the request body around in case interceptors need access to - // it. Otherwise we're just going to drop it later. - builder = builder.extension(RequestBody(mem::take(&mut self.request_body))); + if let Some(version) = self.response_version.take() { + builder = builder.version(version); + } - // Include metrics in response, but only if it was created. If - // metrics are disabled then it won't have been created. - if let Some(metrics) = self.metrics.clone() { - builder = builder.extension(metrics); - } + if let Some(headers) = builder.headers_mut() { + headers.extend(self.response_headers.drain()); + } - self.complete(Ok(builder)); + if let Some(addr) = self.get_local_addr() { + builder = builder.extension(LocalAddr(addr)); } - } - /// Complete the associated future with a result. - fn complete(&mut self, result: Result) { - let span = tracing::trace_span!(parent: &self.span, "complete"); - let _enter = span.enter(); + if let Some(addr) = self.get_primary_addr() { + builder = builder.extension(RemoteAddr(addr)); + } - if let Some(sender) = self.sender.take() { - if let Err(e) = result.as_ref() { - tracing::warn!("request completed with error: {}", e); - } + // Keep the request body around in case interceptors need access to + // it. Otherwise we're just going to drop it later. + builder = builder.extension(RequestBody(mem::take(&mut self.request_body))); - if sender.send(result).is_err() { - tracing::debug!("request canceled by user"); - } + // Include metrics in response, but only if it was created. If + // metrics are disabled then it won't have been created. + if let Some(metrics) = self.metrics.clone() { + builder = builder.extension(metrics); } + + builder } fn get_primary_addr(&mut self) -> Option { @@ -393,7 +389,7 @@ impl curl::easy::Handler for RequestHandler { // HTTP/1.1 connection ourselves. // Is this the status line? - if let Some((version, status)) = headers::parse_status_line(data) { + if let Some((version, status)) = parse_status_line(data) { self.response_version = Some(version); self.response_status_code = Some(status); @@ -405,7 +401,7 @@ impl curl::easy::Handler for RequestHandler { } // Is this a header line? - if let Some((name, value)) = headers::parse_header(data) { + if let Some((name, value)) = parse_header(data) { self.response_headers.append(name, value); return true; } @@ -446,6 +442,15 @@ impl curl::easy::Handler for RequestHandler { Poll::Ready(Ok(len)) => Ok(len), Poll::Ready(Err(e)) => { tracing::error!("error reading request body: {}", e); + + // While we could just return an error here to curl and let + // the error bubble up through naturally, right now we have + // the most information about the underlying error that we + // will ever have. That's why we set the error now, to + // improve the error message. Otherwise we'll return a + // rather generic-sounding I/O error to the caller. + self.set_result(Err(e.into())); + Err(ReadError::Abort) } } @@ -490,7 +495,7 @@ impl curl::easy::Handler for RequestHandler { // Now that we've started receiving the response body, we know no more // redirects can happen and we can complete the future safely. - self.flush_response_headers(); + self.complete_response_future(); // Create a task context using a waker provided by the agent so we can // do an asynchronous write. @@ -652,7 +657,7 @@ impl AsyncRead for ResponseBodyReader { Some(Ok(())) => Poll::Ready(Ok(0)), // The transfer finished with an error, so return the error. - Some(Err(e)) => Poll::Ready(Err(io::Error::from(Error::from(e.clone())))), + Some(Err(e)) => Poll::Ready(Err(io::Error::from(e.clone()))), // The transfer did not finish properly at all, so return an error. None => Poll::Ready(Err(io::ErrorKind::ConnectionAborted.into())), diff --git a/src/headers.rs b/src/headers.rs index 176bb110..ebb660ca 100644 --- a/src/headers.rs +++ b/src/headers.rs @@ -1,191 +1,43 @@ -use http::header::{HeaderName, HeaderValue}; -use http::{StatusCode, Version}; - -pub(crate) fn parse_status_line(line: &[u8]) -> Option<(Version, StatusCode)> { - let mut parts = line.split(u8::is_ascii_whitespace); - - let version = match parts.next()? { - b"HTTP/3" => Version::HTTP_3, - b"HTTP/2" => Version::HTTP_2, - b"HTTP/1.1" => Version::HTTP_11, - b"HTTP/1.0" => Version::HTTP_10, - b"HTTP/0.9" => Version::HTTP_09, - _ => return None, - }; - - let status_code = parts - .find(|s| !s.is_empty()) - .map(StatusCode::from_bytes)? - .ok()?; - - Some((version, status_code)) -} - -pub(crate) fn parse_header(line: &[u8]) -> Option<(HeaderName, HeaderValue)> { - let split_index = line.iter().position(|&f| f == b':')?; - - let name = HeaderName::from_bytes(&line[..split_index]).ok()?; - let mut value = &line[split_index + 1..]; - - // Trim whitespace - while let Some((byte, right)) = value.split_first() { - if byte.is_ascii_whitespace() { - value = right; - } else { - break; - } - } - - while let Some((byte, left)) = value.split_last() { - if byte.is_ascii_whitespace() { - value = left; - } else { - break; - } +use http::header::HeaderMap; + +/// Extension trait for HTTP requests and responses for accessing common headers +/// in a typed way. +/// +/// Eventually this trait can be made public once the types are cleaned up a +/// bit. +pub(crate) trait HasHeaders { + fn headers(&self) -> &HeaderMap; + + fn content_length(&self) -> Option { + self + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.parse().ok()) + } + + fn content_type(&self) -> Option<&str> { + self + .headers() + .get(http::header::CONTENT_TYPE) + .and_then(|v| v.to_str().ok()) } - - let value = HeaderValue::from_bytes(value).ok()?; - - Some((name, value)) } -pub(crate) fn to_curl_string( - name: &HeaderName, - value: &HeaderValue, - title_case: bool, -) -> String { - let header_value = value.to_str().expect("request header value is not valid UTF-8!"); - - let mut string = String::new(); - - if title_case { - let name_bytes: &[u8] = name.as_ref(); - let mut at_start_of_word = true; - - for &byte in name_bytes { - if at_start_of_word { - string.push(byte.to_ascii_uppercase().into()); - } else { - string.push(byte.into()); - } - - at_start_of_word = !byte.is_ascii_alphanumeric(); - } - } else { - string.push_str(name.as_str()); - } - - // libcurl requires a special syntax to set a header with an explicit empty - // value. See https://curl.haxx.se/libcurl/c/CURLOPT_HTTPHEADER.html. - if header_value.trim().is_empty() { - string.push(';'); - } else { - string.push(':'); - string.push_str(header_value); +impl HasHeaders for HeaderMap { + fn headers(&self) -> &HeaderMap { + self } - - string } -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn parse_valid_status_line() { - assert_eq!( - parse_status_line(b"HTTP/0.9 200 \r\n"), - Some((Version::HTTP_09, StatusCode::OK,)) - ); - assert_eq!( - parse_status_line(b"HTTP/1.0 500 Internal Server Error\r\n"), - Some((Version::HTTP_10, StatusCode::INTERNAL_SERVER_ERROR,)) - ); - assert_eq!( - parse_status_line(b"HTTP/1.1 404 not found \r\n"), - Some((Version::HTTP_11, StatusCode::NOT_FOUND,)) - ); - assert_eq!( - parse_status_line(b"HTTP/2 200\r\n"), - Some((Version::HTTP_2, StatusCode::OK,)) - ); - assert_eq!( - parse_status_line(b"HTTP/3 200\r\n"), - Some((Version::HTTP_3, StatusCode::OK,)) - ); - } - - #[test] - fn parse_invalid_status_line() { - assert_eq!(parse_status_line(b""), None); - assert_eq!(parse_status_line(b" \r\n"), None); - assert_eq!(parse_status_line(b"HTP/foo bar baz\r\n"), None); - assert_eq!(parse_status_line(b"a-header: bar\r\n"), None); - assert_eq!( - parse_status_line(b" HTTP/1.1 500 Internal Server Error\r\n"), - None - ); - assert_eq!(parse_status_line(b"HTTP/4 200\r\n"), None); - } - - #[test] - fn parse_valid_headers() { - assert_eq!( - parse_header(b"Empty:"), - Some(("empty".parse().unwrap(), "".parse().unwrap(),)) - ); - assert_eq!( - parse_header(b"CONTENT-LENGTH:20\r\n"), - Some(("content-length".parse().unwrap(), "20".parse().unwrap(),)) - ); - assert_eq!( - parse_header(b"x-Server: Rust \r"), - Some(("x-server".parse().unwrap(), "Rust".parse().unwrap(),)) - ); - assert_eq!( - parse_header(b"X-val: Hello World\r"), - Some(("x-val".parse().unwrap(), "Hello World".parse().unwrap(),)) - ); - - assert_eq!( - parse_header(b"Location: https://example.com/\r"), - Some(( - "location".parse().unwrap(), - "https://example.com/".parse().unwrap(), - )) - ); - } - - #[test] - fn parse_invalid_headers() { - assert_eq!(parse_header(b""), None); - assert_eq!(parse_header(b":"), None); - assert_eq!(parse_header(b": bar"), None); - assert_eq!(parse_header(b"a\nheader: bar"), None); - assert_eq!(parse_header(b"foo : bar\r"), None); +impl HasHeaders for http::Request { + fn headers(&self) -> &HeaderMap { + self.headers() } +} - #[test] - fn normal_header_to_curl_string() { - let name = "User-Agent".parse().unwrap(); - let value = "foo".parse().unwrap(); - - assert_eq!(to_curl_string(&name, &value, false), "user-agent:foo"); - } - - #[test] - fn blank_header_to_curl_string() { - let name = "User-Agent".parse().unwrap(); - let value = "".parse().unwrap(); - - assert_eq!(to_curl_string(&name, &value, false), "user-agent;"); - } - - #[test] - fn normal_header_to_curl_string_title_case() { - let name = "User-Agent".parse().unwrap(); - let value = "foo".parse().unwrap(); - - assert_eq!(to_curl_string(&name, &value, true), "User-Agent:foo"); +impl HasHeaders for http::Response { + fn headers(&self) -> &HeaderMap { + self.headers() } } diff --git a/src/interceptor/context.rs b/src/interceptor/context.rs index e6ac28cf..853c3ff7 100644 --- a/src/interceptor/context.rs +++ b/src/interceptor/context.rs @@ -1,4 +1,4 @@ -use crate::{Body, Error}; +use crate::{body::AsyncBody, error::Error}; use super::{Interceptor, InterceptorFuture, InterceptorObj}; use http::{Request, Response}; use std::{ @@ -15,7 +15,7 @@ pub struct Context<'a> { impl<'a> Context<'a> { /// Send a request asynchronously, executing the next interceptor in the /// chain, if any. - pub async fn send(&self, request: Request) -> Result, Error> { + pub async fn send(&self, request: Request) -> Result, Error> { if let Some(interceptor) = self.interceptors.first() { let inner_context = Self { invoker: self.invoker.clone(), @@ -36,5 +36,5 @@ impl fmt::Debug for Context<'_> { } pub(crate) trait Invoke { - fn invoke<'a>(&'a self, request: Request) -> InterceptorFuture<'a, Error>; + fn invoke<'a>(&'a self, request: Request) -> InterceptorFuture<'a, Error>; } diff --git a/src/interceptor/mod.rs b/src/interceptor/mod.rs index f266ec42..f3d88a73 100644 --- a/src/interceptor/mod.rs +++ b/src/interceptor/mod.rs @@ -27,7 +27,7 @@ /// [`unstable-interceptors`](../index.html#unstable-interceptors) feature is /// enabled. -use crate::Body; +use crate::body::AsyncBody; use http::{Request, Response}; use std::{ error::Error, @@ -45,6 +45,8 @@ pub(crate) use self::{ obj::InterceptorObj, }; +type InterceptorResult = Result, E>; + /// Defines an inline interceptor using a closure-like syntax. /// /// Closures are not supported due to a limitation in Rust's type inference. @@ -53,9 +55,9 @@ pub(crate) use self::{ macro_rules! interceptor { ($request:ident, $ctx:ident, $body:expr) => {{ async fn interceptor( - mut $request: $crate::http::Request<$crate::Body>, + mut $request: $crate::http::Request<$crate::AsyncBody>, $ctx: $crate::interceptor::Context<'_>, - ) -> Result<$crate::http::Response, $crate::Error> { + ) -> Result<$crate::http::Response<$crate::AsyncBody>, $crate::Error> { (move || async move { $body })().await.map_err(Into::into) @@ -78,16 +80,16 @@ pub trait Interceptor: Send + Sync { /// /// The returned future is allowed to borrow the interceptor for the /// duration of its execution. - fn intercept<'a>(&'a self, request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err>; + fn intercept<'a>(&'a self, request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err>; } /// The type of future returned by an interceptor. -pub type InterceptorFuture<'a, E> = Pin, E>> + Send + 'a>>; +pub type InterceptorFuture<'a, E> = Pin> + Send + 'a>>; /// Creates an interceptor from an arbitrary closure or function. pub fn from_fn(f: F) -> InterceptorFn where - F: for<'a> private::AsyncFn2, Context<'a>, Output = Result, E>> + Send + Sync + 'static, + F: for<'a> private::AsyncFn2, Context<'a>, Output = InterceptorResult> + Send + Sync + 'static, E: Error + Send + Sync + 'static, { InterceptorFn(f) @@ -100,11 +102,11 @@ pub struct InterceptorFn(F); impl Interceptor for InterceptorFn where E: Error + Send + Sync + 'static, - F: for<'a> private::AsyncFn2, Context<'a>, Output = Result, E>> + Send + Sync + 'static, + F: for<'a> private::AsyncFn2, Context<'a>, Output = InterceptorResult> + Send + Sync + 'static, { type Err = E; - fn intercept<'a>(&self, request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { + fn intercept<'a>(&self, request: Request, ctx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { Box::pin(self.0.call(request, ctx)) } } diff --git a/src/interceptor/obj.rs b/src/interceptor/obj.rs index 98a03842..62d65d0e 100644 --- a/src/interceptor/obj.rs +++ b/src/interceptor/obj.rs @@ -1,4 +1,4 @@ -use crate::{Body, Error}; +use crate::{body::AsyncBody, error::Error}; use super::{Context, Interceptor, InterceptorFuture}; use http::Request; @@ -14,7 +14,7 @@ impl InterceptorObj { impl Interceptor for InterceptorObj { type Err = Error; - fn intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { + fn intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Self::Err> { self.0.dyn_intercept(request, cx) } } @@ -22,11 +22,11 @@ impl Interceptor for InterceptorObj { /// Object-safe version of the interceptor used for type erasure. Implementation /// detail of [`InterceptorObj`]. trait DynInterceptor: Send + Sync { - fn dyn_intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Error>; + fn dyn_intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Error>; } impl DynInterceptor for I { - fn dyn_intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Error> { + fn dyn_intercept<'a>(&'a self, request: Request, cx: Context<'a>) -> InterceptorFuture<'a, Error> { Box::pin(async move { self.intercept(request, cx).await.map_err(Error::from_any) }) diff --git a/src/lib.rs b/src/lib.rs index 01703d38..1d7f54a7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -89,8 +89,15 @@ //! make common tasks convenient and allow you to configure more advanced //! connection and protocol details. //! -//! Some key traits to read about include -//! [`Configurable`](config::Configurable), [`RequestExt`], and [`ResponseExt`]. +//! Here are some of the key traits to read about: +//! +//! - [`Configurable`](config::Configurable): Configure request parameters. +//! - [`RequestExt`]: Manipulate and send requests. +//! - [`ResponseExt`]: Get information about the corresponding request or +//! response statistics. +//! - [`ReadResponseExt`]: Consume a response body in a variety of ways. +//! - [`AsyncReadResponseExt`]: Consume an asynchronous response body in a +//! variety of ways. //! //! ## Custom clients //! @@ -118,10 +125,15 @@ //! use isahc::prelude::*; //! //! let mut response = isahc::get_async("https://httpbin.org/get").await?; -//! println!("{}", response.text_async().await?); +//! println!("{}", response.text().await?); //! # Ok(()) } //! ``` //! +//! Since we sent our request using [`get_async`], no blocking will occur, and +//! the asynchronous versions of all response methods (such as +//! [`text`](AsyncReadResponseExt::text)) will also automatically be selected by +//! the compiler. +//! //! # Feature flags //! //! Isahc is designed to be as "pay-as-you-need" as possible using Cargo feature @@ -237,6 +249,7 @@ mod default_headers; mod handler; mod headers; mod metrics; +mod parsing; mod redirect; mod request; mod response; @@ -254,12 +267,12 @@ pub mod interceptor; pub(crate) mod interceptor; pub use crate::{ - body::Body, + body::{AsyncBody, Body}, client::{HttpClient, HttpClientBuilder, ResponseFuture}, error::Error, metrics::Metrics, request::RequestExt, - response::ResponseExt, + response::{AsyncReadResponseExt, ReadResponseExt, ResponseExt}, }; /// Re-export of the standard HTTP types. @@ -274,7 +287,14 @@ pub use http; /// ``` pub mod prelude { #[doc(no_inline)] - pub use crate::{config::Configurable, Body, HttpClient, RequestExt, ResponseExt}; + pub use crate::{ + AsyncReadResponseExt, + ReadResponseExt, + config::Configurable, + HttpClient, + RequestExt, + ResponseExt, + }; #[doc(no_inline)] pub use http::{Request, Response}; @@ -332,10 +352,11 @@ where /// /// The request is executed using a shared [`HttpClient`] instance. See /// [`HttpClient::post`] for details. -pub fn post(uri: U, body: impl Into) -> Result, Error> +pub fn post(uri: U, body: B) -> Result, Error> where http::Uri: TryFrom, >::Error: Into, + B: Into, { HttpClient::shared().post(uri, body) } @@ -345,10 +366,11 @@ where /// /// The request is executed using a shared [`HttpClient`] instance. See /// [`HttpClient::post_async`] for details. -pub fn post_async(uri: U, body: impl Into) -> ResponseFuture<'static> +pub fn post_async(uri: U, body: B) -> ResponseFuture<'static> where http::Uri: TryFrom, >::Error: Into, + B: Into, { HttpClient::shared().post_async(uri, body) } @@ -357,10 +379,11 @@ where /// /// The request is executed using a shared [`HttpClient`] instance. See /// [`HttpClient::put`] for details. -pub fn put(uri: U, body: impl Into) -> Result, Error> +pub fn put(uri: U, body: B) -> Result, Error> where http::Uri: TryFrom, >::Error: Into, + B: Into, { HttpClient::shared().put(uri, body) } @@ -370,10 +393,11 @@ where /// /// The request is executed using a shared [`HttpClient`] instance. See /// [`HttpClient::put_async`] for details. -pub fn put_async(uri: U, body: impl Into) -> ResponseFuture<'static> +pub fn put_async(uri: U, body: B) -> ResponseFuture<'static> where http::Uri: TryFrom, >::Error: Into, + B: Into { HttpClient::shared().put_async(uri, body) } @@ -414,7 +438,7 @@ pub fn send>(request: Request) -> Result, Error> /// /// The request is executed using a shared [`HttpClient`] instance. See /// [`HttpClient::send_async`] for details. -pub fn send_async>(request: Request) -> ResponseFuture<'static> { +pub fn send_async>(request: Request) -> ResponseFuture<'static> { HttpClient::shared().send_async(request) } diff --git a/src/parsing.rs b/src/parsing.rs new file mode 100644 index 00000000..5119d182 --- /dev/null +++ b/src/parsing.rs @@ -0,0 +1,191 @@ +use http::header::{HeaderName, HeaderValue}; +use http::{StatusCode, Version}; + +pub(crate) fn parse_status_line(line: &[u8]) -> Option<(Version, StatusCode)> { + let mut parts = line.split(u8::is_ascii_whitespace); + + let version = match parts.next()? { + b"HTTP/3" => Version::HTTP_3, + b"HTTP/2" => Version::HTTP_2, + b"HTTP/1.1" => Version::HTTP_11, + b"HTTP/1.0" => Version::HTTP_10, + b"HTTP/0.9" => Version::HTTP_09, + _ => return None, + }; + + let status_code = parts + .find(|s| !s.is_empty()) + .map(StatusCode::from_bytes)? + .ok()?; + + Some((version, status_code)) +} + +pub(crate) fn parse_header(line: &[u8]) -> Option<(HeaderName, HeaderValue)> { + let split_index = line.iter().position(|&f| f == b':')?; + + let name = HeaderName::from_bytes(&line[..split_index]).ok()?; + let mut value = &line[split_index + 1..]; + + // Trim whitespace + while let Some((byte, right)) = value.split_first() { + if byte.is_ascii_whitespace() { + value = right; + } else { + break; + } + } + + while let Some((byte, left)) = value.split_last() { + if byte.is_ascii_whitespace() { + value = left; + } else { + break; + } + } + + let value = HeaderValue::from_bytes(value).ok()?; + + Some((name, value)) +} + +pub(crate) fn header_to_curl_string( + name: &HeaderName, + value: &HeaderValue, + title_case: bool, +) -> String { + let header_value = value.to_str().expect("request header value is not valid UTF-8!"); + + let mut string = String::new(); + + if title_case { + let name_bytes: &[u8] = name.as_ref(); + let mut at_start_of_word = true; + + for &byte in name_bytes { + if at_start_of_word { + string.push(byte.to_ascii_uppercase().into()); + } else { + string.push(byte.into()); + } + + at_start_of_word = !byte.is_ascii_alphanumeric(); + } + } else { + string.push_str(name.as_str()); + } + + // libcurl requires a special syntax to set a header with an explicit empty + // value. See https://curl.haxx.se/libcurl/c/CURLOPT_HTTPHEADER.html. + if header_value.trim().is_empty() { + string.push(';'); + } else { + string.push(':'); + string.push_str(header_value); + } + + string +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parse_valid_status_line() { + assert_eq!( + parse_status_line(b"HTTP/0.9 200 \r\n"), + Some((Version::HTTP_09, StatusCode::OK,)) + ); + assert_eq!( + parse_status_line(b"HTTP/1.0 500 Internal Server Error\r\n"), + Some((Version::HTTP_10, StatusCode::INTERNAL_SERVER_ERROR,)) + ); + assert_eq!( + parse_status_line(b"HTTP/1.1 404 not found \r\n"), + Some((Version::HTTP_11, StatusCode::NOT_FOUND,)) + ); + assert_eq!( + parse_status_line(b"HTTP/2 200\r\n"), + Some((Version::HTTP_2, StatusCode::OK,)) + ); + assert_eq!( + parse_status_line(b"HTTP/3 200\r\n"), + Some((Version::HTTP_3, StatusCode::OK,)) + ); + } + + #[test] + fn parse_invalid_status_line() { + assert_eq!(parse_status_line(b""), None); + assert_eq!(parse_status_line(b" \r\n"), None); + assert_eq!(parse_status_line(b"HTP/foo bar baz\r\n"), None); + assert_eq!(parse_status_line(b"a-header: bar\r\n"), None); + assert_eq!( + parse_status_line(b" HTTP/1.1 500 Internal Server Error\r\n"), + None + ); + assert_eq!(parse_status_line(b"HTTP/4 200\r\n"), None); + } + + #[test] + fn parse_valid_headers() { + assert_eq!( + parse_header(b"Empty:"), + Some(("empty".parse().unwrap(), "".parse().unwrap(),)) + ); + assert_eq!( + parse_header(b"CONTENT-LENGTH:20\r\n"), + Some(("content-length".parse().unwrap(), "20".parse().unwrap(),)) + ); + assert_eq!( + parse_header(b"x-Server: Rust \r"), + Some(("x-server".parse().unwrap(), "Rust".parse().unwrap(),)) + ); + assert_eq!( + parse_header(b"X-val: Hello World\r"), + Some(("x-val".parse().unwrap(), "Hello World".parse().unwrap(),)) + ); + + assert_eq!( + parse_header(b"Location: https://example.com/\r"), + Some(( + "location".parse().unwrap(), + "https://example.com/".parse().unwrap(), + )) + ); + } + + #[test] + fn parse_invalid_headers() { + assert_eq!(parse_header(b""), None); + assert_eq!(parse_header(b":"), None); + assert_eq!(parse_header(b": bar"), None); + assert_eq!(parse_header(b"a\nheader: bar"), None); + assert_eq!(parse_header(b"foo : bar\r"), None); + } + + #[test] + fn normal_header_to_curl_string() { + let name = "User-Agent".parse().unwrap(); + let value = "foo".parse().unwrap(); + + assert_eq!(header_to_curl_string(&name, &value, false), "user-agent:foo"); + } + + #[test] + fn blank_header_to_curl_string() { + let name = "User-Agent".parse().unwrap(); + let value = "".parse().unwrap(); + + assert_eq!(header_to_curl_string(&name, &value, false), "user-agent;"); + } + + #[test] + fn normal_header_to_curl_string_title_case() { + let name = "User-Agent".parse().unwrap(); + let value = "foo".parse().unwrap(); + + assert_eq!(header_to_curl_string(&name, &value, true), "User-Agent:foo"); + } +} diff --git a/src/redirect.rs b/src/redirect.rs index 5e418f6f..43daf722 100644 --- a/src/redirect.rs +++ b/src/redirect.rs @@ -1,10 +1,10 @@ use crate::{ + body::AsyncBody, config::RedirectPolicy, error::{Error, ErrorKind}, handler::RequestBody, interceptor::{Context, Interceptor, InterceptorFuture}, request::RequestExt, - Body, }; use http::{Request, Response, Uri}; use std::convert::TryFrom; @@ -27,7 +27,7 @@ impl Interceptor for RedirectInterceptor { fn intercept<'a>( &'a self, - mut request: Request, + mut request: Request, ctx: Context<'a>, ) -> InterceptorFuture<'a, Self::Err> { Box::pin(async move { diff --git a/src/request.rs b/src/request.rs index fdb3af34..c436ddba 100644 --- a/src/request.rs +++ b/src/request.rs @@ -1,7 +1,8 @@ use crate::{ + body::{AsyncBody, Body}, client::ResponseFuture, config::{internal::ConfigurableBase, Configurable}, - {Body, Error}, + error::Error, }; use http::{Request, Response}; @@ -42,7 +43,7 @@ pub trait RequestExt { /// [`send_async`](crate::send_async). fn send_async(self) -> ResponseFuture<'static> where - T: Into; + T: Into; } impl RequestExt for Request { @@ -114,7 +115,7 @@ impl RequestExt for Request { fn send_async(self) -> ResponseFuture<'static> where - T: Into, + T: Into, { crate::send_async(self) } diff --git a/src/response.rs b/src/response.rs index a4992725..9b357e26 100644 --- a/src/response.rs +++ b/src/response.rs @@ -1,11 +1,17 @@ -use crate::{redirect::EffectiveUri, Metrics}; -use futures_lite::io::AsyncRead; +use crate::{ + metrics::Metrics, + redirect::EffectiveUri, +}; +use futures_lite::io::{AsyncRead, AsyncWrite}; use http::{Response, Uri}; use std::{ fs::File, + future::Future, io::{self, Read, Write}, net::SocketAddr, path::Path, + pin::Pin, + task::{Context, Poll}, }; /// Provides extension methods for working with HTTP responses. @@ -65,13 +71,50 @@ pub trait ResponseExt { /// metrics you can use /// [`Configurable::metrics`](crate::config::Configurable::metrics). fn metrics(&self) -> Option<&Metrics>; +} + +impl ResponseExt for Response { + fn effective_uri(&self) -> Option<&Uri> { + self.extensions().get::().map(|v| &v.0) + } + + fn local_addr(&self) -> Option { + self.extensions().get::().map(|v| v.0) + } + + fn remote_addr(&self) -> Option { + self.extensions().get::().map(|v| v.0) + } + #[cfg(feature = "cookies")] + fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar> { + self.extensions().get() + } + + fn metrics(&self) -> Option<&Metrics> { + self.extensions().get() + } +} + +/// Provides extension methods for consuming HTTP response streams. +pub trait ReadResponseExt { /// Copy the response body into a writer. /// /// Returns the number of bytes that were written. - fn copy_to(&mut self, writer: impl Write) -> io::Result - where - T: Read; + /// + /// # Examples + /// + /// Copying the response into an in-memory buffer: + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// let mut buf = vec![]; + /// isahc::get("https://example.org")?.copy_to(&mut buf)?; + /// println!("Read {} bytes", buf.len()); + /// # Ok::<(), isahc::Error>(()) + /// ``` + fn copy_to(&mut self, writer: W) -> io::Result; /// Write the response body to a file. /// @@ -89,10 +132,7 @@ pub trait ResponseExt { /// .copy_to_file("myimage.jpg")?; /// # Ok::<(), isahc::Error>(()) /// ``` - fn copy_to_file(&mut self, path: impl AsRef) -> io::Result - where - T: Read, - { + fn copy_to_file>(&mut self, path: P) -> io::Result { File::create(path).and_then(|f| self.copy_to(f)) } @@ -128,24 +168,7 @@ pub trait ResponseExt { /// # Ok::<(), isahc::Error>(()) /// ``` #[cfg(feature = "text-decoding")] - fn text(&mut self) -> io::Result - where - T: Read; - - /// Read the response body as a string asynchronously. - /// - /// This method consumes the entire response body stream and can only be - /// called once. - /// - /// # Availability - /// - /// This method is only available when the - /// [`text-decoding`](index.html#text-decoding) feature is enabled, which it - /// is by default. - #[cfg(feature = "text-decoding")] - fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> - where - T: AsyncRead + Unpin; + fn text(&mut self) -> io::Result; /// Deserialize the response body as JSON into a given type. /// @@ -167,62 +190,104 @@ pub trait ResponseExt { #[cfg(feature = "json")] fn json(&mut self) -> Result where - D: serde::de::DeserializeOwned, - T: Read; + D: serde::de::DeserializeOwned; } -impl ResponseExt for Response { - fn effective_uri(&self) -> Option<&Uri> { - self.extensions().get::().map(|v| &v.0) - } - - fn local_addr(&self) -> Option { - self.extensions().get::().map(|v| v.0) - } - - fn remote_addr(&self) -> Option { - self.extensions().get::().map(|v| v.0) - } - - #[cfg(feature = "cookies")] - fn cookie_jar(&self) -> Option<&crate::cookies::CookieJar> { - self.extensions().get() +impl ReadResponseExt for Response { + fn copy_to(&mut self, mut writer: W) -> io::Result { + io::copy(self.body_mut(), &mut writer) } - fn metrics(&self) -> Option<&Metrics> { - self.extensions().get() + #[cfg(feature = "text-decoding")] + fn text(&mut self) -> io::Result { + crate::text::Decoder::for_response(&self).decode_reader(self.body_mut()) } - fn copy_to(&mut self, mut writer: impl Write) -> io::Result + #[cfg(feature = "json")] + fn json(&mut self) -> Result where - T: Read, + D: serde::de::DeserializeOwned, { - io::copy(self.body_mut(), &mut writer) + serde_json::from_reader(self.body_mut()) } +} +/// Provides extension methods for consuming asynchronous HTTP response streams. +pub trait AsyncReadResponseExt { + /// Copy the response body into a writer asynchronously. + /// + /// Returns the number of bytes that were written. + /// + /// # Examples + /// + /// Copying the response into an in-memory buffer: + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// # async fn run() -> Result<(), isahc::Error> { + /// let mut buf = vec![]; + /// isahc::get_async("https://example.org").await? + /// .copy_to(&mut buf).await?; + /// println!("Read {} bytes", buf.len()); + /// # Ok(()) } + /// ``` + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> + where + W: AsyncWrite + Unpin + 'a; + + /// Read the response body as a string asynchronously. + /// + /// This method consumes the entire response body stream and can only be + /// called once. + /// + /// # Availability + /// + /// This method is only available when the + /// [`text-decoding`](index.html#text-decoding) feature is enabled, which it + /// is by default. + /// + /// # Examples + /// + /// ```no_run + /// use isahc::prelude::*; + /// + /// # async fn run() -> Result<(), isahc::Error> { + /// let text = isahc::get_async("https://example.org").await? + /// .text().await?; + /// println!("{}", text); + /// # Ok(()) } + /// ``` #[cfg(feature = "text-decoding")] - fn text(&mut self) -> io::Result + fn text(&mut self) -> crate::text::TextFuture<'_, &mut T>; +} + +impl AsyncReadResponseExt for Response { + fn copy_to<'a, W>(&'a mut self, writer: W) -> CopyFuture<'a> where - T: Read, + W: AsyncWrite + Unpin + 'a, { - crate::text::Decoder::for_response(&self).decode_reader(self.body_mut()) + CopyFuture(Box::pin(async move { + futures_lite::io::copy(self.body_mut(), writer).await + })) } #[cfg(feature = "text-decoding")] - fn text_async(&mut self) -> crate::text::TextFuture<'_, &mut T> - where - T: AsyncRead + Unpin, - { + fn text(&mut self) -> crate::text::TextFuture<'_, &mut T> { crate::text::Decoder::for_response(&self).decode_reader_async(self.body_mut()) } +} - #[cfg(feature = "json")] - fn json(&mut self) -> Result - where - D: serde::de::DeserializeOwned, - T: Read, - { - serde_json::from_reader(self.body_mut()) +/// A future which copies all the response body bytes into a sink. +#[allow(missing_debug_implementations)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct CopyFuture<'a>(Pin> + 'a>>); + +impl Future for CopyFuture<'_> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.as_mut().poll(cx) } } diff --git a/src/text.rs b/src/text.rs index 98d4d40d..878b57cc 100644 --- a/src/text.rs +++ b/src/text.rs @@ -2,6 +2,7 @@ #![cfg(feature = "text-decoding")] +use crate::headers::HasHeaders; use encoding_rs::{CoderResult, Encoding}; use futures_lite::io::{AsyncRead, AsyncReadExt}; use http::Response; @@ -39,6 +40,7 @@ macro_rules! decode_reader { /// A future returning a response body decoded as text. #[allow(missing_debug_implementations)] +#[must_use = "futures do nothing unless you `.await` or poll them"] pub struct TextFuture<'a, R> { inner: Pin> + 'a>>, _phantom: PhantomData, @@ -80,10 +82,7 @@ impl Decoder { /// Create a new encoder suitable for decoding the given response. pub(crate) fn for_response(response: &Response) -> Self { - if let Some(content_type) = response - .headers() - .get(http::header::CONTENT_TYPE) - .and_then(|header| header.to_str().ok()) + if let Some(content_type) = response.content_type() .and_then(|header| header.parse::().ok()) { if let Some(charset) = content_type.get_param(mime::CHARSET) { @@ -156,7 +155,7 @@ impl Decoder { #[cfg(test)] mod tests { use super::*; - use crate::Body; + use crate::body::Body; static_assertions::assert_impl_all!(TextFuture<'_, &mut Body>: Send); diff --git a/tests/redirects.rs b/tests/redirects.rs index 195fb5ee..287c1c9c 100644 --- a/tests/redirects.rs +++ b/tests/redirects.rs @@ -1,4 +1,5 @@ use isahc::{ + Body, config::RedirectPolicy, prelude::*, }; diff --git a/tests/request_body.rs b/tests/request_body.rs index 0a2c15c7..2547b6d5 100644 --- a/tests/request_body.rs +++ b/tests/request_body.rs @@ -1,8 +1,17 @@ -use isahc::prelude::*; -use isahc::Body; +use futures_lite::{future::block_on, AsyncRead}; +use isahc::{prelude::*, AsyncBody, Body}; +use std::{ + error::Error, + io::{self, Read}, + pin::Pin, + task::{Context, Poll}, +}; use test_case::test_case; use testserver::mock; +#[macro_use] +mod utils; + #[test_case("GET")] #[test_case("HEAD")] #[test_case("POST")] @@ -82,3 +91,59 @@ fn content_length_header_takes_precedence_over_body_objects_length(method: &str) m.request().expect_header("content-length", "3"); m.request().expect_body("abc"); // truncated to 3 bytes } + +#[test] +fn upload_from_bad_reader_returns_error_with_original_cause() { + let m = mock!(); + + struct BadReader; + + impl Read for BadReader { + fn read(&mut self, _buf: &mut [u8]) -> io::Result { + Err(io::ErrorKind::UnexpectedEof.into()) + } + } + + let result = isahc::put(m.url(), Body::from_reader(BadReader)); + + assert_matches!(&result, Err(e) if e.kind() == isahc::error::ErrorKind::Io); + assert_eq!( + result + .unwrap_err() + .source() + .unwrap() + .downcast_ref::() + .unwrap() + .kind(), + io::ErrorKind::UnexpectedEof + ); +} + +#[test] +fn upload_from_bad_async_reader_returns_error_with_original_cause() { + let m = mock!(); + + struct BadReader; + + impl AsyncRead for BadReader { + fn poll_read(self: Pin<&mut Self>, _cx: &mut Context<'_>, _buf: &mut [u8]) -> Poll> { + Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())) + } + } + + let result = block_on(async { + isahc::put_async(m.url(), AsyncBody::from_reader(BadReader)).await + }); + + assert_matches!(&result, Err(e) if e.kind() == isahc::error::ErrorKind::Io); + assert_eq!( + result + .unwrap_err() + .source() + .unwrap() + .downcast_ref::() + .unwrap() + .kind(), + io::ErrorKind::UnexpectedEof + ); +}