Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iface: make poll() process all packets, add fine-grained poll functions. #991

Merged
merged 2 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/iface/interface/ipv4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,23 @@ impl Interface {
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
#[cfg(feature = "proto-ipv4-fragmentation")]
pub(super) fn ipv4_egress<D>(&mut self, device: &mut D) -> bool
where
D: Device + ?Sized,
{
pub(super) fn ipv4_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Reset the buffer when we transmitted everything.
if self.fragmenter.finished() {
self.fragmenter.reset();
}

if self.fragmenter.is_empty() {
return false;
return;
}

let pkt = &self.fragmenter;
if pkt.packet_len > pkt.sent_bytes {
if let Some(tx_token) = device.transmit(self.inner.now) {
self.inner
.dispatch_ipv4_frag(tx_token, &mut self.fragmenter);
return true;
}
}
false
}
}

Expand Down
192 changes: 140 additions & 52 deletions src/iface/interface/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,44 @@ macro_rules! check {
}
use check;

/// Result returned by [`Interface::poll`].
///
/// This contains information on whether socket states might have changed.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PollResult {
/// Socket state is guaranteed to not have changed.
None,
/// You should check the state of sockets again for received data or completion of operations.
SocketStateChanged,
}

/// Result returned by [`Interface::poll_ingress_single`].
///
/// This contains information on whether a packet was processed or not,
/// and whether it might've affected socket states.
#[derive(Copy, Clone, PartialEq, Eq, Debug)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum PollIngressSingleResult {
/// No packet was processed. You don't need to call [`Interface::poll_ingress_single`]
/// again, until more packets arrive.
///
/// Socket state is guaranteed to not have changed.
None,
/// A packet was processed.
///
/// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
///
/// Socket state is guaranteed to not have changed.
PacketProcessed,
/// A packet was processed, which might have caused socket state to change.
///
/// There may be more packets in the device's RX queue, so you should call [`Interface::poll_ingress_single`] again.
///
/// You should check the state of sockets again for received data or completion of operations.
SocketStateChanged,
}

/// A network interface.
///
/// The network interface logically owns a number of other data structures; to avoid
Expand Down Expand Up @@ -150,10 +188,7 @@ impl Interface {
/// # Panics
/// This function panics if the [`Config::hardware_address`] does not match
/// the medium of the device.
pub fn new<D>(config: Config, device: &mut D, now: Instant) -> Self
where
D: Device + ?Sized,
{
pub fn new(config: Config, device: &mut (impl Device + ?Sized), now: Instant) -> Self {
let caps = device.capabilities();
assert_eq!(
config.hardware_addr.medium(),
Expand Down Expand Up @@ -375,59 +410,107 @@ impl Interface {
self.fragments.reassembly_timeout = timeout;
}

/// Transmit packets queued in the given sockets, and receive packets queued
/// Transmit packets queued in the sockets, and receive packets queued
/// in the device.
///
/// This function returns a boolean value indicating whether any packets were
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
/// This function returns a value indicating whether the state of any socket
/// might have changed.
///
/// ## DoS warning
///
/// # Note
/// This function performs a bounded amount of work per call to avoid
/// starving other tasks of CPU time. If it returns true, there may still be
/// packets to be received or transmitted. Depending on system design,
/// calling this function in a loop may cause a denial of service if
/// packets cannot be processed faster than they arrive.
pub fn poll<D>(
/// This function processes all packets in the device's queue. This can
/// be an unbounded amount of work if packets arrive faster than they're
/// processed.
///
/// If this is a concern for your application (i.e. your environment doesn't
/// have preemptive scheduling, or `poll()` is called from a main loop where
/// other important things are processed), you may use the lower-level methods
/// [`poll_egress()`](Self::poll_egress) and [`poll_ingress_single()`](Self::poll_ingress_single).
/// This allows you to insert yields or process other events between processing
/// individual ingress packets.
pub fn poll(
&mut self,
timestamp: Instant,
device: &mut D,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> bool
where
D: Device + ?Sized,
{
) -> PollResult {
self.inner.now = timestamp;

let mut res = PollResult::None;

#[cfg(feature = "_proto-fragmentation")]
self.fragments.assembler.remove_expired(timestamp);

// Process ingress while there's packets available.
loop {
match self.socket_ingress(device, sockets) {
PollIngressSingleResult::None => break,
PollIngressSingleResult::PacketProcessed => {}
PollIngressSingleResult::SocketStateChanged => res = PollResult::SocketStateChanged,
}
}

// Process egress.
match self.poll_egress(timestamp, device, sockets) {
PollResult::None => {}
PollResult::SocketStateChanged => res = PollResult::SocketStateChanged,
}

res
}

/// Transmit packets queued in the sockets.
///
/// This function returns a value indicating whether the state of any socket
/// might have changed.
///
/// This is guaranteed to always perform a bounded amount of work.
pub fn poll_egress(
&mut self,
timestamp: Instant,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollResult {
self.inner.now = timestamp;

match self.inner.caps.medium {
#[cfg(feature = "medium-ieee802154")]
Medium::Ieee802154 =>
{
Medium::Ieee802154 => {
#[cfg(feature = "proto-sixlowpan-fragmentation")]
if self.sixlowpan_egress(device) {
return true;
}
self.sixlowpan_egress(device);
}
#[cfg(any(feature = "medium-ethernet", feature = "medium-ip"))]
_ =>
{
_ => {
#[cfg(feature = "proto-ipv4-fragmentation")]
if self.ipv4_egress(device) {
return true;
}
self.ipv4_egress(device);
}
}

let mut readiness_may_have_changed = self.socket_ingress(device, sockets);
readiness_may_have_changed |= self.socket_egress(device, sockets);

#[cfg(feature = "multicast")]
self.multicast_egress(device);

readiness_may_have_changed
self.socket_egress(device, sockets)
}

/// Process one incoming packet queued in the device.
///
/// Returns a value indicating:
/// - whether a packet was processed, in which case you have to call this method again in case there's more packets queued.
/// - whether the state of any socket might have changed.
///
/// Since it processes at most one packet, this is guaranteed to always perform a bounded amount of work.
pub fn poll_ingress_single(
&mut self,
timestamp: Instant,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollIngressSingleResult {
self.inner.now = timestamp;

#[cfg(feature = "_proto-fragmentation")]
self.fragments.assembler.remove_expired(timestamp);

self.socket_ingress(device, sockets)
}

/// Return a _soft deadline_ for calling [poll] the next time.
Expand Down Expand Up @@ -480,20 +563,19 @@ impl Interface {
}
}

fn socket_ingress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
where
D: Device + ?Sized,
{
let mut processed_any = false;

fn socket_ingress(
&mut self,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollIngressSingleResult {
let Some((rx_token, tx_token)) = device.receive(self.inner.now) else {
return processed_any;
return PollIngressSingleResult::None;
};

let rx_meta = rx_token.meta();
rx_token.consume(|frame| {
if frame.is_empty() {
return;
return PollIngressSingleResult::PacketProcessed;
}

match self.inner.caps.medium {
Expand Down Expand Up @@ -543,24 +625,30 @@ impl Interface {
}
}
}
processed_any = true;
});

processed_any
// TODO: Propagate the PollIngressSingleResult from deeper.
// There's many received packets that we process but can't cause sockets
// to change state. For example IP fragments, multicast stuff, ICMP pings
// if they dont't match any raw socket...
// We should return `PacketProcessed` for these to save the user from
// doing useless socket polls.
PollIngressSingleResult::SocketStateChanged
})
}

fn socket_egress<D>(&mut self, device: &mut D, sockets: &mut SocketSet<'_>) -> bool
where
D: Device + ?Sized,
{
fn socket_egress(
&mut self,
device: &mut (impl Device + ?Sized),
sockets: &mut SocketSet<'_>,
) -> PollResult {
let _caps = device.capabilities();

enum EgressError {
Exhausted,
Dispatch,
}

let mut emitted_any = false;
let mut result = PollResult::None;
for item in sockets.items_mut() {
if !item
.meta
Expand All @@ -581,7 +669,7 @@ impl Interface {
.dispatch_ip(t, meta, response, &mut self.fragmenter)
.map_err(|_| EgressError::Dispatch)?;

emitted_any = true;
result = PollResult::SocketStateChanged;

Ok(())
};
Expand Down Expand Up @@ -663,7 +751,7 @@ impl Interface {
Ok(()) => {}
}
}
emitted_any
result
}
}

Expand Down
5 changes: 1 addition & 4 deletions src/iface/interface/multicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,7 @@ impl Interface {
/// - Send join/leave packets according to the multicast group state.
/// - Depending on `igmp_report_state` and the therein contained
/// timeouts, send IGMP membership reports.
pub(crate) fn multicast_egress<D>(&mut self, device: &mut D)
where
D: Device + ?Sized,
{
pub(crate) fn multicast_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Process multicast joins.
while let Some((&addr, _)) = self
.inner
Expand Down
13 changes: 2 additions & 11 deletions src/iface/interface/sixlowpan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,24 @@ pub(crate) const MAX_DECOMPRESSED_LEN: usize = 1500;

impl Interface {
/// Process fragments that still need to be sent for 6LoWPAN packets.
///
/// This function returns a boolean value indicating whether any packets were
/// processed or emitted, and thus, whether the readiness of any socket might
/// have changed.
#[cfg(feature = "proto-sixlowpan-fragmentation")]
pub(super) fn sixlowpan_egress<D>(&mut self, device: &mut D) -> bool
where
D: Device + ?Sized,
{
pub(super) fn sixlowpan_egress(&mut self, device: &mut (impl Device + ?Sized)) {
// Reset the buffer when we transmitted everything.
if self.fragmenter.finished() {
self.fragmenter.reset();
}

if self.fragmenter.is_empty() {
return false;
return;
}

let pkt = &self.fragmenter;
if pkt.packet_len > pkt.sent_bytes {
if let Some(tx_token) = device.transmit(self.inner.now) {
self.inner
.dispatch_ieee802154_frag(tx_token, &mut self.fragmenter);
return true;
}
}
false
}

/// Get the 6LoWPAN address contexts.
Expand Down
10 changes: 2 additions & 8 deletions src/iface/interface/tests/ipv4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,20 +721,14 @@ fn test_handle_igmp(#[case] medium: Medium) {
}

// General query
let timestamp = Instant::ZERO;
const GENERAL_QUERY_BYTES: &[u8] = &[
0x46, 0xc0, 0x00, 0x24, 0xed, 0xb4, 0x00, 0x00, 0x01, 0x02, 0x47, 0x43, 0xac, 0x16, 0x63,
0x04, 0xe0, 0x00, 0x00, 0x01, 0x94, 0x04, 0x00, 0x00, 0x11, 0x64, 0xec, 0x8f, 0x00, 0x00,
0x00, 0x00, 0x02, 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00,
];
{
// Transmit GENERAL_QUERY_BYTES into loopback
let tx_token = device.transmit(timestamp).unwrap();
tx_token.consume(GENERAL_QUERY_BYTES.len(), |buffer| {
buffer.copy_from_slice(GENERAL_QUERY_BYTES);
});
}
device.rx_queue.push_back(GENERAL_QUERY_BYTES.to_vec());

// Trigger processing until all packets received through the
// loopback have been processed, including responses to
// GENERAL_QUERY_BYTES. Therefore `recv_all()` would return 0
Expand Down
6 changes: 2 additions & 4 deletions src/iface/interface/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ fn fill_slice(s: &mut [u8], val: u8) {
#[allow(unused)]
fn recv_all(device: &mut crate::tests::TestingDevice, timestamp: Instant) -> Vec<Vec<u8>> {
let mut pkts = Vec::new();
while let Some((rx, _tx)) = device.receive(timestamp) {
rx.consume(|pkt| {
pkts.push(pkt.to_vec());
});
while let Some(pkt) = device.tx_queue.pop_front() {
pkts.push(pkt)
}
pkts
}
Expand Down
Loading
Loading