Skip to content

Commit

Permalink
feat: add loom test for the counter (#6888)
Browse files Browse the repository at this point in the history
Basically same as #6822. But we reuse the Counter to avoid code duplicate. Copy the content here.

Use https://github.com/tokio-rs/loom for concurrency test. Maybe can make the correctness more confident. But I'm not so sure how to write the best test.

it requires some code change for previous structure, so may need discuss:

remove workspacke hack in task_stats_alloc crate. Otherwise there will be package conflict if you run RUSTFLAGS="--cfg loom" cargo test --test loom. Have not investigate deeply but simple remove just works.

Refactor &'static AtomicUsize to be NonNull<AtomicUsize>. This is because the AtomicUsize in loom type do not support .as_mut_ptr (described in What to do when loom::AtomicUsize do not implement as_mut_ptr() tokio-rs/loom#298), so we use NonNull as intermediate workaround, that will add some unsafe code in .add() and .sub(). But seems OK.

To test the drop, I have to add a flag var AtomicUsize in .sub() to ensure that the value is dropped. Please provide some suggestions if you have better ideas on how to write test.


Approved-By: BugenZhao
Approved-By: liurenjie1024

Co-Authored-By: BowenXiao1999 <931759898@qq.com>
Co-Authored-By: Bowen <36908971+BowenXiao1999@users.noreply.github.com>
  • Loading branch information
BowenXiao1999 and BowenXiao1999 authored Dec 15, 2022
1 parent d2dfad3 commit 2235e84
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 14 deletions.
80 changes: 79 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion src/utils/task_stats_alloc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
"time",
"signal",
] }
workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]


[target.'cfg(loom)'.dependencies]
loom = {version = "0.5", features = ["futures", "checkpoint"]}
36 changes: 24 additions & 12 deletions src/utils/task_stats_alloc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,28 @@

use std::alloc::{GlobalAlloc, Layout, System};
use std::future::Future;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use tokio::task_local;

/// If you change the code in this struct, pls re-run the `tests/loom.rs` test locally.
#[repr(transparent)]
#[derive(Clone, Copy, Debug)]
pub struct TaskLocalBytesAllocated(Option<&'static AtomicUsize>);
pub struct TaskLocalBytesAllocated(Option<NonNull<AtomicUsize>>);

impl Default for TaskLocalBytesAllocated {
fn default() -> Self {
Self(Some(Box::leak(Box::new_in(0.into(), System))))
Self(Some(
NonNull::new(Box::into_raw(Box::new_in(0.into(), System))).unwrap(),
))
}
}

// Need this otherwise the NonNull is not Send and can not be used in future.
unsafe impl Send for TaskLocalBytesAllocated {}

impl TaskLocalBytesAllocated {
pub fn new() -> Self {
Self::default()
Expand All @@ -45,9 +52,10 @@ impl TaskLocalBytesAllocated {

/// Adds to the current counter.
#[inline(always)]
fn add(&self, val: usize) {
pub fn add(&self, val: usize) {
if let Some(bytes) = self.0 {
bytes.fetch_add(val, Ordering::Relaxed);
let bytes_ref = unsafe { bytes.as_ref() };
bytes_ref.fetch_add(val, Ordering::Relaxed);
}
}

Expand All @@ -57,33 +65,37 @@ impl TaskLocalBytesAllocated {
/// The caller must ensure that `self` is valid.
#[inline(always)]
unsafe fn add_unchecked(&self, val: usize) {
self.0.unwrap_unchecked().fetch_add(val, Ordering::Relaxed);
let bytes = self.0.unwrap_unchecked();
let bytes_ref = unsafe { bytes.as_ref() };
bytes_ref.fetch_add(val, Ordering::Relaxed);
}

/// Subtracts from the counter value, and `drop` the counter while the count reaches zero.
#[inline(always)]
fn sub(&self, val: usize) {
pub fn sub(&self, val: usize) -> bool {
if let Some(bytes) = self.0 {
// Use `Relaxed` order as we don't need to sync read/write with other memory addresses.
// Accesses to the counter itself are serialized by atomic operations.
let old_bytes = bytes.fetch_sub(val, Ordering::Relaxed);
let bytes_ref = unsafe { bytes.as_ref() };
let old_bytes = bytes_ref.fetch_sub(val, Ordering::Relaxed);
// If the counter reaches zero, delete the counter. Note that we've ensured there's no
// zero deltas in `wrap_layout`, so there'll be no more uses of the counter.
if old_bytes == val {
// No fence here, this is different from ref counter impl in https://www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html#boost_atomic.usage_examples.example_reference_counters.
// As here, T is the exactly Counter and they have same memory address, so there
// should not happen out-of-order commit.
unsafe { Box::from_raw_in(bytes.as_mut_ptr(), System) };
unsafe { Box::from_raw_in(bytes.as_ptr(), System) };
return true;
}
}
false
}

#[inline(always)]
pub fn val(&self) -> usize {
self.0
.as_ref()
.expect("bytes is invalid")
.load(Ordering::Relaxed)
let bytes_ref = self.0.as_ref().expect("bytes is invalid");
let bytes_ref = unsafe { bytes_ref.as_ref() };
bytes_ref.load(Ordering::Relaxed)
}
}

Expand Down
50 changes: 50 additions & 0 deletions src/utils/task_stats_alloc/tests/loom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#![cfg(loom)]

/// Note this test is not running in CI, due to the re-compile time cost. Add it when it is
/// necessary. Run `RUSTFLAGS="--cfg loom" cargo test --test loom` to test.
use loom::sync::Arc;
use loom::thread;
use task_stats_alloc::TaskLocalBytesAllocated;

#[test]
fn test_to_avoid_double_drop() {
loom::model(|| {
let bytes_num = 3;
let num = Arc::new(TaskLocalBytesAllocated::new());

let threads: Vec<_> = (0..bytes_num)
.map(|_| {
let num = num.clone();
thread::spawn(move || {
num.add(1);
num.sub(1)
})
})
.collect();

// How many times the bytes have been dropped.
let mut drop_num = 0;
for t in threads {
if t.join().unwrap() {
drop_num += 1;
}
}

// Ensure the counter is dropped.
assert_eq!(drop_num, 1);
});
}

0 comments on commit 2235e84

Please sign in to comment.