Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

protocols/mdns: Optimise InterfaceState::poll for low latency #2939

Merged
merged 29 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
06de1b3
Rewrite `InterfaceState::poll` to have a single loop
thomaseizinger Sep 24, 2022
9e0d653
Change priorities in poll loop
thomaseizinger Sep 24, 2022
91b9719
Ensure to empty buffer even in case of error
thomaseizinger Sep 24, 2022
2612e58
Add changelog entry
thomaseizinger Sep 24, 2022
500c4ef
Bump version in manifest
thomaseizinger Sep 24, 2022
6ba9f77
Update protocols/mdns/CHANGELOG.md
thomaseizinger Sep 24, 2022
b9663e4
Move helper function to bottom
thomaseizinger Sep 28, 2022
ddf08b1
Optimise for small, local buffers
thomaseizinger Sep 28, 2022
517814e
Return `Poll` from `poll` function
thomaseizinger Sep 28, 2022
e5ac0fd
Improve local variable name
thomaseizinger Sep 28, 2022
ccf3ee9
Use `VecDeque::extend` over manual iterator
thomaseizinger Sep 28, 2022
a3240b4
Extract helper for extracting responses
thomaseizinger Sep 28, 2022
52692c1
Inline `inject_mdns_packet`
thomaseizinger Sep 28, 2022
d5f0232
Flatten match statement
thomaseizinger Sep 28, 2022
51086fa
Further flatten match statement
thomaseizinger Sep 28, 2022
e3210f5
MOAR flattening!
thomaseizinger Sep 28, 2022
d656898
Move `extract_discovered` to `MdnsResponse`
thomaseizinger Sep 28, 2022
02dfb4b
Extract time dependency
thomaseizinger Sep 28, 2022
f4d1891
Use `?` in `MdnsPacket::new` to propagate error
thomaseizinger Sep 28, 2022
4012750
Further flatten `MdnsPacket::new_from_bytes`
thomaseizinger Sep 28, 2022
3605f8d
Prefer early exit over if else
thomaseizinger Sep 28, 2022
c3e8dc7
Inline variables
thomaseizinger Sep 28, 2022
174aa85
Improve logs
thomaseizinger Sep 28, 2022
4ee79ab
Fix clippy
thomaseizinger Sep 28, 2022
bf334db
Merge branch 'master' into 2916-mdns-refactor-poll
thomaseizinger Sep 29, 2022
e4f8b7c
Put changelog items in correct order
thomaseizinger Sep 29, 2022
c125666
Fix bad merge of mDNS version
thomaseizinger Sep 29, 2022
63da9a1
Merge branch 'master' into 2916-mdns-refactor-poll
thomaseizinger Oct 4, 2022
ed1a720
Merge branch 'master' into 2916-mdns-refactor-poll
thomaseizinger Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion protocols/mdns/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
- Update to `libp2p-core` `v0.37.0`.

- Update to `libp2p-swarm` `v0.40.0`.


- Fix a bug that could cause a delay of ~10s until peers would get discovered when using the tokio runtime. See [PR 2939].

[PR 2918]: https://github.com/libp2p/rust-libp2p/pull/2918
[PR 2939]: https://github.com/libp2p/rust-libp2p/pull/2939

# 0.40.0

Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ where
// Emit discovered event.
let mut discovered = SmallVec::<[(PeerId, Multiaddr); 4]>::new();
for iface_state in self.iface_states.values_mut() {
while let Some((peer, addr, expiration)) = iface_state.poll(cx, params) {
while let Poll::Ready((peer, addr, expiration)) = iface_state.poll(cx, params) {
if let Some((_, _, cur_expires)) = self
.discovered_nodes
.iter_mut()
Expand Down
165 changes: 80 additions & 85 deletions protocols/mdns/src/behaviour/iface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use self::dns::{build_query, build_query_response, build_service_discovery_respo
use self::query::MdnsPacket;
use crate::behaviour::{socket::AsyncSocket, timer::Builder};
use crate::MdnsConfig;
use libp2p_core::{address_translation, multiaddr::Protocol, Multiaddr, PeerId};
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::PollParameters;
use socket2::{Domain, Socket, Type};
use std::{
collections::VecDeque,
io, iter,
io,
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, UdpSocket},
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -145,106 +145,101 @@ where
self.timeout = T::interval_at(Instant::now(), self.query_interval);
}

fn inject_mdns_packet(&mut self, packet: MdnsPacket, params: &impl PollParameters) {
log::trace!("received packet on iface {} {:?}", self.addr, packet);
match packet {
MdnsPacket::Query(query) => {
self.reset_timer();
log::trace!("sending response on iface {}", self.addr);
for packet in build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
) {
self.send_buffer.push_back(packet);
}
pub fn poll(
&mut self,
cx: &mut Context,
params: &impl PollParameters,
) -> Poll<(PeerId, Multiaddr, Instant)> {
loop {
// 1st priority: Low latency: Create packet ASAP after timeout.
if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
}
MdnsPacket::Response(response) => {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(response.remote_addr().ip());
let obs_port = Protocol::Udp(response.remote_addr().port());
let observed: Multiaddr = iter::once(obs_ip).chain(iter::once(obs_port)).collect();

for peer in response.discovered_peers() {
if peer.id() == params.local_peer_id() {
// 2nd priority: Keep local buffers small: Send packets to remote.
if let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => {
log::trace!("sent packet on iface {}", self.addr);
continue;
}

let new_expiration = Instant::now() + peer.ttl();

for addr in peer.addresses() {
if let Some(new_addr) = address_translation(addr, &observed) {
self.discovered.push_back((
*peer.id(),
new_addr.clone(),
new_expiration,
));
}

self.discovered
.push_back((*peer.id(), addr.clone(), new_expiration));
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
continue;
}
Poll::Pending => {
self.send_buffer.push_front(packet);
}
}
}
MdnsPacket::ServiceDiscovery(disc) => {
let resp = build_service_discovery_response(disc.query_id(), self.ttl);
self.send_buffer.push_back(resp);

// 3rd priority: Keep local buffers small: Return discovered addresses.
if let Some(discovered) = self.discovered.pop_front() {
return Poll::Ready(discovered);
}
}
}

pub fn poll(
&mut self,
cx: &mut Context,
params: &impl PollParameters,
) -> Option<(PeerId, Multiaddr, Instant)> {
// Poll receive socket.
while let Poll::Ready(data) =
Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer)
{
match data {
Ok((len, from)) => {
if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from)
{
self.inject_mdns_packet(packet, params);
}
// 4th priority: Remote work: Answer incoming requests.
match Pin::new(&mut self.recv_socket)
.poll_read(cx, &mut self.recv_buffer)
.map_ok(|(len, from)| MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from))
{
Poll::Ready(Ok(Ok(Some(MdnsPacket::Query(query))))) => {
self.reset_timer();
log::trace!(
"received query from {} on {}",
query.remote_addr(),
self.addr
);

self.send_buffer.extend(build_query_response(
query.query_id(),
*params.local_peer_id(),
params.listened_addresses(),
self.ttl,
));
continue;
}
Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No more bytes available on the socket to read
break;
Poll::Ready(Ok(Ok(Some(MdnsPacket::Response(response))))) => {
log::trace!(
"received response from {} on {}",
response.remote_addr(),
self.addr
);

self.discovered.extend(
response.extract_discovered(Instant::now(), *params.local_peer_id()),
);
continue;
}
Err(err) => {
log::error!("failed reading datagram: {}", err);
Poll::Ready(Ok(Ok(Some(MdnsPacket::ServiceDiscovery(disc))))) => {
log::trace!(
"received service discovery from {} on {}",
disc.remote_addr(),
self.addr
);

self.send_buffer
.push_back(build_service_discovery_response(disc.query_id(), self.ttl));
continue;
}
Poll::Ready(Err(err)) if err.kind() == std::io::ErrorKind::WouldBlock => {
// No more bytes available on the socket to read
}
}
}

// Send responses.
while let Some(packet) = self.send_buffer.pop_front() {
match Pin::new(&mut self.send_socket).poll_write(
cx,
&packet,
SocketAddr::new(self.multicast_addr, 5353),
) {
Poll::Ready(Ok(_)) => log::trace!("sent packet on iface {}", self.addr),
Poll::Ready(Err(err)) => {
log::error!("error sending packet on iface {} {}", self.addr, err);
log::error!("failed reading datagram: {}", err);
}
Poll::Pending => {
self.send_buffer.push_front(packet);
break;
Poll::Ready(Ok(Err(err))) => {
log::debug!("Parsing mdns packet failed: {:?}", err);
}
Poll::Ready(Ok(Ok(None))) | Poll::Pending => {}
}
}

if Pin::new(&mut self.timeout).poll_next(cx).is_ready() {
log::trace!("sending query on iface {}", self.addr);
self.send_buffer.push_back(build_query());
return Poll::Pending;
}

// Emit discovered event.
self.discovered.pop_front()
}
}
111 changes: 68 additions & 43 deletions protocols/mdns/src/behaviour/iface/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ use super::dns;
use crate::{META_QUERY_SERVICE, SERVICE_NAME};
use dns_parser::{Packet, RData};
use libp2p_core::{
address_translation,
multiaddr::{Multiaddr, Protocol},
PeerId,
};
use std::time::Instant;
use std::{convert::TryFrom, fmt, net::SocketAddr, str, time::Duration};

/// A valid mDNS packet received by the service.
Expand All @@ -39,44 +41,40 @@ pub enum MdnsPacket {
}

impl MdnsPacket {
pub fn new_from_bytes(buf: &[u8], from: SocketAddr) -> Option<MdnsPacket> {
match Packet::parse(buf) {
Ok(packet) => {
if packet.header.query {
if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
let query = MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
});
Some(query)
} else if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
let discovery = MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
});
Some(discovery)
} else {
None
}
} else {
let resp = MdnsPacket::Response(MdnsResponse::new(packet, from));
Some(resp)
}
}
Err(err) => {
log::debug!("Parsing mdns packet failed: {:?}", err);
None
}
pub fn new_from_bytes(
buf: &[u8],
from: SocketAddr,
) -> Result<Option<MdnsPacket>, dns_parser::Error> {
let packet = Packet::parse(buf)?;

if !packet.header.query {
return Ok(Some(MdnsPacket::Response(MdnsResponse::new(packet, from))));
}

if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == SERVICE_NAME)
{
return Ok(Some(MdnsPacket::Query(MdnsQuery {
from,
query_id: packet.header.id,
})));
}

if packet
.questions
.iter()
.any(|q| q.qname.to_string().as_bytes() == META_QUERY_SERVICE)
{
// TODO: what if multiple questions, one with SERVICE_NAME and one with META_QUERY_SERVICE?
return Ok(Some(MdnsPacket::ServiceDiscovery(MdnsServiceDiscovery {
from,
query_id: packet.header.id,
})));
}

Ok(None)
}
}

Expand Down Expand Up @@ -167,18 +165,45 @@ impl MdnsResponse {
MdnsResponse { peers, from }
}

/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
pub fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
pub fn extract_discovered(
&self,
now: Instant,
local_peer_id: PeerId,
) -> impl Iterator<Item = (PeerId, Multiaddr, Instant)> + '_ {
self.discovered_peers()
.filter(move |peer| peer.id() != &local_peer_id)
.flat_map(move |peer| {
let observed = self.observed_address();
let new_expiration = now + peer.ttl();

peer.addresses().iter().filter_map(move |address| {
let new_addr = address_translation(address, &observed)?;

Some((*peer.id(), new_addr, new_expiration))
})
})
}

/// Source address of the packet.
#[inline]
pub fn remote_addr(&self) -> &SocketAddr {
&self.from
}

fn observed_address(&self) -> Multiaddr {
// We replace the IP address with the address we observe the
// remote as and the address they listen on.
let obs_ip = Protocol::from(self.remote_addr().ip());
let obs_port = Protocol::Udp(self.remote_addr().port());

Multiaddr::empty().with(obs_ip).with(obs_port)
}

/// Returns the list of peers that have been reported in this packet.
///
/// > **Note**: Keep in mind that this will also contain the responses we sent ourselves.
fn discovered_peers(&self) -> impl Iterator<Item = &MdnsPeer> {
self.peers.iter()
}
}

impl fmt::Debug for MdnsResponse {
Expand Down