Skip to content

Commit

Permalink
Requiring executor tasks are noexcept. (#528)
Browse files Browse the repository at this point in the history
* Requiring `noexcept` for executor tasks.
* Requiring C++17.
* `task<>` is now a type alias for `task_<>` and supports `noexcept` invocable tasks.
  • Loading branch information
sean-parent authored Aug 22, 2023
1 parent 31a8282 commit 8300eb9
Show file tree
Hide file tree
Showing 19 changed files with 267 additions and 310 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/stlab.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
pull_request:
push:
branches:
- main
- main

jobs:
generate-matrix:
Expand Down Expand Up @@ -67,7 +67,7 @@ jobs:
./emsdk install latest
./emsdk activate latest
echo 'source "$HOME/emsdk/emsdk_env.sh"' >> $HOME/.bash_profile
# Override Emsdk's bundled node (14.18.2) to the GH Actions system installation (>= 16.16.0)
sed -i "/^NODE_JS = .*/c\NODE_JS = '`which node`'" .emscripten
echo "Overwrote .emscripten config file to:"
Expand Down Expand Up @@ -122,7 +122,7 @@ jobs:
run: |
call "C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\VC\Auxiliary\Build\vcvarsall.bat" x64
mkdir ..\build
cmake -S. -B../build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_STANDARD=23 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake
cmake -S. -B../build -GNinja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_STANDARD=20 -DCMAKE_TOOLCHAIN_FILE=C:/vcpkg/scripts/buildsystems/vcpkg.cmake
- name: Build // Unix
if: ${{ startsWith(matrix.config.os, 'ubuntu') || startsWith(matrix.config.os, 'macos') }}
Expand Down
128 changes: 63 additions & 65 deletions stlab/concurrency/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ auto set_process_error(P& process, std::exception_ptr&& error)
}

template <typename P>
auto set_process_error(P&, std::exception_ptr &&)
auto set_process_error(P&, std::exception_ptr&&)
-> std::enable_if_t<!has_set_process_error_v<unwrap_reference_t<P>>, void> {}

/**************************************************************************************************/
Expand Down Expand Up @@ -597,8 +597,8 @@ template <typename Q, typename T, typename R, typename Arg, std::size_t I, typen
struct shared_process_sender_indexed : public shared_process_sender<Arg> {
shared_process<Q, T, R, Args...>& _shared_process;

explicit
shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) : _shared_process(sp) {}
explicit shared_process_sender_indexed(shared_process<Q, T, R, Args...>& sp) :
_shared_process(sp) {}

void add_sender() override { ++_shared_process._sender_count; }

Expand Down Expand Up @@ -652,9 +652,7 @@ struct shared_process_sender_helper;
template <typename Q, typename T, typename R, std::size_t... I, typename... Args>
struct shared_process_sender_helper<Q, T, R, std::index_sequence<I...>, Args...>
: shared_process_sender_indexed<Q, T, R, Args, I, Args...>... {

explicit
shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
explicit shared_process_sender_helper(shared_process<Q, T, R, Args...>& sp) :
shared_process_sender_indexed<Q, T, R, Args, I, Args...>(sp)... {}
};

Expand Down Expand Up @@ -768,8 +766,6 @@ struct shared_process

const std::tuple<std::shared_ptr<shared_process_receiver<Args>>...> _upstream;



template <typename E, typename F>
shared_process(E&& e, F&& f) :
shared_process_sender_helper<Q, T, R, std::make_index_sequence<sizeof...(Args)>, Args...>(
Expand Down Expand Up @@ -954,12 +950,12 @@ struct shared_process
std::chrono::nanoseconds::min())
broadcast(unwrap(*_process).yield());
else
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
}

/*
Expand All @@ -978,25 +974,25 @@ struct shared_process
} else {
/* Schedule a timeout. */
_timeout_function_active = true;
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
}
return;
}
return;
}
});
});
}
} catch (...) { // this catches exceptions during _process.await() and _process.yield()
broadcast(std::move(std::current_exception()));
Expand Down Expand Up @@ -1038,12 +1034,12 @@ struct shared_process
std::chrono::nanoseconds::min())
broadcast(unwrap(*_process).yield());
else
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
if (!_this) return;
_this->try_broadcast();
});
}

/*
Expand All @@ -1062,25 +1058,25 @@ struct shared_process
} else {
/* Schedule a timeout. */
_timeout_function_active = true;
execute_at(duration,
_executor)([_weak_this = make_weak_ptr(this->shared_from_this())] {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
execute_at(duration, _executor)(
[_weak_this = make_weak_ptr(this->shared_from_this())]() noexcept {
auto _this = _weak_this.lock();
// It may be that the complete channel is gone in the meanwhile
if (!_this) return;

// try_lock can fail spuriously
while (true) {
lock_t lock(_this->_timeout_function_control, std::try_to_lock);
if (!lock) continue;

// we were cancelled
if (get_process_state(_this->_process).first != process_state::yield) {
_this->try_broadcast();
_this->_timeout_function_active = false;
}
return;
}
return;
}
});
});
}
} catch (...) { // this catches exceptions during _process.await() and _process.yield()
broadcast(std::move(std::current_exception()));
Expand All @@ -1098,10 +1094,12 @@ struct shared_process
REVISIT (sparent) : See above comments on step() and ensure consistency.
What is this code doing, if we don't have a yield then it also assumes no await?
This seems to be doing a lot for a (required) noexcept operation - are we sure?
*/

template <typename U>
auto step() -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
auto step() noexcept -> std::enable_if_t<!has_process_yield_v<unwrap_reference_t<U>>> {
using queue_t = typename Q::value_type;
stlab::optional<queue_t> message;
std::array<bool, sizeof...(Args)> do_cts;
Expand Down Expand Up @@ -1135,7 +1133,7 @@ struct shared_process
}

void run() {
_executor([_p = make_weak_ptr(this->shared_from_this())] {
_executor([_p = make_weak_ptr(this->shared_from_this())]() noexcept {
auto p = _p.lock();
if (p) p->template step<T>();
});
Expand Down Expand Up @@ -1352,8 +1350,7 @@ auto zip(S s, R... r) {

/**************************************************************************************************/

struct buffer_size
{
struct buffer_size {
std::size_t _value;
buffer_size(std::size_t b) : _value(b) {}
};
Expand All @@ -1378,15 +1375,16 @@ struct annotated_process {
F _f;
annotations _annotations;

explicit annotated_process(executor_task_pair<F>&& etp) : _f(std::move(etp._f)), _annotations(std::move(etp._executor)) {}
explicit annotated_process(executor_task_pair<F>&& etp) :
_f(std::move(etp._f)), _annotations(std::move(etp._executor)) {}

annotated_process(F f, const executor& e) : _f(std::move(f)), _annotations(e._executor) {}
annotated_process(F f, buffer_size bs) : _f(std::move(f)), _annotations(bs._value) {}

annotated_process(F f, executor&& e) : _f(std::move(f)), _annotations(std::move(e._executor)) {}
annotated_process(F f, annotations&& a) : _f(std::move(f)), _annotations(std::move(a)) {}
annotated_process(executor_task_pair<F>&& etp, buffer_size bs) : _f(std::move(etp._f)), _annotations(std::move(etp._executor), bs) {}

annotated_process(executor_task_pair<F>&& etp, buffer_size bs) :
_f(std::move(etp._f)), _annotations(std::move(etp._executor), bs) {}
};

template <typename B, typename E>
Expand Down Expand Up @@ -1553,8 +1551,8 @@ class STLAB_NODISCARD() receiver {
}

auto operator|(sender<T> send) {
return operator|
([_send = std::move(send)](auto&& x) { _send(std::forward<decltype(x)>(x)); });
return operator|(
[_send = std::move(send)](auto&& x) { _send(std::forward<decltype(x)>(x)); });
}
};

Expand Down
25 changes: 14 additions & 11 deletions stlab/concurrency/default_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
#ifndef STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP
#define STLAB_CONCURRENCY_DEFAULT_EXECUTOR_HPP

#include <stlab/concurrency/set_current_thread_name.hpp>
#include <stlab/concurrency/task.hpp>
#include <stlab/config.hpp>

#include <stlab/pre_exit.hpp>

#include <stlab/concurrency/set_current_thread_name.hpp>
#include <stlab/concurrency/task.hpp>

#include <cassert>
#include <chrono>
#include <functional>
#include <type_traits>

#if STLAB_TASK_SYSTEM(LIBDISPATCH)
#include <dispatch/dispatch.h>
Expand Down Expand Up @@ -91,7 +94,7 @@ struct executor_type {
using result_type = void;

template <typename F>
void operator()(F f) const {
auto operator()(F f) const -> std::enable_if_t<std::is_nothrow_invocable_v<F>> {
using f_t = decltype(f);

dispatch_group_async_f(detail::group()._group,
Expand Down Expand Up @@ -224,7 +227,7 @@ class waiter {
class notification_queue {
struct element_t {
std::size_t _priority;
task<void()> _task;
task<void() noexcept> _task;

template <class F>
element_t(F&& f, std::size_t priority) : _priority{priority}, _task{std::forward<F>(f)} {}
Expand All @@ -250,15 +253,15 @@ class notification_queue {
}

// Must be called under a lock with a non-empty _q, always returns a valid task
auto pop_not_empty() -> task<void()> {
auto pop_not_empty() -> task<void() noexcept> {
auto result = std::move(_q.front()._task);
std::pop_heap(begin(_q), end(_q), element_t::greater());
_q.pop_back();
return result;
}

public:
auto try_pop() -> task<void()> {
auto try_pop() -> task<void() noexcept> {
lock_t lock{_mutex, std::try_to_lock};
if (!lock || _q.empty()) return nullptr;
return pop_not_empty();
Expand All @@ -275,7 +278,7 @@ class notification_queue {
return true;
}

auto pop() -> std::pair<bool, task<void()>> {
auto pop() -> std::pair<bool, task<void() noexcept>> {
lock_t lock{_mutex};
_waiting = true;
while (_q.empty() && !_done && _waiting)
Expand Down Expand Up @@ -341,7 +344,7 @@ class priority_task_system {
void run(unsigned i) {
stlab::set_current_thread_name("cc.stlab.default_executor");
while (true) {
task<void()> f;
task<void() noexcept> f;

for (unsigned n = 0; n != _count && !f; ++n) {
f = _q[(i + n) % _count].try_pop();
Expand Down Expand Up @@ -408,7 +411,7 @@ class priority_task_system {
stlab::set_current_thread_name("cc.stlab.default_executor.expansion");

while (true) {
task<void()> f;
task<void() noexcept> f;

for (unsigned n = 0; n != _count && !f; ++n) {
f = _q[(i + n) % _count].try_pop();
Expand Down Expand Up @@ -458,7 +461,7 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void()> f) const {
void operator()(task<void() noexcept>&& f) const {
static task_system<P> only_task_system{[] {
at_pre_exit([]() noexcept { only_task_system.join(); });
return task_system<P>{};
Expand All @@ -473,7 +476,7 @@ template <executor_priority P = executor_priority::medium>
struct executor_type {
using result_type = void;

void operator()(task<void()> f) const {
void operator()(task<void() noexcept>&& f) const {
pts().execute<static_cast<std::size_t>(P)>(std::move(f));
}
};
Expand Down
14 changes: 7 additions & 7 deletions stlab/concurrency/executor_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace stlab {
inline namespace v1 {
/**************************************************************************************************/

using executor_t = std::function<void(stlab::task<void()>)>;
using executor_t = std::function<void(stlab::task<void() noexcept>)>;

/*
* returns an executor that will schedule any passed task to it to execute
Expand All @@ -35,16 +35,17 @@ template <typename Rep, typename Per = std::ratio<1>>
executor_t execute_at(std::chrono::duration<Rep, Per> duration, executor_t executor) {
return [_duration = std::move(duration), _executor = std::move(executor)](auto f) mutable {
if (_duration != std::chrono::duration<Rep, Per>{})
system_timer(_duration, [_f = std::move(f), _executor = std::move(_executor)]() mutable {
_executor(std::move(_f));
});
system_timer(_duration,
[_f = std::move(f), _executor = std::move(_executor)]() mutable noexcept {
_executor(std::move(_f));
});
else
_executor(std::move(f));
};
}

[[deprecated("Use chrono::duration as parameter instead")]]
inline executor_t execute_at(std::chrono::steady_clock::time_point when, executor_t executor) {
[[deprecated("Use chrono::duration as parameter instead")]] inline executor_t execute_at(
std::chrono::steady_clock::time_point when, executor_t executor) {
using namespace std::chrono;
return execute_at(duration_cast<nanoseconds>(when - steady_clock::now()), std::move(executor));
}
Expand Down Expand Up @@ -79,7 +80,6 @@ executor_task_pair<F> operator&(F&& f, executor e) {
return executor_task_pair<F>{std::move(e._executor), std::forward<F>(f)};
}


/**************************************************************************************************/

} // namespace v1
Expand Down
Loading

0 comments on commit 8300eb9

Please sign in to comment.