diff --git a/tests/compression/Cargo.toml b/tests/compression/Cargo.toml index 5086679a8..c795d0868 100644 --- a/tests/compression/Cargo.toml +++ b/tests/compression/Cargo.toml @@ -8,13 +8,14 @@ version = "0.1.0" [dependencies] bytes = "1" -futures = "0.3" +futures-core = "0.3" http = "0.2" http-body = "0.4" hyper = "0.14.3" pin-project = "1.0" prost = "0.11" tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]} +tokio-stream = "0.1" tonic = {path = "../../tonic", features = ["gzip"]} tower = {version = "0.4", features = []} tower-http = {version = "0.4", features = ["map-response-body", "map-request-body"]} diff --git a/tests/compression/src/bidirectional_stream.rs b/tests/compression/src/bidirectional_stream.rs index a92783c34..4abb4d10d 100644 --- a/tests/compression/src/bidirectional_stream.rs +++ b/tests/compression/src/bidirectional_stream.rs @@ -37,7 +37,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -48,7 +48,7 @@ async fn client_enabled_server_enabled() { .accept_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); + let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); let req = Request::new(stream); let res = client diff --git a/tests/compression/src/client_stream.rs b/tests/compression/src/client_stream.rs index a749c2b58..99d72751b 100644 --- a/tests/compression/src/client_stream.rs +++ b/tests/compression/src/client_stream.rs @@ -29,7 +29,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -39,7 +39,7 @@ async fn client_enabled_server_enabled() { .send_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); + let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); let req = Request::new(Box::pin(stream)); client.compress_input_client_stream(req).await.unwrap(); @@ -75,7 +75,7 @@ async fn client_disabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -84,7 +84,7 @@ async fn client_disabled_server_enabled() { let mut client = test_client::TestClient::new(mock_io_channel(client).await); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); + let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); let req = Request::new(Box::pin(stream)); client.compress_input_client_stream(req).await.unwrap(); @@ -102,7 +102,7 @@ async fn client_enabled_server_disabled() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); }); @@ -111,7 +111,7 @@ async fn client_enabled_server_disabled() { .send_compressed(CompressionEncoding::Gzip); let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); + let stream = tokio_stream::iter(vec![SomeData { data: data.clone() }, SomeData { data }]); let req = Request::new(Box::pin(stream)); let status = client.compress_input_client_stream(req).await.unwrap_err(); @@ -147,7 +147,7 @@ async fn compressing_response_from_client_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -156,7 +156,7 @@ async fn compressing_response_from_client_stream() { let mut client = test_client::TestClient::new(mock_io_channel(client).await) .accept_compressed(CompressionEncoding::Gzip); - let stream = futures::stream::iter(vec![]); + let stream = tokio_stream::iter(vec![]); let req = Request::new(Box::pin(stream)); let res = client.compress_output_client_stream(req).await.unwrap(); diff --git a/tests/compression/src/compressing_request.rs b/tests/compression/src/compressing_request.rs index dd0536091..7cdfd7cec 100644 --- a/tests/compression/src/compressing_request.rs +++ b/tests/compression/src/compressing_request.rs @@ -31,7 +31,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -61,7 +61,7 @@ async fn client_enabled_server_disabled() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); }); @@ -99,7 +99,7 @@ async fn client_mark_compressed_without_header_server_enabled() { async move { Server::builder() .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } diff --git a/tests/compression/src/compressing_response.rs b/tests/compression/src/compressing_response.rs index 5c1cb9fa9..cc5d4f4cd 100644 --- a/tests/compression/src/compressing_response.rs +++ b/tests/compression/src/compressing_response.rs @@ -53,7 +53,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -94,7 +94,7 @@ async fn client_enabled_server_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -160,7 +160,7 @@ async fn client_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -198,7 +198,7 @@ async fn server_replying_with_unsupported_encoding() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); }); @@ -240,7 +240,7 @@ async fn disabling_compression_on_single_response() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -281,7 +281,7 @@ async fn disabling_compression_on_response_but_keeping_compression_on_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -337,7 +337,7 @@ async fn disabling_compression_on_response_from_client_stream() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -346,7 +346,7 @@ async fn disabling_compression_on_response_from_client_stream() { let mut client = test_client::TestClient::new(mock_io_channel(client).await) .accept_compressed(CompressionEncoding::Gzip); - let stream = futures::stream::iter(vec![]); + let stream = tokio_stream::iter(vec![]); let req = Request::new(Box::pin(stream)); let res = client.compress_output_client_stream(req).await.unwrap(); diff --git a/tests/compression/src/lib.rs b/tests/compression/src/lib.rs index 00d735ec3..02f729b60 100644 --- a/tests/compression/src/lib.rs +++ b/tests/compression/src/lib.rs @@ -2,7 +2,6 @@ use self::util::*; use crate::util::mock_io_channel; -use futures::{Stream, StreamExt}; use std::{ pin::Pin, sync::{ @@ -11,6 +10,7 @@ use std::{ }, }; use tokio::net::TcpListener; +use tokio_stream::{Stream, StreamExt}; use tonic::{ transport::{Channel, Endpoint, Server, Uri}, Request, Response, Status, Streaming, @@ -67,7 +67,7 @@ impl test_server::Test for Svc { _req: Request<()>, ) -> Result, Status> { let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::repeat(SomeData { data }) + let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) .take(2) .map(Ok::<_, Status>); Ok(self.prepare_response(Response::new(Box::pin(stream)))) @@ -113,7 +113,7 @@ impl test_server::Test for Svc { } let data = [0_u8; UNCOMPRESSED_MIN_BODY_SIZE].to_vec(); - let stream = futures::stream::repeat(SomeData { data }) + let stream = tokio_stream::iter(std::iter::repeat(SomeData { data })) .take(2) .map(Ok::<_, Status>); Ok(self.prepare_response(Response::new(Box::pin(stream)))) diff --git a/tests/compression/src/server_stream.rs b/tests/compression/src/server_stream.rs index 2ec52bb08..453c055c8 100644 --- a/tests/compression/src/server_stream.rs +++ b/tests/compression/src/server_stream.rs @@ -26,7 +26,7 @@ async fn client_enabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -80,7 +80,7 @@ async fn client_disabled_server_enabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } @@ -125,7 +125,7 @@ async fn client_enabled_server_disabled() { .into_inner(), ) .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); } diff --git a/tests/compression/src/util.rs b/tests/compression/src/util.rs index 34fdc0f3a..57c3f7dbf 100644 --- a/tests/compression/src/util.rs +++ b/tests/compression/src/util.rs @@ -1,6 +1,6 @@ use super::*; use bytes::Bytes; -use futures::ready; +use futures_core::ready; use http_body::Body; use pin_project::pin_project; use std::{ diff --git a/tests/integration_tests/Cargo.toml b/tests/integration_tests/Cargo.toml index a44a74002..2fecfccba 100644 --- a/tests/integration_tests/Cargo.toml +++ b/tests/integration_tests/Cargo.toml @@ -10,15 +10,14 @@ version = "0.1.0" [dependencies] bytes = "1.0" -futures-util = "0.3" +futures-util = {version="0.3", default-features =false} prost = "0.11" -tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net"]} +tokio = {version = "1.0", features = ["macros", "rt-multi-thread", "net", "sync"]} tonic = {path = "../../tonic"} tracing-subscriber = {version = "0.3"} [dev-dependencies] async-stream = "0.3" -futures = "0.3" http = "0.2" http-body = "0.4" hyper = "0.14" diff --git a/tests/integration_tests/tests/client_layer.rs b/tests/integration_tests/tests/client_layer.rs index c33f388d7..0636b1b4f 100644 --- a/tests/integration_tests/tests/client_layer.rs +++ b/tests/integration_tests/tests/client_layer.rs @@ -1,8 +1,9 @@ use std::time::Duration; -use futures::{channel::oneshot, FutureExt}; +use futures_util::FutureExt; use http::{header::HeaderName, HeaderValue}; use integration_tests::pb::{test_client::TestClient, test_server, Input, Output}; +use tokio::sync::oneshot; use tonic::{ transport::{Endpoint, Server}, Request, Response, Status, diff --git a/tests/integration_tests/tests/extensions.rs b/tests/integration_tests/tests/extensions.rs index c60c98414..9da9bc939 100644 --- a/tests/integration_tests/tests/extensions.rs +++ b/tests/integration_tests/tests/extensions.rs @@ -120,7 +120,7 @@ where { type Response = S::Response; type Error = S::Error; - type Future = futures::future::BoxFuture<'static, Result>; + type Future = futures_util::future::BoxFuture<'static, Result>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.inner.poll_ready(cx) diff --git a/tests/integration_tests/tests/interceptor.rs b/tests/integration_tests/tests/interceptor.rs index 062e289c0..d9a6fdda5 100644 --- a/tests/integration_tests/tests/interceptor.rs +++ b/tests/integration_tests/tests/interceptor.rs @@ -1,7 +1,8 @@ use std::time::Duration; -use futures::{channel::oneshot, FutureExt}; +use futures_util::FutureExt; use integration_tests::pb::{test_client::TestClient, test_server, Input, Output}; +use tokio::sync::oneshot; use tonic::{ transport::{Endpoint, Server}, GrpcMethod, Request, Response, Status, diff --git a/tests/integration_tests/tests/max_message_size.rs b/tests/integration_tests/tests/max_message_size.rs index f79a41640..bda1e9d47 100644 --- a/tests/integration_tests/tests/max_message_size.rs +++ b/tests/integration_tests/tests/max_message_size.rs @@ -1,10 +1,10 @@ use std::pin::Pin; -use futures::{stream, Stream}; use integration_tests::{ pb::{test1_client, test1_server, Input1, Output1}, trace_init, }; +use tokio_stream::Stream; use tonic::{ transport::{Endpoint, Server}, Code, Request, Response, Status, @@ -137,7 +137,7 @@ async fn response_stream_limit() { let blob = Output1 { buf: vec![0; 6877902], }; - let stream = stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]); + let stream = tokio_stream::iter(vec![Ok(blob.clone()), Ok(blob.clone())]); Ok(Response::new(Box::pin(stream))) } @@ -148,7 +148,7 @@ async fn response_stream_limit() { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); }); @@ -317,7 +317,7 @@ async fn max_message_run(case: &TestCase) -> Result<(), Status> { tokio::spawn(async move { Server::builder() .add_service(svc) - .serve_with_incoming(futures::stream::iter(vec![Ok::<_, std::io::Error>(server)])) + .serve_with_incoming(tokio_stream::iter(vec![Ok::<_, std::io::Error>(server)])) .await .unwrap(); }); diff --git a/tests/integration_tests/tests/origin.rs b/tests/integration_tests/tests/origin.rs index 17bbc9cdd..5dd42f240 100644 --- a/tests/integration_tests/tests/origin.rs +++ b/tests/integration_tests/tests/origin.rs @@ -1,5 +1,4 @@ -use futures::future::BoxFuture; -use futures_util::FutureExt; +use futures_util::{future::BoxFuture, FutureExt}; use integration_tests::pb::test_client; use integration_tests::pb::{test_server, Input, Output}; use std::task::Context; diff --git a/tests/integration_tests/tests/status.rs b/tests/integration_tests/tests/status.rs index 8fcb2875b..d92718534 100644 --- a/tests/integration_tests/tests/status.rs +++ b/tests/integration_tests/tests/status.rs @@ -125,8 +125,9 @@ async fn status_with_metadata() { jh.await.unwrap(); } -type Stream = - std::pin::Pin> + Send + 'static>>; +type Stream = std::pin::Pin< + Box> + Send + 'static>, +>; #[tokio::test] async fn status_from_server_stream() { @@ -142,7 +143,7 @@ async fn status_from_server_stream() { &self, _: Request, ) -> Result, Status> { - let s = futures::stream::iter(vec![ + let s = tokio_stream::iter(vec![ Err::(Status::unavailable("foo")), Err::(Status::unavailable("bar")), ]); diff --git a/tests/integration_tests/tests/streams.rs b/tests/integration_tests/tests/streams.rs index de0c6b438..9b58d23cf 100644 --- a/tests/integration_tests/tests/streams.rs +++ b/tests/integration_tests/tests/streams.rs @@ -1,9 +1,10 @@ -use futures::FutureExt; +use futures_util::FutureExt; use integration_tests::pb::{test_stream_server, InputStream, OutputStream}; use tonic::{transport::Server, Request, Response, Status}; -type Stream = - std::pin::Pin> + Send + 'static>>; +type Stream = std::pin::Pin< + Box> + Send + 'static>, +>; #[tokio::test] async fn status_from_server_stream_with_source() { @@ -35,7 +36,7 @@ struct Unsync(*mut ()); unsafe impl Send for Unsync {} -impl futures::Stream for Unsync { +impl tokio_stream::Stream for Unsync { type Item = Result; fn poll_next( diff --git a/tests/use_arc_self/Cargo.toml b/tests/use_arc_self/Cargo.toml index c40cca878..7c2d09209 100644 --- a/tests/use_arc_self/Cargo.toml +++ b/tests/use_arc_self/Cargo.toml @@ -7,7 +7,7 @@ publish = false version = "0.1.0" [dependencies] -futures = "0.3" +tokio-stream = "0.1" prost = "0.11" tonic = {path = "../../tonic", features = ["gzip"]} diff --git a/tests/use_arc_self/src/lib.rs b/tests/use_arc_self/src/lib.rs index 4fb2fa46a..6b12b588e 100644 --- a/tests/use_arc_self/src/lib.rs +++ b/tests/use_arc_self/src/lib.rs @@ -1,7 +1,7 @@ #![allow(unused_imports)] -use futures::{Stream, StreamExt}; use std::sync::Arc; +use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status}; tonic::include_proto!("test");