Skip to content

Commit

Permalink
Merge 'tokio-1.25.3' into 'tokio-1.32.x' (#6227)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Dec 19, 2023
2 parents ccb37c4 + 0d36233 commit 22b3a65
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 9 deletions.
7 changes: 7 additions & 0 deletions tokio/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,13 @@ This release bumps the MSRV of Tokio to 1.56. ([#5559])
[#5513]: https://github.com/tokio-rs/tokio/pull/5513
[#5517]: https://github.com/tokio-rs/tokio/pull/5517

# 1.25.3 (December 17th, 2023)

### Fixed
- io: add budgeting to `tokio::runtime::io::registration::async_io` ([#6221])

[#6221]: https://github.com/tokio-rs/tokio/pull/6221

# 1.25.2 (September 22, 2023)

Forward ports 1.20.6 changes.
Expand Down
16 changes: 8 additions & 8 deletions tokio/src/runtime/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,23 @@ tokio_thread_local! {
#[cfg(feature = "rt")]
thread_id: Cell::new(None),

/// Tracks the current runtime handle to use when spawning,
/// accessing drivers, etc...
// Tracks the current runtime handle to use when spawning,
// accessing drivers, etc...
#[cfg(feature = "rt")]
current: current::HandleCell::new(),

/// Tracks the current scheduler internal context
// Tracks the current scheduler internal context
#[cfg(feature = "rt")]
scheduler: Scoped::new(),

#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

/// Tracks if the current thread is currently driving a runtime.
/// Note, that if this is set to "entered", the current scheduler
/// handle may not reference the runtime currently executing. This
/// is because other runtime handles may be set to current from
/// within a runtime.
// Tracks if the current thread is currently driving a runtime.
// Note, that if this is set to "entered", the current scheduler
// handle may not reference the runtime currently executing. This
// is because other runtime handles may be set to current from
// within a runtime.
#[cfg(feature = "rt")]
runtime: Cell::new(EnterRuntime::NotEntered),

Expand Down
7 changes: 6 additions & 1 deletion tokio/src/runtime/io/registration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,16 @@ impl Registration {
loop {
let event = self.readiness(interest).await?;

let coop = crate::future::poll_fn(crate::runtime::coop::poll_proceed).await;

match f() {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
self.clear_readiness(event);
}
x => return x,
x => {
coop.made_progress();
return x;
}
}
}
}
Expand Down
77 changes: 77 additions & 0 deletions tokio/tests/coop_budger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", target_os = "linux"))]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::net::UdpSocket;

/// Ensure that UDP sockets have functional budgeting
///
/// # Design
/// Two sockets communicate by spamming packets from one to the other.
///
/// In Linux, this packet will be slammed through the entire network stack and into the receiver's buffer during the
/// send system call because we are using the loopback interface.
/// This happens because the softirq chain invoked on send when using the loopback interface covers virtually the
/// entirety of the lifecycle of a packet within the kernel network stack.
///
/// As a result, neither socket will ever encounter an EWOULDBLOCK, and the only way for these to yield during the loop
/// is through budgeting.
///
/// A second task runs in the background and increments a counter before yielding, allowing us to know how many times sockets yielded.
/// Since we are both sending and receiving, that should happen once per 64 packets, because budgets are of size 128
/// and there are two budget events per packet, a send and a recv.
#[tokio::test]
async fn coop_budget_udp_send_recv() {
const BUDGET: usize = 128;
const N_ITERATIONS: usize = 1024;

const PACKET: &[u8] = b"Hello, world";
const PACKET_LEN: usize = 12;

assert_eq!(
PACKET_LEN,
PACKET.len(),
"Defect in test, programmer can't do math"
);

// bind each socket to a dynamic port, forcing IPv4 addressing on the localhost interface
let tx = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let rx = UdpSocket::bind("127.0.0.1:0").await.unwrap();

tx.connect(rx.local_addr().unwrap()).await.unwrap();
rx.connect(tx.local_addr().unwrap()).await.unwrap();

let tracker = Arc::new(AtomicUsize::default());

let tracker_clone = Arc::clone(&tracker);

tokio::task::yield_now().await;

tokio::spawn(async move {
loop {
tracker_clone.fetch_add(1, Ordering::SeqCst);

tokio::task::yield_now().await;
}
});

for _ in 0..N_ITERATIONS {
tx.send(PACKET).await.unwrap();

let mut tmp = [0; PACKET_LEN];

// ensure that we aren't somehow accumulating other
assert_eq!(
PACKET_LEN,
rx.recv(&mut tmp).await.unwrap(),
"Defect in test case, received unexpected result from socket"
);
assert_eq!(
PACKET, &tmp,
"Defect in test case, received unexpected result from socket"
);
}

assert_eq!(N_ITERATIONS / (BUDGET / 2), tracker.load(Ordering::SeqCst));
}

0 comments on commit 22b3a65

Please sign in to comment.