sync: avoid false sharing in mpsc channel (#5829)
wathenjiang authored Aug 3, 2023
1 parent 52e6510 commit 38d1bcd
Showing 3 changed files with 108 additions and 10 deletions.
21 changes: 11 additions & 10 deletions tokio/src/sync/mpsc/chan.rs
@@ -6,6 +6,7 @@ use crate::runtime::park::CachedParkThread;
 use crate::sync::mpsc::error::TryRecvError;
 use crate::sync::mpsc::{bounded, list, unbounded};
 use crate::sync::notify::Notify;
+use crate::util::cacheline::CachePadded;
 
 use std::fmt;
 use std::process;
@@ -46,18 +47,18 @@ pub(crate) trait Semaphore {
 }
 
 pub(super) struct Chan<T, S> {
+    /// Handle to the push half of the lock-free list.
+    tx: CachePadded<list::Tx<T>>,
+
+    /// Receiver waker. Notified when a value is pushed into the channel.
+    rx_waker: CachePadded<AtomicWaker>,
+
     /// Notifies all tasks listening for the receiver being dropped.
     notify_rx_closed: Notify,
 
-    /// Handle to the push half of the lock-free list.
-    tx: list::Tx<T>,
-
     /// Coordinates access to channel's capacity.
     semaphore: S,
 
-    /// Receiver waker. Notified when a value is pushed into the channel.
-    rx_waker: AtomicWaker,
-
     /// Tracks the number of outstanding sender handles.
     ///
     /// When this drops to zero, the send half of the channel is closed.
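The two fields moved to the top and wrapped are the hot ones: tx is hit by every sender pushing into the lock-free list, and rx_waker is registered by the receiver and woken by senders. Padding each to its own cache line keeps that traffic from repeatedly invalidating a line shared with the other field. The following standalone sketch (not part of the diff; Padded128 is a hypothetical stand-in for the CachePadded type added below) shows the pattern the padding targets:

// Illustrative only -- not part of the diff. `Padded128` is a made-up
// stand-in for CachePadded, hard-coding the 128-byte alignment used on
// x86_64/aarch64/powerpc64.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;

#[repr(align(128))]
struct Padded128<T>(T);

struct Counters {
    a: Padded128<AtomicU64>, // hammered by thread 1, gets its own cache line
    b: Padded128<AtomicU64>, // hammered by thread 2, gets its own cache line
}

fn main() {
    let c = Arc::new(Counters {
        a: Padded128(AtomicU64::new(0)),
        b: Padded128(AtomicU64::new(0)),
    });
    let (c1, c2) = (Arc::clone(&c), Arc::clone(&c));
    let t1 = thread::spawn(move || {
        for _ in 0..10_000_000u64 {
            c1.a.0.fetch_add(1, Ordering::Relaxed);
        }
    });
    let t2 = thread::spawn(move || {
        for _ in 0..10_000_000u64 {
            c2.b.0.fetch_add(1, Ordering::Relaxed);
        }
    });
    t1.join().unwrap();
    t2.join().unwrap();
    // Without the alignment, `a` and `b` would typically share one cache
    // line, and each writer would keep invalidating it for the other.
    assert_eq!(
        c.a.0.load(Ordering::Relaxed) + c.b.0.load(Ordering::Relaxed),
        20_000_000
    );
}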
@@ -73,9 +74,9 @@ where
 {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Chan")
-            .field("tx", &self.tx)
+            .field("tx", &*self.tx)
             .field("semaphore", &self.semaphore)
-            .field("rx_waker", &self.rx_waker)
+            .field("rx_waker", &*self.rx_waker)
             .field("tx_count", &self.tx_count)
             .field("rx_fields", &"...")
             .finish()
@@ -108,9 +109,9 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
 
     let chan = Arc::new(Chan {
         notify_rx_closed: Notify::new(),
-        tx,
+        tx: CachePadded::new(tx),
         semaphore,
-        rx_waker: AtomicWaker::new(),
+        rx_waker: CachePadded::new(AtomicWaker::new()),
         tx_count: AtomicUsize::new(1),
         rx_fields: UnsafeCell::new(RxFields {
             list: rx,
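Because CachePadded implements Deref and DerefMut (see the new module below), existing call sites that invoke methods on self.tx or self.rx_waker keep compiling unchanged; only places that need the inner value by plain reference, such as the Debug impl above, switch to an explicit &*self.tx. A minimal sketch of that transparency, using a made-up Tx type and a local copy of just the Deref plumbing:

// Illustrative sketch only; `Tx` is a hypothetical stand-in, and this local
// CachePadded mirrors only the Deref/DerefMut part of the new module below.
use std::ops::{Deref, DerefMut};

struct CachePadded<T> {
    value: T,
}

impl<T> Deref for CachePadded<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}

struct Tx {
    sent: u32,
}

impl Tx {
    fn push(&mut self, _v: u32) {
        self.sent += 1;
    }
}

fn main() {
    let mut tx = CachePadded { value: Tx { sent: 0 } };
    tx.push(1); // auto-deref: method calls go straight through the wrapper
    let inner: &Tx = &*tx; // explicit reborrow, as in the Debug impl's `&*self.tx`
    assert_eq!(inner.sent, 1);
}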
95 changes: 95 additions & 0 deletions tokio/src/util/cacheline.rs
@@ -0,0 +1,95 @@
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
use std::ops::{Deref, DerefMut};

/// Pads and aligns a value to the length of a cache line.
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, and riscv64 have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv64",
    ),
    repr(align(32))
)]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86 and wasm have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv64",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct CachePadded<T> {
    value: T,
}

impl<T> CachePadded<T> {
    /// Pads and aligns a value to the length of a cache line.
    pub(crate) fn new(value: T) -> CachePadded<T> {
        CachePadded::<T> { value }
    }
}

impl<T> Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}
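A quick sanity check of what the attribute stack buys, assuming compilation for x86_64 so that the repr(align(128)) branch applies: the wrapper's size is rounded up to its alignment, and that rounding is the padding which keeps a neighbouring field off the same cache line. This is a standalone sketch, not part of the diff:

// Standalone check, not part of the diff; assumes an x86_64 target, where
// CachePadded picks up repr(align(128)). `Padded` is a local stand-in.
use std::mem::{align_of, size_of};

#[repr(align(128))] // mirrors the x86_64 branch of the cfg_attr stack above
struct Padded<T> {
    value: T,
}

fn main() {
    assert_eq!(align_of::<Padded<u8>>(), 128);
    // Size is rounded up to the alignment, so even a 1-byte payload
    // occupies a full 128-byte slot -- the space cost paid for isolation.
    assert_eq!(size_of::<Padded<u8>>(), 128);
    let p = Padded { value: 7u8 };
    assert_eq!(p.value, 7);
}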
2 changes: 2 additions & 0 deletions tokio/src/util/mod.rs
@@ -75,3 +75,5 @@ pub(crate) mod error;
 pub(crate) mod memchr;
 
 pub(crate) mod markers;
+
+pub(crate) mod cacheline;
