From 56194e96b192d3cd83e4a4c6ccebe7e191bced8e Mon Sep 17 00:00:00 2001 From: Stanimal Date: Wed, 25 Mar 2020 12:50:19 +0200 Subject: [PATCH] Fixed `ready()` deprecation warnings from tower upgrade `ready()` is deprecated, used `ready_and()` and `oneshot` where appropriate. --- .../service_framework/src/reply_channel.rs | 18 +++++++----------- comms/dht/src/inbound/deserialize.rs | 9 ++++++--- comms/dht/src/logging_middleware.rs | 7 ++----- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/base_layer/service_framework/src/reply_channel.rs b/base_layer/service_framework/src/reply_channel.rs index ec0f75fc4a..25875474e9 100644 --- a/base_layer/service_framework/src/reply_channel.rs +++ b/base_layer/service_framework/src/reply_channel.rs @@ -255,10 +255,9 @@ mod test { #[test] fn requestor_call() { let (tx, rx) = mpsc::unbounded(); - let mut requestor = SenderService::<_, _>::new(tx); + let requestor = SenderService::<_, _>::new(tx); - block_on(requestor.ready()).unwrap(); - let fut = future::join(requestor.call("PING"), reply(rx, "PONG")); + let fut = future::join(requestor.oneshot("PING"), reply(rx, "PONG")); let msg = block_on(fut.map(|(r, _)| r.unwrap())); assert_eq!(msg, "PONG"); @@ -276,13 +275,12 @@ mod test { #[test] fn request_response_request_abort() { - let (mut requestor, mut request_stream) = super::unbounded::<_, &str>(); + let (requestor, mut request_stream) = super::unbounded::<_, &str>(); block_on(future::join( async move { - requestor.ready().await.unwrap(); // `_` drops the response receiver, so when a reply is sent it will fail - let _ = requestor.call("PING"); + let _ = requestor.oneshot("PING"); }, async move { let a = request_stream.next().await.unwrap(); @@ -298,8 +296,7 @@ mod test { block_on(future::join( async move { - requestor.ready().await.unwrap(); - let err = requestor.call("PING").await.unwrap_err(); + let err = requestor.ready_and().await.unwrap().call("PING").await.unwrap_err(); assert_eq!(err, TransportChannelError::Canceled); }, async move { @@ -311,10 +308,9 @@ mod test { #[test] fn request_response_success() { - let (mut requestor, mut request_stream) = super::unbounded::<_, &str>(); + let (requestor, mut request_stream) = super::unbounded::<_, &str>(); - block_on(requestor.ready()).unwrap(); - let (result, _) = block_on(future::join(requestor.call("PING"), async move { + let (result, _) = block_on(future::join(requestor.oneshot("PING"), async move { let req = request_stream.next().await.unwrap(); req.reply("PONG").unwrap(); })); diff --git a/comms/dht/src/inbound/deserialize.rs b/comms/dht/src/inbound/deserialize.rs index 17dc20dd99..18ea93004a 100644 --- a/comms/dht/src/inbound/deserialize.rs +++ b/comms/dht/src/inbound/deserialize.rs @@ -70,9 +70,8 @@ where S: Service, S::Error: std::error::Error + Send + Sync + 'static, { - pub async fn deserialize(mut next_service: S, message: InboundMessage) -> Result<(), PipelineError> { + pub async fn deserialize(next_service: S, message: InboundMessage) -> Result<(), PipelineError> { trace!(target: LOG_TARGET, "Deserializing InboundMessage"); - next_service.ready().await.map_err(PipelineError::from_debug)?; let InboundMessage { source_peer, mut body, .. @@ -100,7 +99,11 @@ where source_peer, dht_envelope.body, ); - next_service.call(inbound_msg).await.map_err(PipelineError::from_debug) + + next_service + .oneshot(inbound_msg) + .await + .map_err(PipelineError::from_debug) }, Err(err) => { error!(target: LOG_TARGET, "DHT deserialization failed: {}", err); diff --git a/comms/dht/src/logging_middleware.rs b/comms/dht/src/logging_middleware.rs index f0cb060e42..e6b906b1d8 100644 --- a/comms/dht/src/logging_middleware.rs +++ b/comms/dht/src/logging_middleware.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -use futures::{task::Context, Future}; +use futures::{task::Context, Future, TryFutureExt}; use log::*; use std::{fmt::Display, marker::PhantomData, task::Poll}; use tower::{layer::Layer, Service, ServiceExt}; @@ -88,9 +88,6 @@ where fn call(&mut self, msg: R) -> Self::Future { debug!(target: LOG_TARGET, "{}{}", self.prefix_msg, msg); let mut inner = self.inner.clone(); - async move { - inner.ready().await?; - inner.call(msg).await - } + async move { inner.ready_and().and_then(|s| s.call(msg)).await.map_err(Into::into) } } }