diff --git a/Cargo.toml b/Cargo.toml index 37915295db..93da57819a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ qp2p = "~0.11.9" rand = "~0.7.3" rand_chacha = "~0.2.2" resource_proof = "0.8.0" -sn_messaging = "25.0.0" +sn_messaging = "25.1.0" sn_data_types = "~0.18.3" thiserror = "1.0.23" tokio = "1.3.0" diff --git a/src/routing/comm.rs b/src/routing/comm.rs index 57eb1ab43f..6b5a007f8f 100644 --- a/src/routing/comm.rs +++ b/src/routing/comm.rs @@ -13,6 +13,7 @@ use futures::stream::{FuturesUnordered, StreamExt}; use hex_fmt::HexFmt; use qp2p::{Endpoint, QuicP2p}; use sn_messaging::MessageType; +use sn_messaging::WireMsg; use std::{ fmt::{self, Debug, Formatter}, net::SocketAddr, @@ -121,6 +122,7 @@ impl Comm { mut msg: MessageType, ) -> Result<(), Error> { msg.update_dest_info(None, Some(recipient.0)); + let bytes = msg.serialize()?; self.endpoint .send_message(bytes, &recipient.1) @@ -128,7 +130,9 @@ impl Comm { .map_err(|err| { error!("Sending to {:?} failed with {}", recipient, err); Error::FailedSend(recipient.1, recipient.0) - }) + })?; + + Ok(()) } /// Tests whether the peer is reachable. @@ -190,12 +194,13 @@ impl Comm { let delivery_group_size = delivery_group_size.min(recipients.len()); + let wire_msg = msg.to_wire_msg()?; // Run all the sends concurrently (using `FuturesUnordered`). If any of them fails, pick // the next recipient and try to send to them. Proceed until the needed number of sends // succeeds or if there are no more recipients to pick. - let send = |recipient: (XorName, SocketAddr), mut msg: MessageType| async move { - msg.update_dest_info(None, Some(recipient.0)); - match msg.serialize() { + let send = |recipient: (XorName, SocketAddr), mut wire_msg: WireMsg| async move { + wire_msg.update_dest_info(None, Some(recipient.0)); + match wire_msg.serialize() { Ok(bytes) => { trace!( "Sending message ({} bytes) to {} of {:?}", @@ -223,7 +228,7 @@ impl Comm { let mut tasks: FuturesUnordered<_> = recipients[0..delivery_group_size] .iter() - .map(|(name, recipient)| send((*name, *recipient), msg.clone())) + .map(|(name, recipient)| send((*name, *recipient), wire_msg.clone())) .collect(); let mut next = delivery_group_size; @@ -242,7 +247,7 @@ impl Comm { failed_recipients.push(addr); if next < recipients.len() { - tasks.push(send(recipients[next], msg.clone())); + tasks.push(send(recipients[next], wire_msg.clone())); next += 1; } }