Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tests): Reduce futures crates #1448

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tests/compression/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ version = "0.1.0"

[dependencies]
bytes = "1"
futures = "0.3"
futures-core = "0.3"
http = "0.2"
http-body = "0.4"
hyper = "0.14.3"
pin-project = "1.0"
prost = "0.11"
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]}
tokio-stream = "0.1"
tonic = {path = "../../tonic", features = ["gzip"]}
tower = {version = "0.4", features = []}
tower-http = {version = "0.4", features = ["map-response-body", "map-request-body"]}
Expand Down
4 changes: 2 additions & 2 deletions tests/compression/src/bidirectional_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -48,7 +48,7 @@ async fn client_enabled_server_enabled() {
.accept_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(stream);

let res = client
Expand Down
16 changes: 8 additions & 8 deletions tests/compression/src/client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -39,7 +39,7 @@ async fn client_enabled_server_enabled() {
.send_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();
Expand Down Expand Up @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -84,7 +84,7 @@ async fn client_disabled_server_enabled() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

client.compress_input_client_stream(req).await.unwrap();
Expand All @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand All @@ -111,7 +111,7 @@ async fn client_enabled_server_disabled() {
.send_compressed(CompressionEncoding::Gzip);

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]);
let req = Request::new(Box::pin(stream));

let status = client.compress_input_client_stream(req).await.unwrap_err();
Expand Down Expand Up @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -156,7 +156,7 @@ async fn compressing_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = futures::stream::iter(vec![]);
let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));

let res = client.compress_output_client_stream(req).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/compressing_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() {
async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down
16 changes: 8 additions & 8 deletions tests/compression/src/compressing_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -160,7 +160,7 @@ async fn client_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand All @@ -346,7 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() {
let mut client = test_client::TestClient::new(mock_io_channel(client).await)
.accept_compressed(CompressionEncoding::Gzip);

let stream = futures::stream::iter(vec![]);
let stream = tokio_stream::iter(vec![]);
let req = Request::new(Box::pin(stream));

let res = client.compress_output_client_stream(req).await.unwrap();
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

use self::util::*;
use crate::util::mock_io_channel;
use futures::{Stream, StreamExt};
use std::{
pin::Pin,
sync::{
Expand All @@ -11,6 +10,7 @@ use std::{
},
};
use tokio::net::TcpListener;
use tokio_stream::{Stream, StreamExt};
use tonic::{
transport::{Channel, Endpoint, Server, Uri},
Request, Response, Status, Streaming,
Expand Down Expand Up @@ -67,7 +67,7 @@ impl test_server::Test for Svc {
_req: Request<()>,
) -> Result<Response<Self::CompressOutputServerStreamStream>, Status> {
let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::repeat(SomeData { data })
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
.take(2)
.map(Ok::<_, Status>);
Ok(self.prepare_response(Response::new(Box::pin(stream))))
Expand Down Expand Up @@ -113,7 +113,7 @@ impl test_server::Test for Svc {
}

let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec();
let stream = futures::stream::repeat(SomeData { data })
let stream = tokio_stream::iter(std::iter::repeat(SomeData { data }))
.take(2)
.map(Ok::<_, Status>);
Ok(self.prepare_response(Response::new(Box::pin(stream))))
Expand Down
6 changes: 3 additions & 3 deletions tests/compression/src/server_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down Expand Up @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() {
.into_inner(),
)
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion tests/compression/src/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;
use bytes::Bytes;
use futures::ready;
use futures_core::ready;
use http_body::Body;
use pin_project::pin_project;
use std::{
Expand Down
5 changes: 2 additions & 3 deletions tests/integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ version = "0.1.0"

[dependencies]
bytes = "1.0"
futures-util = "0.3"
futures-util = {version="0.3", default-features =false}
prost = "0.11"
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]}
tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net", "sync"]}
tonic = {path = "../../tonic"}
tracing-subscriber = {version = "0.3"}

[dev-dependencies]
async-stream = "0.3"
futures = "0.3"
http = "0.2"
http-body = "0.4"
hyper = "0.14"
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/tests/client_layer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use std::time::Duration;

use futures::{channel::oneshot, FutureExt};
use futures_util::FutureExt;
use http::{header::HeaderName, HeaderValue};
use integration_tests::pb::{test_client::TestClient, test_server, Input, Output};
use tokio::sync::oneshot;
use tonic::{
transport::{Endpoint, Server},
Request, Response, Status,
Expand Down
2 changes: 1 addition & 1 deletion tests/integration_tests/tests/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ where
{
type Response = S::Response;
type Error = S::Error;
type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = futures_util::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
Expand Down
3 changes: 2 additions & 1 deletion tests/integration_tests/tests/interceptor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::time::Duration;

use futures::{channel::oneshot, FutureExt};
use futures_util::FutureExt;
use integration_tests::pb::{test_client::TestClient, test_server, Input, Output};
use tokio::sync::oneshot;
use tonic::{
transport::{Endpoint, Server},
GrpcMethod, Request, Response, Status,
Expand Down
8 changes: 4 additions & 4 deletions tests/integration_tests/tests/max_message_size.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::pin::Pin;

use futures::{stream, Stream};
use integration_tests::{
pb::{test1_client, test1_server, Input1, Output1},
trace_init,
};
use tokio_stream::Stream;
use tonic::{
transport::{Endpoint, Server},
Code, Request, Response, Status,
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn response_stream_limit() {
let blob = Output1 {
buf: vec![0; 6877902],
};
let stream = stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]);
let stream = tokio_stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]);

Ok(Response::new(Box::pin(stream)))
}
Expand All @@ -148,7 +148,7 @@ async fn response_stream_limit() {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down Expand Up @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> {
tokio::spawn(async move {
Server::builder()
.add_service(svc)
.serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)]))
.await
.unwrap();
});
Expand Down
3 changes: 1 addition & 2 deletions tests/integration_tests/tests/origin.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures::future::BoxFuture;
use futures_util::FutureExt;
use futures_util::{future::BoxFuture, FutureExt};
use integration_tests::pb::test_client;
use integration_tests::pb::{test_server, Input, Output};
use std::task::Context;
Expand Down
7 changes: 4 additions & 3 deletions tests/integration_tests/tests/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ async fn status_with_metadata() {
jh.await.unwrap();
}

type Stream<T> =
std::pin::Pin<Box<dyn futures::Stream<Item = std::result::Result<T, Status>> + Send + 'static>>;
type Stream<T> = std::pin::Pin<
Box<dyn tokio_stream::Stream<Item = std::result::Result<T, Status>> + Send + 'static>,
>;

#[tokio::test]
async fn status_from_server_stream() {
Expand All @@ -142,7 +143,7 @@ async fn status_from_server_stream() {
&self,
_: Request<InputStream>,
) -> Result<Response<Self::StreamCallStream>, Status> {
let s = futures::stream::iter(vec![
let s = tokio_stream::iter(vec![
Err::<OutputStream, _>(Status::unavailable("foo")),
Err::<OutputStream, _>(Status::unavailable("bar")),
]);
Expand Down
Loading
Loading