Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix: update message bytes directly for dest change
Browse files Browse the repository at this point in the history
reduces mem usage of repeated msg.serialise when we change destinations
  • Loading branch information
joshuef authored and oetyng committed May 26, 2021
1 parent d87fae8 commit d253690
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 7 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ qp2p = "~0.11.9"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
resource_proof = "0.8.0"
sn_messaging = "25.0.0"
sn_messaging = "25.1.0"
sn_data_types = "~0.18.3"
thiserror = "1.0.23"
tokio = "1.3.0"
Expand Down
17 changes: 11 additions & 6 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use futures::stream::{FuturesUnordered, StreamExt};
use hex_fmt::HexFmt;
use qp2p::{Endpoint, QuicP2p};
use sn_messaging::MessageType;
use sn_messaging::WireMsg;
use std::{
fmt::{self, Debug, Formatter},
net::SocketAddr,
Expand Down Expand Up @@ -121,14 +122,17 @@ impl Comm {
mut msg: MessageType,
) -> Result<(), Error> {
msg.update_dest_info(None, Some(recipient.0));

let bytes = msg.serialize()?;
self.endpoint
.send_message(bytes, &recipient.1)
.await
.map_err(|err| {
error!("Sending to {:?} failed with {}", recipient, err);
Error::FailedSend(recipient.1, recipient.0)
})
})?;

Ok(())
}

/// Tests whether the peer is reachable.
Expand Down Expand Up @@ -190,12 +194,13 @@ impl Comm {

let delivery_group_size = delivery_group_size.min(recipients.len());

let wire_msg = msg.to_wire_msg()?;
// Run all the sends concurrently (using `FuturesUnordered`). If any of them fails, pick
// the next recipient and try to send to them. Proceed until the needed number of sends
// succeeds or if there are no more recipients to pick.
let send = |recipient: (XorName, SocketAddr), mut msg: MessageType| async move {
msg.update_dest_info(None, Some(recipient.0));
match msg.serialize() {
let send = |recipient: (XorName, SocketAddr), mut wire_msg: WireMsg| async move {
wire_msg.update_dest_info(None, Some(recipient.0));
match wire_msg.serialize() {
Ok(bytes) => {
trace!(
"Sending message ({} bytes) to {} of {:?}",
Expand Down Expand Up @@ -223,7 +228,7 @@ impl Comm {

let mut tasks: FuturesUnordered<_> = recipients[0..delivery_group_size]
.iter()
.map(|(name, recipient)| send((*name, *recipient), msg.clone()))
.map(|(name, recipient)| send((*name, *recipient), wire_msg.clone()))
.collect();

let mut next = delivery_group_size;
Expand All @@ -242,7 +247,7 @@ impl Comm {
failed_recipients.push(addr);

if next < recipients.len() {
tasks.push(send(recipients[next], msg.clone()));
tasks.push(send(recipients[next], wire_msg.clone()));
next += 1;
}
}
Expand Down

0 comments on commit d253690

Please sign in to comment.