Skip to content

Commit

Permalink
Fixed ready() deprecation warnings from tower upgrade
Browse files Browse the repository at this point in the history
`ready()` is deprecated, used `ready_and()` and `oneshot` where
appropriate.
  • Loading branch information
sdbondi committed Mar 25, 2020
1 parent 50873b8 commit 56194e9
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 19 deletions.
18 changes: 7 additions & 11 deletions base_layer/service_framework/src/reply_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,9 @@ mod test {
#[test]
fn requestor_call() {
let (tx, rx) = mpsc::unbounded();
let mut requestor = SenderService::<_, _>::new(tx);
let requestor = SenderService::<_, _>::new(tx);

block_on(requestor.ready()).unwrap();
let fut = future::join(requestor.call("PING"), reply(rx, "PONG"));
let fut = future::join(requestor.oneshot("PING"), reply(rx, "PONG"));

let msg = block_on(fut.map(|(r, _)| r.unwrap()));
assert_eq!(msg, "PONG");
Expand All @@ -276,13 +275,12 @@ mod test {

#[test]
fn request_response_request_abort() {
let (mut requestor, mut request_stream) = super::unbounded::<_, &str>();
let (requestor, mut request_stream) = super::unbounded::<_, &str>();

block_on(future::join(
async move {
requestor.ready().await.unwrap();
// `_` drops the response receiver, so when a reply is sent it will fail
let _ = requestor.call("PING");
let _ = requestor.oneshot("PING");
},
async move {
let a = request_stream.next().await.unwrap();
Expand All @@ -298,8 +296,7 @@ mod test {

block_on(future::join(
async move {
requestor.ready().await.unwrap();
let err = requestor.call("PING").await.unwrap_err();
let err = requestor.ready_and().await.unwrap().call("PING").await.unwrap_err();
assert_eq!(err, TransportChannelError::Canceled);
},
async move {
Expand All @@ -311,10 +308,9 @@ mod test {

#[test]
fn request_response_success() {
let (mut requestor, mut request_stream) = super::unbounded::<_, &str>();
let (requestor, mut request_stream) = super::unbounded::<_, &str>();

block_on(requestor.ready()).unwrap();
let (result, _) = block_on(future::join(requestor.call("PING"), async move {
let (result, _) = block_on(future::join(requestor.oneshot("PING"), async move {
let req = request_stream.next().await.unwrap();
req.reply("PONG").unwrap();
}));
Expand Down
9 changes: 6 additions & 3 deletions comms/dht/src/inbound/deserialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ where
S: Service<DhtInboundMessage, Response = ()>,
S::Error: std::error::Error + Send + Sync + 'static,
{
pub async fn deserialize(mut next_service: S, message: InboundMessage) -> Result<(), PipelineError> {
pub async fn deserialize(next_service: S, message: InboundMessage) -> Result<(), PipelineError> {
trace!(target: LOG_TARGET, "Deserializing InboundMessage");
next_service.ready().await.map_err(PipelineError::from_debug)?;

let InboundMessage {
source_peer, mut body, ..
Expand Down Expand Up @@ -100,7 +99,11 @@ where
source_peer,
dht_envelope.body,
);
next_service.call(inbound_msg).await.map_err(PipelineError::from_debug)

next_service
.oneshot(inbound_msg)
.await
.map_err(PipelineError::from_debug)
},
Err(err) => {
error!(target: LOG_TARGET, "DHT deserialization failed: {}", err);
Expand Down
7 changes: 2 additions & 5 deletions comms/dht/src/logging_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use futures::{task::Context, Future};
use futures::{task::Context, Future, TryFutureExt};
use log::*;
use std::{fmt::Display, marker::PhantomData, task::Poll};
use tower::{layer::Layer, Service, ServiceExt};
Expand Down Expand Up @@ -88,9 +88,6 @@ where
fn call(&mut self, msg: R) -> Self::Future {
debug!(target: LOG_TARGET, "{}{}", self.prefix_msg, msg);
let mut inner = self.inner.clone();
async move {
inner.ready().await?;
inner.call(msg).await
}
async move { inner.ready_and().and_then(|s| s.call(msg)).await.map_err(Into::into) }
}
}

0 comments on commit 56194e9

Please sign in to comment.