Skip to content

Commit

Permalink
Auto merge of rust-lang#3495 - RalfJung:data-race-clocks, r=RalfJung
Browse files Browse the repository at this point in the history
data_race: make the release/acquire API more clear
  • Loading branch information
bors committed Apr 20, 2024
2 parents 9713ff4 + 8cab0d5 commit 6b0ce8b
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 124 deletions.
20 changes: 10 additions & 10 deletions src/tools/miri/src/alloc_addresses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_span::Span;
use rustc_target::abi::{Align, HasDataLayout, Size};

use crate::*;
use crate::{concurrency::VClock, *};

use self::reuse_pool::ReusePool;

Expand Down Expand Up @@ -171,8 +171,10 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
memory_kind,
ecx.get_active_thread(),
) {
if let Some(data_race) = &ecx.machine.data_race {
data_race.validate_lock_acquire(&clock, ecx.get_active_thread());
if let Some(clock) = clock
&& let Some(data_race) = &ecx.machine.data_race
{
data_race.acquire_clock(&clock, ecx.get_active_thread());
}
reuse_addr
} else {
Expand Down Expand Up @@ -369,15 +371,13 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
// Also remember this address for future reuse.
let thread = self.threads.get_active_thread_id();
global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
let mut clock = concurrency::VClock::default();
if let Some(data_race) = &self.data_race {
data_race.validate_lock_release(
&mut clock,
thread,
self.threads.active_thread_ref().current_span(),
);
data_race
.release_clock(thread, self.threads.active_thread_ref().current_span())
.clone()
} else {
VClock::default()
}
clock
})
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/tools/miri/src/alloc_addresses/reuse_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,15 @@ impl ReusePool {
subpool.insert(pos, (addr, size, thread, clock));
}

/// Returns the address to use and optionally a clock we have to synchronize with.
pub fn take_addr(
&mut self,
rng: &mut impl Rng,
size: Size,
align: Align,
kind: MemoryKind,
thread: ThreadId,
) -> Option<(u64, VClock)> {
) -> Option<(u64, Option<VClock>)> {
// Determine whether we'll even attempt a reuse. As above, we don't do reuse for stack addresses.
if kind == MemoryKind::Stack || !rng.gen_bool(self.address_reuse_rate) {
return None;
Expand Down Expand Up @@ -122,6 +123,7 @@ impl ReusePool {
let (chosen_addr, chosen_size, chosen_thread, clock) = subpool.remove(idx);
debug_assert!(chosen_size >= size && chosen_addr % align.bytes() == 0);
debug_assert!(cross_thread_reuse || chosen_thread == thread);
Some((chosen_addr, clock))
// No synchronization needed if we reused from the current thread.
Some((chosen_addr, if chosen_thread == thread { None } else { Some(clock) }))
}
}
77 changes: 34 additions & 43 deletions src/tools/miri/src/concurrency/data_race.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,9 +547,9 @@ impl MemoryCellClocks {
) -> Result<(), DataRace> {
trace!("Unsynchronized read with vectors: {:#?} :: {:#?}", self, thread_clocks);
if !current_span.is_dummy() {
thread_clocks.clock[index].span = current_span;
thread_clocks.clock.index_mut(index).span = current_span;
}
thread_clocks.clock[index].set_read_type(read_type);
thread_clocks.clock.index_mut(index).set_read_type(read_type);
if self.write_was_before(&thread_clocks.clock) {
let race_free = if let Some(atomic) = self.atomic() {
// We must be ordered-after all atomic accesses, reads and writes.
Expand Down Expand Up @@ -577,7 +577,7 @@ impl MemoryCellClocks {
) -> Result<(), DataRace> {
trace!("Unsynchronized write with vectors: {:#?} :: {:#?}", self, thread_clocks);
if !current_span.is_dummy() {
thread_clocks.clock[index].span = current_span;
thread_clocks.clock.index_mut(index).span = current_span;
}
if self.write_was_before(&thread_clocks.clock) && self.read <= thread_clocks.clock {
let race_free = if let Some(atomic) = self.atomic() {
Expand Down Expand Up @@ -1701,49 +1701,34 @@ impl GlobalState {
format!("thread `{thread_name}`")
}

/// Acquire a lock, express that the previous call of
/// `validate_lock_release` must happen before this.
/// Acquire the given clock into the given thread, establishing synchronization with
/// the moment when that clock snapshot was taken via `release_clock`.
/// As this is an acquire operation, the thread timestamp is not
/// incremented.
pub fn validate_lock_acquire(&self, lock: &VClock, thread: ThreadId) {
let (_, mut clocks) = self.load_thread_state_mut(thread);
pub fn acquire_clock(&self, lock: &VClock, thread: ThreadId) {
let (_, mut clocks) = self.thread_state_mut(thread);
clocks.clock.join(lock);
}

/// Release a lock handle, express that this happens-before
/// any subsequent calls to `validate_lock_acquire`.
/// For normal locks this should be equivalent to `validate_lock_release_shared`
/// since an acquire operation should have occurred before, however
/// for futex & condvar operations this is not the case and this
/// operation must be used.
pub fn validate_lock_release(&self, lock: &mut VClock, thread: ThreadId, current_span: Span) {
let (index, mut clocks) = self.load_thread_state_mut(thread);
lock.clone_from(&clocks.clock);
clocks.increment_clock(index, current_span);
}

/// Release a lock handle, express that this happens-before
/// any subsequent calls to `validate_lock_acquire` as well
/// as any previous calls to this function after any
/// `validate_lock_release` calls.
/// For normal locks this should be equivalent to `validate_lock_release`.
/// This function only exists for joining over the set of concurrent readers
/// in a read-write lock and should not be used for anything else.
pub fn validate_lock_release_shared(
&self,
lock: &mut VClock,
thread: ThreadId,
current_span: Span,
) {
let (index, mut clocks) = self.load_thread_state_mut(thread);
lock.join(&clocks.clock);
/// Returns the `release` clock of the given thread.
/// Other threads can acquire this clock in the future to establish synchronization
/// with this program point.
pub fn release_clock(&self, thread: ThreadId, current_span: Span) -> Ref<'_, VClock> {
// We increment the clock each time this happens, to ensure no two releases
// can be confused with each other.
let (index, mut clocks) = self.thread_state_mut(thread);
clocks.increment_clock(index, current_span);
drop(clocks);
// To return a read-only view, we need to release the RefCell
// and borrow it again.
let (_index, clocks) = self.thread_state(thread);
Ref::map(clocks, |c| &c.clock)
}

/// Load the vector index used by the given thread as well as the set of vector clocks
/// used by the thread.
#[inline]
fn load_thread_state_mut(&self, thread: ThreadId) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
fn thread_state_mut(&self, thread: ThreadId) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
let index = self.thread_info.borrow()[thread]
.vector_index
.expect("Loading thread state for thread with no assigned vector");
Expand All @@ -1752,17 +1737,26 @@ impl GlobalState {
(index, clocks)
}

/// Load the vector index used by the given thread as well as the set of vector clocks
/// used by the thread.
#[inline]
fn thread_state(&self, thread: ThreadId) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
let index = self.thread_info.borrow()[thread]
.vector_index
.expect("Loading thread state for thread with no assigned vector");
let ref_vector = self.vector_clocks.borrow();
let clocks = Ref::map(ref_vector, |vec| &vec[index]);
(index, clocks)
}

/// Load the current vector clock in use and the current set of thread clocks
/// in use for the vector.
#[inline]
pub(super) fn current_thread_state(
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
let index = self.current_index(thread_mgr);
let ref_vector = self.vector_clocks.borrow();
let clocks = Ref::map(ref_vector, |vec| &vec[index]);
(index, clocks)
self.thread_state(thread_mgr.get_active_thread_id())
}

/// Load the current vector clock in use and the current set of thread clocks
Expand All @@ -1772,10 +1766,7 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
let index = self.current_index(thread_mgr);
let ref_vector = self.vector_clocks.borrow_mut();
let clocks = RefMut::map(ref_vector, |vec| &mut vec[index]);
(index, clocks)
self.thread_state_mut(thread_mgr.get_active_thread_id())
}

/// Return the current thread, should be the same
Expand Down
12 changes: 5 additions & 7 deletions src/tools/miri/src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub enum InitOnceStatus {
pub(super) struct InitOnce<'mir, 'tcx> {
status: InitOnceStatus,
waiters: VecDeque<InitOnceWaiter<'mir, 'tcx>>,
data_race: VClock,
clock: VClock,
}

impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> {
Expand All @@ -61,10 +61,8 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
let current_thread = this.get_active_thread();

if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_acquire(
&this.machine.threads.sync.init_onces[id].data_race,
current_thread,
);
data_race
.acquire_clock(&this.machine.threads.sync.init_onces[id].clock, current_thread);
}
}

Expand Down Expand Up @@ -176,7 +174,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

// Each complete happens-before the end of the wait
if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_release(&mut init_once.data_race, current_thread, current_span);
init_once.clock.clone_from(&data_race.release_clock(current_thread, current_span));
}

// Wake up everyone.
Expand All @@ -202,7 +200,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

// Each complete happens-before the end of the wait
if let Some(data_race) = &this.machine.data_race {
data_race.validate_lock_release(&mut init_once.data_race, current_thread, current_span);
init_once.clock.clone_from(&data_race.release_clock(current_thread, current_span));
}

// Wake up one waiting thread, so they can go ahead and try to init this.
Expand Down
Loading

0 comments on commit 6b0ce8b

Please sign in to comment.