Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move SdkBody and bytestream into aws-smithy-types #3026

Merged
merged 18 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-config/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ allowed_external_types = [
"aws_smithy_async::rt::sleep::SharedAsyncSleep",
"aws_smithy_async::time::SharedTimeSource",
"aws_smithy_async::time::TimeSource",
"aws_smithy_http::body::SdkBody",
"aws_smithy_types::body::SdkBody",
"aws_smithy_http::endpoint",
"aws_smithy_http::endpoint::error::InvalidEndpointError",
"aws_smithy_http::result::SdkError",
Expand Down
2 changes: 1 addition & 1 deletion aws/rust-runtime/aws-http/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allowed_external_types = [
"aws_smithy_http::body::Error",
"aws_smithy_types::body::Error",
"aws_smithy_types::config_bag::storable::Storable",
"aws_smithy_types::config_bag::storable::StoreReplace",
"aws_types::app_name::AppName",
Expand Down
1 change: 1 addition & 0 deletions aws/rust-runtime/aws-sig-auth/external-types.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ allowed_external_types = [
"aws_sigv4::http_request::sign::SignableBody",
"aws_sigv4::http_request::error::SigningError",
"aws_smithy_http::*",
"aws_smithy_types::body::SdkBody",
"aws_types::*",
"http::request::Request",
"aws_smithy_runtime_api::client::identity::Identity",
Expand Down
2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-checksums/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
allowed_external_types = [
"aws_smithy_http::*",
"aws_smithy_types::body::SdkBody",
"bytes::bytes::Bytes",
"http::header::map::HeaderMap",
"http::header::name::HeaderName",
Expand Down
15 changes: 2 additions & 13 deletions rust-runtime/aws-smithy-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"]
event-stream = ["aws-smithy-eventstream"]
rt-tokio = ["aws-smithy-types/rt-tokio"]

[dependencies]
aws-smithy-eventstream = { path = "../aws-smithy-eventstream", optional = true }
Expand All @@ -27,30 +27,19 @@ pin-project-lite = "0.2.9"
pin-utils = "0.1.0"
tracing = "0.1"

# We are using hyper for our streaming body implementation, but this is an internal detail.
hyper = "0.14.26"

# ByteStream internals
# For an adapter to enable the `Stream` trait for `aws_smithy_types::byte_stream::ByteStream`
futures-core = "0.3.14"
tokio = { version = "1.23.1", optional = true }
tokio-util = { version = "0.7", optional = true }

[dev-dependencies]
async-stream = "0.3"
futures-util = { version = "0.3.16", default-features = false }
hyper = { version = "0.14.26", features = ["stream"] }
pretty_assertions = "1.3"
proptest = "1"
tokio = { version = "1.23.1", features = [
"macros",
"rt",
"rt-multi-thread",
"fs",
"io-util",
] }
tokio-stream = "0.1.5"
tempfile = "3.2.0"
tracing-test = "0.2.1"

[package.metadata.docs.rs]
all-features = true
Expand Down
13 changes: 1 addition & 12 deletions rust-runtime/aws-smithy-http/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
allowed_external_types = [
"aws_smithy_types::*",
"bytes::buf::buf_impl::Buf",
"bytes::bytes::Bytes",
"http::error::Error",
"http::header::map::HeaderMap",
Expand All @@ -12,20 +11,10 @@ allowed_external_types = [
"http::response::Builder",
"http::response::Response",
"http::uri::Uri",
"http::version::Version",
"http_body::Body",
"http_body::combinators::box_body::BoxBody",
"hyper::body::body::Body",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead`
"tokio::io::async_read::AsyncRead",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Switch to AsyncIterator once standardized
# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
"futures_core::stream::Stream",
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File`
"tokio::fs::file::File",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Once tooling permits it, only allow the following types in the `event-stream` feature
"aws_smithy_eventstream::*",
]
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use aws_smithy_eventstream::frame::{
use bytes::Buf;
use bytes::Bytes;
use bytes_utils::SegmentedBuf;
use hyper::body::HttpBody;
use http_body::Body;
use std::error::Error as StdError;
use std::fmt;
use std::marker::PhantomData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::task::{Context, Poll};
/// new-type to enable the trait when it is required.
///
/// This is meant to be used by codegen code, and users should not need to use it directly.
#[derive(Debug)]
pub struct FuturesStreamCompatByteStream(ByteStream);

impl FuturesStreamCompatByteStream {
Expand Down
8 changes: 5 additions & 3 deletions rust-runtime/aws-smithy-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
#![allow(clippy::derive_partial_eq_without_eq)]
#![cfg_attr(docsrs, feature(doc_cfg))]

pub mod body;
//TODO(runtimeCratesVersioningCleanup): Re-point those who use the following reexports to
// directly depend on `aws_smithy_types` and remove the reexports below.
pub use aws_smithy_types::body;
pub use aws_smithy_types::byte_stream;
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved

pub mod endpoint;
// Marked as `doc(hidden)` because a type in the module is used both by this crate and by the code
// generator, but not by external users. Also, by the module being `doc(hidden)` instead of it being
Expand All @@ -45,7 +49,5 @@ pub mod result;
#[cfg(feature = "event-stream")]
pub mod event_stream;

pub mod byte_stream;

pub mod connection;
mod urlencode;
22 changes: 22 additions & 0 deletions rust-runtime/aws-smithy-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,30 @@ license = "Apache-2.0"
repository = "https://github.com/awslabs/smithy-rs"

[features]
rt-tokio = ["dep:tokio-util", "dep:tokio", "tokio?/rt", "tokio?/fs", "tokio?/io-util", "tokio-util?/io"]
test-util = []
serde-serialize = []
serde-deserialize = []

[dependencies]
base64-simd = "0.8"
bytes = "1"
bytes-utils = "0.1"
http = "0.2.3"
http-body = "0.4.4"
hyper = "0.14.26"
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
itoa = "1.0.0"
num-integer = "0.1.44"
pin-project-lite = "0.2.9"
pin-utils = "0.1.0"
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved
ryu = "1.0.5"
time = { version = "0.3.4", features = ["parsing"] }

# ByteStream internals
futures-core = "0.3.14"
tokio = { version = "1.23.1", optional = true }
tokio-util = { version = "0.7", optional = true }

[dev-dependencies]
base64 = "0.13.0"
ciborium = { version = "0.2.1" }
Expand All @@ -31,6 +44,15 @@ proptest = "1"
rand = "0.8.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1.23.1", features = [
"macros",
"rt",
"rt-multi-thread",
"fs",
"io-util",
] }
tokio-stream = "0.1.5"
tempfile = "3.2.0"

[package.metadata.docs.rs]
all-features = true
Expand Down
13 changes: 13 additions & 0 deletions rust-runtime/aws-smithy-types/external-types.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,15 @@
allowed_external_types = [
"bytes::bytes::Bytes",
"bytes::buf::buf_impl::Buf",

# TODO(https://github.com/awslabs/smithy-rs/issues/3033): Feature gate based on unstable versions
"http_body::Body",
"http_body::combinators::box_body::BoxBody",
"hyper::body::body::Body",
ysaito1001 marked this conversation as resolved.
Show resolved Hide resolved

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate Tokio `AsyncRead`
"tokio::io::async_read::AsyncRead",

# TODO(https://github.com/awslabs/smithy-rs/issues/1193): Feature gate references to Tokio `File`
"tokio::fs::file::File",
]
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SdkBody {
}
}

/// Update this `SdkBody` with `map`. **This function MUST NOT alert the data of the body.**
/// Update this `SdkBody` with `map`. **This function MUST NOT alter the data of the body.**
///
/// This function is useful for adding metadata like progress tracking to an [`SdkBody`] that
/// does not alter the actual byte data. If your mapper alters the contents of the body, use [`SdkBody::map`]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//!
//! ### Writing a ByteStream into a file:
//! ```no_run
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! use tokio::fs::File;
//! use tokio::io::AsyncWriteExt;
Expand All @@ -34,7 +34,7 @@
//! ### Converting a ByteStream into Bytes
//! ```no_run
//! use bytes::Bytes;
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! struct SynthesizeSpeechOutput {
//! audio_stream: ByteStream,
Expand All @@ -53,7 +53,7 @@
//!
//! ```no_run
//! use bytes::{Buf, Bytes};
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::error::Error;
//! use tokio::fs::File;
//! use tokio::io::AsyncWriteExt;
Expand Down Expand Up @@ -83,7 +83,7 @@
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # {
//! use aws_smithy_http::byte_stream::ByteStream;
//! use aws_smithy_types::byte_stream::ByteStream;
//! use std::path::Path;
//! struct GetObjectInput {
//! body: ByteStream
Expand All @@ -104,7 +104,7 @@
//! ```no_run
//! # #[cfg(feature = "rt-tokio")]
//! # {
//! use aws_smithy_http::byte_stream::{ByteStream, Length};
//! use aws_smithy_types::byte_stream::{ByteStream, Length};
//! use std::path::Path;
//! struct GetObjectInput {
//! body: ByteStream
Expand Down Expand Up @@ -157,8 +157,8 @@ pin_project! {
/// [`.collect()`](crate::byte_stream::ByteStream::collect) reads the complete ByteStream into memory and stores it in `AggregatedBytes`,
/// a non-contiguous ByteBuffer.
/// ```no_run
/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes};
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes};
/// use aws_smithy_types::body::SdkBody;
/// use bytes::Buf;
/// async fn example() {
/// let stream = ByteStream::new(SdkBody::from("hello! This is some data"));
Expand All @@ -181,8 +181,8 @@ pin_project! {
/// # pub fn finish(&self) -> u64 { 6 }
/// # }
/// # }
/// use aws_smithy_http::byte_stream::{ByteStream, AggregatedBytes, error::Error};
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, AggregatedBytes, error::Error};
/// use aws_smithy_types::body::SdkBody;
///
/// async fn example() -> Result<(), Error> {
/// let mut stream = ByteStream::from(vec![1, 2, 3, 4, 5, 99]);
Expand All @@ -202,8 +202,8 @@ pin_project! {
/// It's possible to convert a `ByteStream` into a struct that implements [`tokio::io::AsyncRead`](tokio::io::AsyncRead).
/// Then, you can use pre-existing tools like [`tokio::io::BufReader`](tokio::io::BufReader):
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
/// use tokio::io::{AsyncBufReadExt, BufReader};
/// #[cfg(feature = "rt-tokio")]
/// async fn example() -> std::io::Result<()> {
Expand All @@ -224,7 +224,7 @@ pin_project! {
/// will be converted into `Bytes` enabling a cheap clone during retries.
/// ```no_run
/// use bytes::Bytes;
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// let stream = ByteStream::from(vec![1,2,3]);
/// let stream = ByteStream::from(Bytes::from_static(b"hello!"));
/// ```
Expand All @@ -233,7 +233,7 @@ pin_project! {
/// ```no_run
/// #[cfg(feature = "tokio-rt")]
/// # {
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// let stream = ByteStream::from_path("big_file.csv");
/// # }
/// ```
Expand All @@ -242,8 +242,8 @@ pin_project! {
/// from an SdkBody. **When created from an SdkBody, care must be taken to ensure retriability.** An SdkBody is retryable
/// when constructed from in-memory data or when using [`SdkBody::retryable`](crate::body::SdkBody::retryable).
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use aws_smithy_types::body::SdkBody;
/// use bytes::Bytes;
/// let (mut tx, channel_body) = hyper::Body::channel();
/// // this will not be retryable because the SDK has no way to replay this stream
Expand Down Expand Up @@ -322,9 +322,9 @@ impl ByteStream {
/// over the network. If a contiguous slice is required, use `into_bytes()`.
/// ```no_run
/// use bytes::Bytes;
/// use aws_smithy_http::body;
/// use aws_smithy_http::body::SdkBody;
/// use aws_smithy_http::byte_stream::{ByteStream, error::Error};
/// use aws_smithy_types::body;
/// use aws_smithy_types::body::SdkBody;
/// use aws_smithy_types::byte_stream::{ByteStream, error::Error};
/// async fn get_data() {
/// let stream = ByteStream::new(SdkBody::from("hello!"));
/// let data: Result<Bytes, Error> = stream.collect().await.map(|data| data.into_bytes());
Expand All @@ -339,7 +339,7 @@ impl ByteStream {
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_http::byte_stream::{ByteStream, Length};
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
///
/// async fn bytestream_from_file() -> ByteStream {
/// let bytestream = ByteStream::read_from()
Expand Down Expand Up @@ -379,7 +379,7 @@ impl ByteStream {
///
/// # Examples
/// ```no_run
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
/// use std::path::Path;
/// async fn make_bytestream() -> ByteStream {
/// ByteStream::from_path("docs/rows.csv").await.expect("file should be readable")
Expand Down Expand Up @@ -412,7 +412,7 @@ impl ByteStream {
///
/// ```rust
/// use tokio::io::{BufReader, AsyncBufReadExt};
/// use aws_smithy_http::byte_stream::ByteStream;
/// use aws_smithy_types::byte_stream::ByteStream;
///
/// # async fn dox(my_bytestream: ByteStream) -> std::io::Result<()> {
/// let mut lines = BufReader::new(my_bytestream.into_async_read()).lines();
Expand All @@ -423,9 +423,20 @@ impl ByteStream {
/// # }
/// ```
pub fn into_async_read(self) -> impl tokio::io::AsyncRead {
tokio_util::io::StreamReader::new(
crate::futures_stream_adapter::FuturesStreamCompatByteStream::new(self),
)
// The `Stream` trait is currently unstable so we can only use it in private.
// Here, we create a local struct just to enable the trait for `ByteStream` and pass it
// to `StreamReader`.
struct FuturesStreamCompatByteStream(ByteStream);
impl futures_core::stream::Stream for FuturesStreamCompatByteStream {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.0).poll_next(cx)
}
}
tokio_util::io::StreamReader::new(FuturesStreamCompatByteStream(self))
}

/// Given a function to modify an [`SdkBody`], run it on the `SdkBody` inside this `Bytestream`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl PathBody {
/// ```no_run
/// # #[cfg(feature = "rt-tokio")]
/// # {
/// use aws_smithy_http::byte_stream::{ByteStream, Length};
/// use aws_smithy_types::byte_stream::{ByteStream, Length};
/// use std::path::Path;
/// struct GetObjectInput {
/// body: ByteStream
Expand Down
Loading
Loading