sync: mpsc performance optimization avoid false sharing #5829

Merged · 10 commits · Aug 3, 2023
21 changes: 11 additions & 10 deletions tokio/src/sync/mpsc/chan.rs
@@ -6,6 +6,7 @@ use crate::runtime::park::CachedParkThread;
 use crate::sync::mpsc::error::TryRecvError;
 use crate::sync::mpsc::{bounded, list, unbounded};
 use crate::sync::notify::Notify;
+use crate::util::cacheline::CachePadded;
 
 use std::fmt;
 use std::process;
@@ -46,18 +47,18 @@ pub(crate) trait Semaphore {
 }
 
 pub(super) struct Chan<T, S> {
+    /// Handle to the push half of the lock-free list.
+    tx: CachePadded<list::Tx<T>>,
+
+    /// Receiver waker. Notified when a value is pushed into the channel.
+    rx_waker: CachePadded<AtomicWaker>,
+
     /// Notifies all tasks listening for the receiver being dropped.
     notify_rx_closed: Notify,
 
-    /// Handle to the push half of the lock-free list.
-    tx: list::Tx<T>,
-
     /// Coordinates access to channel's capacity.
     semaphore: S,
 
-    /// Receiver waker. Notified when a value is pushed into the channel.
-    rx_waker: AtomicWaker,
-
     /// Tracks the number of outstanding sender handles.
     ///
     /// When this drops to zero, the send half of the channel is closed.
@@ -73,9 +74,9 @@ where
 {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Chan")
-            .field("tx", &self.tx)
+            .field("tx", &*self.tx)
             .field("semaphore", &self.semaphore)
-            .field("rx_waker", &self.rx_waker)
+            .field("rx_waker", &*self.rx_waker)
             .field("tx_count", &self.tx_count)
             .field("rx_fields", &"...")
             .finish()
@@ -108,9 +109,9 @@ pub(crate) fn channel<T, S: Semaphore>(semaphore: S) -> (Tx<T, S>, Rx<T, S>) {
 
     let chan = Arc::new(Chan {
         notify_rx_closed: Notify::new(),
-        tx,
+        tx: CachePadded::new(tx),
         semaphore,
-        rx_waker: AtomicWaker::new(),
+        rx_waker: CachePadded::new(AtomicWaker::new()),
         tx_count: AtomicUsize::new(1),
         rx_fields: UnsafeCell::new(RxFields {
            list: rx,
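For context on the change above: `list::Tx` is hammered by senders while `rx_waker` is hammered by the receiver, so when the two sit on the same cache line, every push invalidates the line the receiver is about to touch, and vice versa. Below is a minimal, standalone sketch of that effect. It is not part of this PR; the struct names, iteration count, and 128-byte figure are illustrative, and actual timings are machine-dependent.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::{Duration, Instant};

// Two counters that will typically share one cache line.
struct Unpadded {
    a: AtomicU64,
    b: AtomicU64,
}

// A counter forced onto its own line, mirroring the 128-byte alignment
// this PR applies on x86_64, aarch64, and powerpc64.
#[repr(align(128))]
struct Padded(AtomicU64);

// Two threads each hammer their own counter; if the counters share a
// cache line, every write steals the line from the other core.
fn hammer(x: &AtomicU64, y: &AtomicU64) -> Duration {
    let start = Instant::now();
    thread::scope(|s| {
        s.spawn(|| {
            for _ in 0..10_000_000 {
                x.fetch_add(1, Ordering::Relaxed);
            }
        });
        s.spawn(|| {
            for _ in 0..10_000_000 {
                y.fetch_add(1, Ordering::Relaxed);
            }
        });
    });
    start.elapsed()
}

fn main() {
    let shared = Unpadded {
        a: AtomicU64::new(0),
        b: AtomicU64::new(0),
    };
    let (p, q) = (Padded(AtomicU64::new(0)), Padded(AtomicU64::new(0)));

    println!("same cache line:      {:?}", hammer(&shared.a, &shared.b));
    println!("separate cache lines: {:?}", hammer(&p.0, &q.0));
}
```

On a typical multi-core machine the second run is markedly faster; moving `tx` and `rx_waker` into `CachePadded` slots buys the channel the same effect.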
95 changes: 95 additions & 0 deletions tokio/src/util/cacheline.rs
@@ -0,0 +1,95 @@
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
use std::ops::{Deref, DerefMut};

/// Pads and aligns a value to the length of a cache line.
#[derive(Clone, Copy, Default, Hash, PartialEq, Eq)]
// Starting from Intel's Sandy Bridge, spatial prefetcher is now pulling pairs of 64-byte cache
// lines at a time, so we have to align to 128 bytes rather than 64.
//
// Sources:
// - https://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// - https://github.com/facebook/folly/blob/1b5288e6eea6df074758f877c849b6e73bbb9fbb/folly/lang/Align.h#L107
//
// ARM's big.LITTLE architecture has asymmetric cores and "big" cores have 128-byte cache line size.
//
// Sources:
// - https://www.mono-project.com/news/2016/09/12/arm64-icache/
//
// powerpc64 has 128-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_ppc64x.go#L9
#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))
)]
// arm, mips, mips64, and riscv64 have 32-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_arm.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mipsle.go#L7
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_mips64x.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_riscv64.go#L7
#[cfg_attr(
    any(
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv64",
    ),
    repr(align(32))
)]
// s390x has 256-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_s390x.go#L7
#[cfg_attr(target_arch = "s390x", repr(align(256)))]
// x86 and wasm have 64-byte cache line size.
//
// Sources:
// - https://github.com/golang/go/blob/dda2991c2ea0c5914714469c4defc2562a907230/src/internal/cpu/cpu_x86.go#L9
// - https://github.com/golang/go/blob/3dd58676054223962cd915bb0934d1f9f489d4d2/src/internal/cpu/cpu_wasm.go#L7
//
// All others are assumed to have 64-byte cache line size.
#[cfg_attr(
    not(any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
        target_arch = "arm",
        target_arch = "mips",
        target_arch = "mips64",
        target_arch = "riscv64",
        target_arch = "s390x",
    )),
    repr(align(64))
)]
pub(crate) struct CachePadded<T> {
    value: T,
}

impl<T> CachePadded<T> {
    /// Pads and aligns a value to the length of a cache line.
    pub(crate) fn new(value: T) -> CachePadded<T> {
        CachePadded::<T> { value }
    }
}

impl<T> Deref for CachePadded<T> {
    type Target = T;

    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> DerefMut for CachePadded<T> {
    fn deref_mut(&mut self) -> &mut T {
        &mut self.value
    }
}
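The `Deref`/`DerefMut` impls make the wrapper transparent at call sites, which is why the chan.rs hunk above only had to change `&self.tx` to `&*self.tx` where a plain reference to the inner value was needed. Here is a sketch of a unit test that could live in this module, exercising both the transparency and the padding; it is not part of the PR, and the 128-byte expectations hold only for the x86_64 branch of the cfg ladder above.

```rust
#[cfg(test)]
mod tests {
    use super::CachePadded;
    use std::sync::atomic::{AtomicUsize, Ordering};

    #[test]
    fn deref_and_alignment() {
        let counter = CachePadded::new(AtomicUsize::new(0));

        // Deref: inner methods are callable directly on the wrapper.
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 1);

        // repr(align(N)) rounds the struct's size up to its alignment,
        // so the padding is real: on x86_64 an 8-byte AtomicUsize
        // occupies a full 128-byte slot.
        #[cfg(target_arch = "x86_64")]
        {
            assert_eq!(std::mem::align_of::<CachePadded<AtomicUsize>>(), 128);
            assert_eq!(std::mem::size_of::<CachePadded<AtomicUsize>>(), 128);
        }
    }
}
```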
2 changes: 2 additions & 0 deletions tokio/src/util/mod.rs
@@ -75,3 +75,5 @@ pub(crate) mod error;
 pub(crate) mod memchr;
 
 pub(crate) mod markers;
+
+pub(crate) mod cacheline;