Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement <stop_token> and jthread. #1196

Merged
merged 21 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
a7e2c93
Implement `<stop_token>` and `jthread`.
BillyONeal Aug 14, 2020
c8b2db3
Bill didn't name wait correctly.
BillyONeal Aug 14, 2020
f0554d2
Fix deadlock found by tests that came with the reference implementation.
BillyONeal Aug 14, 2020
a1886a0
Fix a nullptr dereference found by tests from the reference implement…
BillyONeal Aug 14, 2020
8b26e2a
Fix small CR nitpicks and back off on memory_order aggressiveness.
BillyONeal Aug 17, 2020
01fc42c
Tests pass tests pass woo woo nice nice
BillyONeal Aug 25, 2020
c28e072
Add all the missing [replacement.functions].
BillyONeal Aug 25, 2020
f2bf64b
Test and comment nitpicks.
BillyONeal Aug 25, 2020
9cf088f
Fix CV race wherein missed wakes could result in deadlock, see exampl…
BillyONeal Aug 18, 2020
8854f4f
Compiler errors :/
BillyONeal Aug 18, 2020
338249d
More compiler errors.
BillyONeal Aug 18, 2020
7aa09ca
More test fixes.
BillyONeal Aug 25, 2020
7334bae
Make test more resilient to differences between system_clock and stea…
BillyONeal Aug 25, 2020
1a4b5cb
Ignore timeout status from the underlying CV in cv_any because the re…
BillyONeal Aug 26, 2020
66a14c2
Repair rebase damage.
BillyONeal Aug 26, 2020
e416bbd
Fix ill-formed constexpr, more rebase damage.
BillyONeal Aug 26, 2020
c38d004
No nodiscard.
BillyONeal Aug 26, 2020
b6e6e46
Merge remote-tracking branch 'origin/master' into stop_token
BillyONeal Sep 18, 2020
02d9922
Resolve more CR comments.
BillyONeal Sep 18, 2020
d4778ed
Sort stl/CMakeLists.txt.
StephanTLavavej Sep 19, 2020
0cbc067
Fix compiler errors in test.
StephanTLavavej Sep 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions stl/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ set(HEADERS
${CMAKE_CURRENT_LIST_DIR}/inc/span
${CMAKE_CURRENT_LIST_DIR}/inc/sstream
${CMAKE_CURRENT_LIST_DIR}/inc/stack
${CMAKE_CURRENT_LIST_DIR}/inc/stop_token
StephanTLavavej marked this conversation as resolved.
Show resolved Hide resolved
${CMAKE_CURRENT_LIST_DIR}/inc/stdexcept
${CMAKE_CURRENT_LIST_DIR}/inc/streambuf
${CMAKE_CURRENT_LIST_DIR}/inc/string
Expand Down
1 change: 1 addition & 0 deletions stl/inc/__msvc_all_public_headers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
#include <barrier>
#include <latch>
#include <semaphore>
#include <stop_token>
#endif // _M_CEE_PURE

#ifndef _M_CEE
Expand Down
63 changes: 63 additions & 0 deletions stl/inc/atomic
Original file line number Diff line number Diff line change
Expand Up @@ -2970,6 +2970,69 @@ inline void atomic_flag_notify_all(volatile atomic_flag* const _Flag) noexcept {
inline void atomic_flag_notify_all(atomic_flag* const _Flag) noexcept {
    // Free-function form of atomic_flag notification; forwards to the member notify_all.
    return _Flag->notify_all();
}

template <class _Ty>
class _Locked_pointer {
    // Packs a _Ty* and a 2-bit spinlock state into a single atomic<uintptr_t>.
    // The static_assert below guarantees the low 2 bits of any _Ty* are zero,
    // so they are free to carry the lock state.
public:
    static_assert(alignof(_Ty) >= (1 << 2), "2 low order bits are needed by _Locked_pointer");
    static constexpr uintptr_t _Lock_mask = 3; // low 2 bits hold the lock state
    static constexpr uintptr_t _Not_locked = 0;
    static constexpr uintptr_t _Locked_notify_not_needed = 1; // locked; no thread is blocked in _Lock_and_load
    static constexpr uintptr_t _Locked_notify_needed = 2; // locked; _Store_and_unlock must notify waiters
    static constexpr uintptr_t _Ptr_value_mask = ~_Lock_mask; // remaining bits hold the _Ty* value

    constexpr _Locked_pointer() noexcept : _Storage{} {}
    explicit _Locked_pointer(_Ty* const _Ptr) noexcept : _Storage{reinterpret_cast<uintptr_t>(_Ptr)} {}

    _Locked_pointer(const _Locked_pointer&) = delete;
    _Locked_pointer& operator=(const _Locked_pointer&) = delete;

    // Acquires the lock and returns the stored pointer. Spins with _YIELD_PROCESSOR
    // between CAS attempts; once another thread holds the lock, records that a notify
    // is needed and blocks in atomic wait() until _Store_and_unlock wakes it.
    _NODISCARD _Ty* _Lock_and_load() noexcept {
        uintptr_t _Rep = _Storage.load(memory_order_relaxed);
        for (;;) {
            switch (_Rep & _Lock_mask) {
            case _Not_locked: // Can try to lock now
                if (_Storage.compare_exchange_weak(_Rep, _Rep | _Locked_notify_not_needed)) {
                    return reinterpret_cast<_Ty*>(_Rep);
                }
                _YIELD_PROCESSOR();
                break;

            case _Locked_notify_not_needed: // Try to set "notify needed" and wait
                if (!_Storage.compare_exchange_weak(_Rep, (_Rep & _Ptr_value_mask) | _Locked_notify_needed)) {
                    // Failed to set notify needed flag, try again
                    _YIELD_PROCESSOR();
                    break;
                }
                _Rep = (_Rep & _Ptr_value_mask) | _Locked_notify_needed;
                [[fallthrough]];

            case _Locked_notify_needed: // "Notify needed" is already set, just wait
                _Storage.wait(_Rep, memory_order_relaxed);
                _Rep = _Storage.load(memory_order_relaxed);
                break;

            default: // Unrecognized bit pattern
                _CSTD abort();
            }
        }
    }

    // Stores a new pointer value and releases the lock in a single exchange;
    // wakes waiters only when some thread recorded that it is blocked.
    void _Store_and_unlock(_Ty* const _Value) noexcept {
        const auto _Rep = _Storage.exchange(reinterpret_cast<uintptr_t>(_Value));
        if ((_Rep & _Lock_mask) == _Locked_notify_needed) {
            // As we don't count waiters, every waiter is notified, and then some may re-request notification
            _Storage.notify_all();
        }
    }

    // Reads the stored pointer without taking the lock; callers must otherwise
    // guarantee no concurrent writer exists (hence "_Unsafe").
    _NODISCARD _Ty* _Unsafe_load_relaxed() const noexcept {
        return reinterpret_cast<_Ty*>(_Storage.load(memory_order_relaxed));
    }

private:
    atomic<uintptr_t> _Storage;
};
#endif // _HAS_CXX20

_STD_END
Expand Down
156 changes: 126 additions & 30 deletions stl/inc/condition_variable
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
#include <memory>
#include <mutex>
#include <xthreads.h>
#if _HAS_CXX20
#include <stop_token>
#endif // _HAS_CXX20

#pragma pack(push, _CRT_PACKING)
#pragma warning(push, _STL_WARNING_LEVEL)
Expand All @@ -25,6 +28,27 @@ _STL_DISABLE_CLANG_WARNINGS
#endif // _M_CEE

_STD_BEGIN
template <class _Lock>
struct _Unlock_guard {
    // Inverse of lock_guard: releases the supplied lock on construction and
    // reacquires it on destruction. The destructor is noexcept /* terminates */
    // because the condition_variable_any wait functions are required to terminate
    // when the caller's lock cannot be relocked; slamming into noexcept makes
    // that failure easier to debug.
    explicit _Unlock_guard(_Lock& _Mtx_) : _Target(_Mtx_) {
        _Target.unlock();
    }

    ~_Unlock_guard() noexcept /* terminates */ {
        // relock the caller's lock or terminate()
        _Target.lock();
    }

    _Unlock_guard(const _Unlock_guard&) = delete;
    _Unlock_guard& operator=(const _Unlock_guard&) = delete;

private:
    _Lock& _Target;
};

class condition_variable_any { // class for waiting for conditions with any kind of mutex
public:
condition_variable_any() : _Myptr{_STD make_shared<mutex>()} {
Expand All @@ -50,20 +74,17 @@ public:

template <class _Lock>
void wait(_Lock& _Lck) noexcept /* terminates */ { // wait for signal
{
const shared_ptr<mutex> _Ptr = _Myptr; // for immunity to *this destruction
lock_guard<mutex> _Guard{*_Ptr};
_Lck.unlock();
_Cnd_wait(_Mycnd(), _Ptr->_Mymtx());
} // unlock

_Lck.lock();
}
const shared_ptr<mutex> _Ptr = _Myptr; // for immunity to *this destruction
unique_lock<mutex> _Guard{*_Ptr};
_Unlock_guard<_Lock> _Unlock_outer{_Lck};
_Cnd_wait(_Mycnd(), _Ptr->_Mymtx());
_Guard.unlock();
} // relock _Lck

template <class _Lock, class _Predicate>
void wait(_Lock& _Lck, _Predicate _Pred) noexcept(noexcept(!_Pred())) /* strengthened */ {
void wait(_Lock& _Lck, _Predicate _Pred) noexcept(noexcept(static_cast<bool>(_Pred()))) /* strengthened */ {
// wait for signal and check predicate
while (!_Pred()) {
while (!static_cast<bool>(_Pred())) {
wait(_Lck);
}
}
Expand All @@ -89,8 +110,8 @@ public:
template <class _Lock, class _Rep, class _Period>
cv_status wait_for(_Lock& _Lck, const chrono::duration<_Rep, _Period>& _Rel_time) { // wait for duration
if (_Rel_time <= chrono::duration<_Rep, _Period>::zero()) {
_Lck.unlock();
_Relock(_Lck);
_Unlock_guard<_Lock> _Unlock_outer{_Lck};
(void) _Unlock_outer;
Copy link
Member

@CaseyCarter CaseyCarter Aug 26, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why this discarded void cast? Did a compiler complain about _Unlock_outer being unused?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return cv_status::timeout;
}

Expand Down Expand Up @@ -128,6 +149,93 @@ public:
return true;
}

#if _HAS_CXX20
private:
struct _Cv_any_notify_all {
condition_variable_any* _This;

explicit _Cv_any_notify_all(condition_variable_any* _This_) : _This{_This_} {}

_Cv_any_notify_all(const _Cv_any_notify_all&) = delete;
_Cv_any_notify_all& operator=(const _Cv_any_notify_all&) = delete;

void operator()() const noexcept {
_This->notify_all();
}
Comment on lines +162 to +164

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to take out a lock on the internal mutex before calling notify_all() here to avoid a missed wake-up.

eg. consider in the wait() implementation below that the call to .request_stop() occurs after the check of _Stoken.stop_requested() and before the call to wait(_Lck).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

notify_all already takes the internal mutex.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we need to take the internal mutex before creating _Cb below...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would explain why this was added to cv_any and not plain cv.

Although I think there might be a spec defect here: If the registration for cancel is supposed to be part of the atomic operation that includes unlocking _Lck and going to sleep, the spec doesn't say that right now.

Copy link
Member Author

@BillyONeal BillyONeal Aug 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is broken in the spec. I asked SG1 to comment:

{
    condition_variable_any cv;
    mutex m;
    jthread j{[&](stop_token token) {
        unique_lock lck{m};
        cv.wait(lck, token, [] { return false; });
    }};
} // destroy j

This program can deadlock under the current spec; there is nothing forbidding the following execution:

T1: launches T2
T2: takes m
T2: [thread.condvarany.intwait]/2 "Registers for the duration of this call *this to get notified on a stop request on stoken during this call", stop is not requested so execution continues
T2: while (!stoken.stop_requested()) {
T2: if (pred()) (pred returns false)
T1: j.~jthread
T1: request_stop(), calls cv.notify_all()
T2: wait(lck)
T1: join()
Deadlock.

Copy link

@lewissbaker lewissbaker Aug 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This program can deadlock under the current spec

Can you elaborate on what causes the deadlock in this example?
I'm struggling to see where the deadlock occurs.
AFAICT, if request_stop(), and thus cv.notify_all(), has returned then T1 is no longer holding any resources that T2 is waiting on.

Maybe we need to take the internal mutex before creating _Cb below...

You don't want to do that. The callback might execute inline during construction of _Cb, which would then call notify_all(), which then also takes out a lock on the internal mutex. This would be UB to have the calling thread attempt to lock the mutex twice.

Copy link
Member Author

@BillyONeal BillyONeal Aug 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on what causes the deadlock in this example?

After the execution I enumerated, T2 is waiting on the CV, and T1 is waiting on T2, and nothing will ever notify the CV. Note that when T1 tries to notify_all, T2 isn't waiting on the CV yet.

if request_stop(), and thus cv.notify_all(), has returned then T1 is no longer holding any resources that T2 is waiting on.

Correct, T1 is not holding any such resources. T2 isn't waiting on resources. But T2 is waiting to be notified.

You don't want to do that. The callback might execute inline during construction of _Cb, which would then call notify_all(), which then also takes out a lock on the internal mutex.

I see, we need something more complex :/

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference implementation indeed does something a bit trickier to ensure it doesn't miss wakeup notifications.
See https://github.com/josuttis/jthread/blob/master/source/condition_variable_any2.hpp#L216-L238

It first locks the internal mutex and then checks stoken.stop_requested().
This prevents the missed wakeup if request_stop() is called between the evaluation of pred() and wait(lck).
The registered callback will block until wait() releases the internal mutex before calling .notify_all().

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reference implementation indeed does something a bit trickier to ensure it doesn't miss wakeup notifications.

I'm aggressively trying to not look at the reference implementation because licensing scary.

It first locks the internal mutex and then checks stoken.stop_requested().

Right, that's the exact fix. But nothing in the spec requires that right now.

};

public:
template <class _Lock, class _Predicate>
bool wait(_Lock& _Lck, stop_token _Stoken, _Predicate _Pred) noexcept(
    noexcept(static_cast<bool>(_Pred()))) /* strengthened */ {
    // Interruptible wait: returns true when _Pred() is satisfied; if a stop is
    // requested on _Stoken first, returns the result of a final _Pred() check.
    // TRANSITION, ABI: Due to the unsynchronized delivery of notify_all by _Stoken,
    // this implementation cannot tolerate *this destruction while an interruptible wait
    // is outstanding. A future ABI should store both the internal CV and internal mutex
    // in the reference counted block to allow this.
    stop_callback<_Cv_any_notify_all> _Cb{_Stoken, this};
    for (;;) {
        if (_Pred()) {
            return true;
        }

        // Take the internal mutex before checking stop_requested; the stop callback's
        // notify_all also takes it, so a request_stop landing between this check and
        // the _Cnd_wait below cannot produce a missed wake.
        unique_lock<mutex> _Guard{*_Myptr};
        if (_Stoken.stop_requested()) {
            _Guard.unlock();
            return _Pred();
        }

        _Unlock_guard<_Lock> _Unlock_outer{_Lck}; // release the caller's lock while sleeping
        _Cnd_wait(_Mycnd(), _Myptr->_Mymtx());
        _Guard.unlock(); // drop the internal mutex before _Unlock_outer relocks _Lck
    } // relock
}

template <class _Lock, class _Clock, class _Duration, class _Predicate>
bool wait_until(
    _Lock& _Lck, stop_token _Stoken, const chrono::time_point<_Clock, _Duration>& _Abs_time, _Predicate _Pred) {
    // Interruptible timed wait: loops until _Pred() is satisfied, a stop is
    // requested on _Stoken, or _Abs_time is reached; in the latter two cases the
    // result of a final _Pred() check is returned.
    stop_callback<_Cv_any_notify_all> _Cb{_Stoken, this};
    for (;;) {
        if (_Pred()) {
            return true;
        }

        // Take the internal mutex before checking stop_requested so a concurrent
        // request_stop cannot slip in unnoticed between the check and the timed wait.
        unique_lock<mutex> _Guard{*_Myptr};
        if (_Stoken.stop_requested()) {
            break;
        }

        _Unlock_guard<_Lock> _Unlock_outer{_Lck};
        const auto _Now = _Clock::now();
        if (_Now >= _Abs_time) {
            break; // deadline already passed; guards relock on the way out
        }

        const auto _Rel_time = _Abs_time - _Now;
        // TRANSITION, ABI: The standard says that we should use a steady clock,
        // but unfortunately our ABI speaks struct xtime, which is relative to the system clock.
        _CSTD xtime _Tgt;
        (void) _To_xtime_10_day_clamped(_Tgt, _Rel_time);
        const int _Res = _Cnd_timedwait(_Mycnd(), _Myptr->_Mymtx(), &_Tgt);
        _Guard.unlock();

        switch (_Res) {
        case _Thrd_timedout:
        case _Thrd_success:
            // A timeout reported by the underlying CV is not trusted; the loop
            // re-checks the deadline against _Clock::now() at the top instead.
            break;
        default:
            _Throw_C_error(_Res);
        }
    } // relock

    return _Pred();
}

template <class _Lock, class _Rep, class _Period, class _Predicate>
bool wait_for(_Lock& _Lck, stop_token _Stoken, const chrono::duration<_Rep, _Period>& _Rel_time, _Predicate _Pred) {
    // Convert the relative timeout into an absolute steady_clock deadline,
    // then delegate to the interruptible wait_until.
    const auto _Deadline = chrono::steady_clock::now() + _Rel_time;
    return wait_until(_Lck, _STD move(_Stoken), _Deadline, _STD move(_Pred));
}
#endif // _HAS_CXX20

private:
shared_ptr<mutex> _Myptr;

Expand All @@ -139,16 +247,11 @@ private:

template <class _Lock>
cv_status _Wait_until(_Lock& _Lck, const xtime* const _Abs_time) { // wait for signal with timeout
int _Res;

{
const shared_ptr<mutex> _Ptr = _Myptr; // for immunity to *this destruction
lock_guard<mutex> _Guard{*_Ptr};
_Lck.unlock();
_Res = _Cnd_timedwait(_Mycnd(), _Ptr->_Mymtx(), _Abs_time);
} // unlock

_Relock(_Lck);
const shared_ptr<mutex> _Ptr = _Myptr; // for immunity to *this destruction
unique_lock<mutex> _Guard{*_Ptr};
_Unlock_guard<_Lock> _Unlock_outer{_Lck};
const int _Res = _Cnd_timedwait(_Mycnd(), _Ptr->_Mymtx(), _Abs_time);
_Guard.unlock();

switch (_Res) {
case _Thrd_success:
Expand All @@ -159,13 +262,6 @@ private:
_Throw_C_error(_Res);
}
}

template <class _Lock>
static void _Relock(_Lock& _Lck) noexcept /* terminates */ { // relock external mutex or terminate()
    // Wait functions are required to terminate if the mutex cannot be locked;
    // we slam into noexcept here for easier user debugging.
    _Lck.lock();
}
};

inline void notify_all_at_thread_exit(condition_variable& _Cnd, unique_lock<mutex> _Lck) {
Expand Down
Loading