Skip to content

Commit

Permalink
iface: make poll() process all packets, add fine-grained poll functions.
Browse files Browse the repository at this point in the history
This makes `.poll()` behave the same as before #954. Users affected by DoS
concerns can use the finer-grained egress-only and single-packet-ingress-only fns.
  • Loading branch information
Dirbaio committed Sep 16, 2024
1 parent 86ec8f9 commit 5984bc7
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 74 deletions.
9 changes: 2 additions & 7 deletions src/iface/interface/ipv4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,23 @@ impl Interface {
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
#[cfg(feature = "proto-ipv4-fragmentation")]
pub(super) fn ipv4_egress<D>(&mut self, device: &mut D) -> bool
where
D: Device + ?Sized,
{
pub(super) fn ipv4_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Reset the buffer when we transmitted everything.
if self.fragmenter.finished() {
self.fragmenter.reset();
}

if self.fragmenter.is_empty() {
return false;
return;
}

let pkt = &self.fragmenter;
if pkt.packet_len > pkt.sent_bytes {
if let Some(tx_token) = device.transmit(self.inner.now) {
self.inner
.dispatch_ipv4_frag(tx_token, &mut self.fragmenter);
return true;
}
}
false
}
}

Expand Down
192 changes: 140 additions & 52 deletions src/iface/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,44 @@ macro_rules! check {
}
use check;

/// Result returned by [`Interface::poll`].
///
/// This contains information on whether socket states might have changed.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PollResult {
/// Socket state is guaranteed to not have changed.
None,
/// You should check the state of sockets again for received data or completion of operations.
SocketStateChanged,
}

/// Result returned by [`Interface::poll_ingress_single`].
///
/// This contains information on whether a packet was processed or not,
/// and whether it might've affected socket states.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PollIngressSingleResult {
/// No packet was processed. You don't need to call [`Interface::poll_ingress_single`]
/// again, until more packets arrive.
///
/// Socket state is guaranteed to not have changed.
None,
/// A packet was processed.
///
/// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
///
/// Socket state is guaranteed to not have changed.
PacketProcessed,
/// A packet was processed, which might have caused socket state to change.
///
/// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
///
/// You should check the state of sockets again for received data or completion of operations.
SocketStateChanged,
}

/// A network interface.
///
/// The network interface logically owns a number of other data structures; to avoid
Expand Down Expand Up @@ -150,10 +188,7 @@ impl Interface {
/// # Panics
/// This function panics if the [`Config::hardware_address`] does not match
/// the medium of the device.
pub fn new<D>(config: Config, device: &mut D, now: Instant) -> Self
where
D: Device + ?Sized,
{
pub fn new(config: Config, device: &mut (impl Device + ?Sized), now: Instant) -> Self {
let caps = device.capabilities();
assert_eq!(
config.hardware_addr.medium(),
Expand Down Expand Up @@ -375,59 +410,107 @@ impl Interface {
self.fragments.reassembly_timeout = timeout;
}

/// Transmit packets queued in the given sockets, and receive packets queued
/// Transmit packets queued in the sockets, and receive packets queued
/// in the device.
///
/// This function returns a boolean value indicating whether any packets were
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
/// This function returns a value indicating whether the state of any socket
/// might have changed.
///
/// ## DoS warning
///
/// # Note
/// This function performs a bounded amount of work per call to avoid
/// starving other tasks of CPU time. If it returns true, there may still be
/// packets to be received or transmitted. Depending on system design,
/// calling this function in a loop may cause a denial of service if
/// packets cannot be processed faster than they arrive.
pub fn poll<D>(
/// This function processes all packets in the device's queue. This can
/// be an unbounded amount of work if packets arrive faster than they're
/// processed.
///
/// If this is a concern for your application (i.e. your environment doesn't
/// have preemptive scheduling, or `poll()` is called from a main loop where
/// other important things are processed), you may use the lower-level methods
/// [`poll_egress()`](Self::poll_egress) and [`poll_ingress_single()`](Self::poll_ingress_single).
/// This allows you to insert yields or process other events between processing
/// individual ingress packets.
pub fn poll(
&mut self,
timestamp: Instant,
device: &mut D,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> bool
where
D: Device + ?Sized,
{
) -> PollResult {
self.inner.now = timestamp;

let mut res = PollResult::None;

#[cfg(feature = "_proto-fragmentation")]
self.fragments.assembler.remove_expired(timestamp);

// Process ingress while there's packets available.
loop {
match self.socket_ingress(device, sockets) {
PollIngressSingleResult::None => break,
PollIngressSingleResult::PacketProcessed => {}
PollIngressSingleResult::SocketStateChanged => res = PollResult::SocketStateChanged,
}
}

// Process egress.
match self.poll_egress(timestamp, device, sockets) {
PollResult::None => {}
PollResult::SocketStateChanged => res = PollResult::SocketStateChanged,
}

res
}

/// Transmit packets queued in the sockets.
///
/// This function returns a value indicating whether the state of any socket
/// might have changed.
///
/// This is guaranteed to always perform a bounded amount of work.
pub fn poll_egress(
&mut self,
timestamp: Instant,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollResult {
self.inner.now = timestamp;

match self.inner.caps.medium {
#[cfg(feature = "medium-ieee802154")]
Medium::Ieee802154 =>
{
Medium::Ieee802154 => {
#[cfg(feature = "proto-sixlowpan-fragmentation")]
if self.sixlowpan_egress(device) {
return true;
}
self.sixlowpan_egress(device);
}
#[cfg(any(feature = "medium-ethernet", feature = "medium-ip"))]
_ =>
{
_ => {
#[cfg(feature = "proto-ipv4-fragmentation")]
if self.ipv4_egress(device) {
return true;
}
self.ipv4_egress(device);
}
}

let mut readiness_may_have_changed = self.socket_ingress(device, sockets);
readiness_may_have_changed |= self.socket_egress(device, sockets);

#[cfg(feature = "multicast")]
self.multicast_egress(device);

readiness_may_have_changed
self.socket_egress(device, sockets)
}

/// Process one incoming packet queued in the device.
///
/// Returns a value indicating:
/// - whether a packet was processed, in which case you have to call this method again in case there's more packets queued.
/// - whether the state of any socket might have changed.
///
/// Since it processes at most one packet, this is guaranteed to always perform a bounded amount of work.
pub fn poll_ingress_single(
&mut self,
timestamp: Instant,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollIngressSingleResult {
self.inner.now = timestamp;

#[cfg(feature = "_proto-fragmentation")]
self.fragments.assembler.remove_expired(timestamp);

self.socket_ingress(device, sockets)
}

/// Return a _soft deadline_ for calling [poll] the next time.
Expand Down Expand Up @@ -480,20 +563,19 @@ impl Interface {
}
}

fn socket_ingress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
where
D: Device + ?Sized,
{
let mut processed_any = false;

fn socket_ingress(
&mut self,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollIngressSingleResult {
let Some((rx_token, tx_token)) = device.receive(self.inner.now) else {
return processed_any;
return PollIngressSingleResult::None;
};

let rx_meta = rx_token.meta();
rx_token.consume(|frame| {
if frame.is_empty() {
return;
return PollIngressSingleResult::PacketProcessed;
}

match self.inner.caps.medium {
Expand Down Expand Up @@ -543,24 +625,30 @@ impl Interface {
}
}
}
processed_any = true;
});

processed_any
// TODO: Propagate the PollIngressSingleResult from deeper.
// There's many received packets that we process but can't cause sockets
// to change state. For example IP fragments, multicast stuff, ICMP pings
// if they dont't match any raw socket...
// We should return `PacketProcessed` for these to save the user from
// doing useless socket polls.
PollIngressSingleResult::SocketStateChanged
})
}

fn socket_egress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
where
D: Device + ?Sized,
{
fn socket_egress(
&mut self,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollResult {
let _caps = device.capabilities();

enum EgressError {
Exhausted,
Dispatch,
}

let mut emitted_any = false;
let mut result = PollResult::None;
for item in sockets.items_mut() {
if !item
.meta
Expand All @@ -581,7 +669,7 @@ impl Interface {
.dispatch_ip(t, meta, response, &mut self.fragmenter)
.map_err(|_| EgressError::Dispatch)?;

emitted_any = true;
result = PollResult::SocketStateChanged;

Ok(())
};
Expand Down Expand Up @@ -663,7 +751,7 @@ impl Interface {
Ok(()) => {}
}
}
emitted_any
result
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/iface/interface/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ impl Interface {
/// - Send join/leave packets according to the multicast group state.
/// - Depending on `igmp_report_state` and the therein contained
/// timeouts, send IGMP membership reports.
pub(crate) fn multicast_egress<D>(&mut self, device: &mut D)
where
D: Device + ?Sized,
{
pub(crate) fn multicast_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Process multicast joins.
while let Some((&addr, _)) = self
.inner
Expand Down
13 changes: 2 additions & 11 deletions src/iface/interface/sixlowpan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,24 @@ pub(crate) const MAX_DECOMPRESSED_LEN: usize = 1500;

impl Interface {
/// Process fragments that still need to be sent for 6LoWPAN packets.
///
/// This function returns a boolean value indicating whether any packets were
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
#[cfg(feature = "proto-sixlowpan-fragmentation")]
pub(super) fn sixlowpan_egress<D>(&mut self, device: &mut D) -> bool
where
D: Device + ?Sized,
{
pub(super) fn sixlowpan_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Reset the buffer when we transmitted everything.
if self.fragmenter.finished() {
self.fragmenter.reset();
}

if self.fragmenter.is_empty() {
return false;
return;
}

let pkt = &self.fragmenter;
if pkt.packet_len > pkt.sent_bytes {
if let Some(tx_token) = device.transmit(self.inner.now) {
self.inner
.dispatch_ieee802154_frag(tx_token, &mut self.fragmenter);
return true;
}
}
false
}

/// Get the 6LoWPAN address contexts.
Expand Down

0 comments on commit 5984bc7

Please sign in to comment.