Skip to content

Commit

Permalink
Fix larger than necessary allocations in streamer (#8187) (#8192)
Browse files Browse the repository at this point in the history
automerge
  • Loading branch information
mergify[bot] authored Feb 10, 2020
1 parent 7bd9501 commit 2f54f57
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 10 deletions.
43 changes: 36 additions & 7 deletions core/src/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use solana_metrics::inc_new_counter_debug;
pub use solana_sdk::packet::{Meta, Packet, PACKET_DATA_SIZE};
use std::{io::Result, net::UdpSocket, time::Instant};

pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
pub fn recv_from(obj: &mut Packets, socket: &UdpSocket, max_wait_ms: usize) -> Result<usize> {
let mut i = 0;
//DOCUMENTED SIDE-EFFECT
//Performance out of the IO without poll
Expand All @@ -20,9 +20,11 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
socket.set_nonblocking(false)?;
trace!("receiving on {}", socket.local_addr().unwrap());
let start = Instant::now();
let mut total_size = 0;
loop {
obj.packets.resize(i + NUM_RCVMMSGS, Packet::default());
obj.packets.resize(
std::cmp::min(i + NUM_RCVMMSGS, PACKETS_PER_BATCH),
Packet::default(),
);
match recv_mmsg(socket, &mut obj.packets[i..]) {
Err(_) if i > 0 => {
if start.elapsed().as_millis() > 1 {
Expand All @@ -33,16 +35,15 @@ pub fn recv_from(obj: &mut Packets, socket: &UdpSocket) -> Result<usize> {
trace!("recv_from err {:?}", e);
return Err(e);
}
Ok((size, npkts)) => {
Ok((_, npkts)) => {
if i == 0 {
socket.set_nonblocking(true)?;
}
trace!("got {} packets", npkts);
i += npkts;
total_size += size;
// Try to batch into big enough buffers
// will cause less re-shuffling later on.
if start.elapsed().as_millis() > 1 || total_size >= PACKETS_BATCH_SIZE {
if start.elapsed().as_millis() > max_wait_ms as u128 || i >= PACKETS_PER_BATCH {
break;
}
}
Expand Down Expand Up @@ -95,7 +96,7 @@ mod tests {
}
send_to(&p, &send_socket).unwrap();

let recvd = recv_from(&mut p, &recv_socket).unwrap();
let recvd = recv_from(&mut p, &recv_socket, 1).unwrap();

assert_eq!(recvd, p.packets.len());

Expand Down Expand Up @@ -127,4 +128,32 @@ mod tests {
p2.data[0] = 4;
assert!(p1 != p2);
}

#[test]
fn test_packet_resize() {
solana_logger::setup();
let recv_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let addr = recv_socket.local_addr().unwrap();
let send_socket = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut p = Packets::default();
p.packets.resize(PACKETS_PER_BATCH, Packet::default());

// Should only get PACKETS_PER_BATCH packets per iteration even
// if a lot more were sent, and regardless of packet size
for _ in 0..2 * PACKETS_PER_BATCH {
let mut p = Packets::default();
p.packets.resize(1, Packet::default());
for m in p.packets.iter_mut() {
m.meta.set_addr(&addr);
m.meta.size = 1;
}
send_to(&p, &send_socket).unwrap();
}

let recvd = recv_from(&mut p, &recv_socket, 100).unwrap();

// Check we only got PACKETS_PER_BATCH packets
assert_eq!(recvd, PACKETS_PER_BATCH);
assert_eq!(p.packets.capacity(), PACKETS_PER_BATCH);
}
}
4 changes: 2 additions & 2 deletions core/src/retransmit_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ mod tests {
// it should send this over the sockets.
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
packet::recv_from(&mut packets, &me_retransmit).unwrap();
packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert_eq!(packets.packets[0].meta.repair, false);

Expand All @@ -347,7 +347,7 @@ mod tests {
let packets = Packets::new(vec![repair, Packet::default()]);
retransmit_sender.send(packets).unwrap();
let mut packets = Packets::new(vec![]);
packet::recv_from(&mut packets, &me_retransmit).unwrap();
packet::recv_from(&mut packets, &me_retransmit, 1).unwrap();
assert_eq!(packets.packets.len(), 1);
assert_eq!(packets.packets[0].meta.repair, false);
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Ok(len) = packet::recv_from(&mut msgs, sock) {
if let Ok(len) = packet::recv_from(&mut msgs, sock, 1) {
if len == NUM_RCVMMSGS {
num_max_received += 1;
}
Expand Down
4 changes: 4 additions & 0 deletions perf/src/cuda_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ impl<T: Clone + Default + Sized> PinnedVec<T> {
pub fn iter_mut(&mut self) -> PinnedIterMut<T> {
PinnedIterMut(self.x.iter_mut())
}

pub fn capacity(&self) -> usize {
self.x.capacity()
}
}

impl<'a, T: Clone + Send + Sync + Default + Sized> IntoParallelIterator for &'a PinnedVec<T> {
Expand Down

0 comments on commit 2f54f57

Please sign in to comment.