From 2f52e9b86ac37312d01fd1a6327c63d395cf6ffd Mon Sep 17 00:00:00 2001
From: Nils Deppe
Date: Tue, 30 May 2023 13:31:08 -0700
Subject: [PATCH] Add a static single-producer single-consumer queue

---
 src/Parallel/CMakeLists.txt                  |   1 +
 src/Parallel/StaticSpscQueue.hpp             | 266 +++++++++++++++++++
 tests/Unit/Parallel/CMakeLists.txt           |   1 +
 tests/Unit/Parallel/Test_StaticSpscQueue.cpp | 107 ++++++++
 tools/FileTestDefs.sh                        |   4 +-
 5 files changed, 378 insertions(+), 1 deletion(-)
 create mode 100644 src/Parallel/StaticSpscQueue.hpp
 create mode 100644 tests/Unit/Parallel/Test_StaticSpscQueue.cpp

diff --git a/src/Parallel/CMakeLists.txt b/src/Parallel/CMakeLists.txt
index 057f9461463f..0ef4dfc5cc14 100644
--- a/src/Parallel/CMakeLists.txt
+++ b/src/Parallel/CMakeLists.txt
@@ -55,6 +55,7 @@ spectre_target_headers(
   ReductionDeclare.hpp
   ResourceInfo.hpp
   Section.hpp
+  StaticSpscQueue.hpp
   TypeTraits.hpp
   )
 
diff --git a/src/Parallel/StaticSpscQueue.hpp b/src/Parallel/StaticSpscQueue.hpp
new file mode 100644
index 000000000000..8e78b1dafef6
--- /dev/null
+++ b/src/Parallel/StaticSpscQueue.hpp
@@ -0,0 +1,266 @@
+// Distributed under the MIT License.
+// See LICENSE.txt for details.
+
+/*
+ * The original code is distributed under the following copyright and license:
+ *
+ * Copyright (c) 2020 Erik Rigtorp
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ *
+ *
+ * SXS Modifications:
+ * 1. Casing to match SpECTRE conventions
+ * 2. Static capacity
+ * 3. Storage is std::array
+ * 4. Switch to west-const
+ *
+ */
+
+#pragma once
+
+#include <array>
+#include <atomic>
+#include <cstddef>
+#include <new>  // Placement new
+#include <type_traits>
+#include <utility>
+
+#include "Utilities/ErrorHandling/Assert.hpp"
+#include "Utilities/Requires.hpp"
+
+namespace Parallel {
+/*!
+ * \brief A lockfree single-producer single-consumer queue with a static
+ * (compile-time) capacity.
+ *
+ * The queue is threadsafe as long as at most one thread reads and at most one
+ * thread writes at any given time. Which threads read and write can change
+ * throughout program execution; the important restriction is that at no point
+ * during the execution does more than one thread try to read, or more than
+ * one thread try to write, at the same time.
+ *
+ * \note This class is intentionally not serializable since handling
+ * threadsafety around serialization requires careful thought about the
+ * individual circumstances.
+ */
+template <typename T, size_t Capacity>
+class StaticSpscQueue {
+ private:
+#ifdef __cpp_lib_hardware_interference_size
+  static constexpr size_t cache_line_size_ =
+      std::hardware_destructive_interference_size;
+#else
+  static constexpr size_t cache_line_size_ = 64;
+#endif
+
+  // Padding to avoid false sharing between data_ and adjacent allocations
+  static constexpr size_t padding_ = (cache_line_size_ - 1) / sizeof(T) + 1;
+
+ public:
+  StaticSpscQueue() = default;
+  ~StaticSpscQueue() {
+    // Destruct objects in the buffer.
+    while (front()) {
+      pop();
+    }
+  }
+
+  StaticSpscQueue(const StaticSpscQueue&) = delete;
+  StaticSpscQueue& operator=(const StaticSpscQueue&) = delete;
+  StaticSpscQueue(StaticSpscQueue&&) = delete;
+  StaticSpscQueue& operator=(StaticSpscQueue&&) = delete;
+
+  /// Construct a new element at the end of the queue in place.
+  ///
+  /// Uses placement new for in-place construction.
+  ///
+  /// \warning If the queue is full this busy-waits until the consumer pops an
+  /// element. Use `try_emplace()` if blocking is not acceptable.
+  template <typename... Args>
+  void emplace(Args&&... args) noexcept(
+      std::is_nothrow_constructible_v<T, Args&&...>) {
+    static_assert(std::is_constructible_v<T, Args&&...>,
+                  "T must be constructible with Args&&...");
+    const auto write_index = write_index_.load(std::memory_order_relaxed);
+    auto next_write_index = write_index + 1;
+    if (next_write_index == capacity_) {
+      next_write_index = 0;
+    }
+    while (next_write_index == read_index_cache_) {
+      read_index_cache_ = read_index_.load(std::memory_order_acquire);
+    }
+    new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
+    write_index_.store(next_write_index, std::memory_order_release);
+  }
+
+  /// Construct a new element at the end of the queue in place.
+  ///
+  /// Uses placement new for in-place construction.
+  ///
+  /// Returns `true` if the emplacement succeeded and `false` if it did not.
+  /// If it failed then the queue is currently full.
+  template <typename... Args>
+  [[nodiscard]] bool try_emplace(Args&&... args) noexcept(
+      std::is_nothrow_constructible_v<T, Args&&...>) {
+    static_assert(std::is_constructible_v<T, Args&&...>,
+                  "T must be constructible with Args&&...");
+    const auto write_index = write_index_.load(std::memory_order_relaxed);
+    auto next_write_index = write_index + 1;
+    if (next_write_index == capacity_) {
+      next_write_index = 0;
+    }
+    if (next_write_index == read_index_cache_) {
+      read_index_cache_ = read_index_.load(std::memory_order_acquire);
+      if (next_write_index == read_index_cache_) {
+        return false;
+      }
+    }
+    new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
+    write_index_.store(next_write_index, std::memory_order_release);
+    return true;
+  }
+
+  /// Push a new element to the end of the queue.
+  ///
+  /// Uses `emplace()` internally.
+  ///
+  /// \warning If the queue is full this busy-waits until the consumer pops an
+  /// element. Use `try_push()` if blocking is not acceptable.
+  void push(const T& v) noexcept(std::is_nothrow_copy_constructible_v<T>) {
+    static_assert(std::is_copy_constructible_v<T>,
+                  "T must be copy constructible");
+    emplace(v);
+  }
+
+  /// Push a new element to the end of the queue.
+  ///
+  /// Uses `emplace()` internally.
+  ///
+  /// \warning If the queue is full this busy-waits until the consumer pops an
+  /// element. Use `try_push()` if blocking is not acceptable.
+  template <typename P,
+            Requires<std::is_constructible_v<T, P&&>> = nullptr>
+  void push(P&& v) noexcept(std::is_nothrow_constructible_v<T, P&&>) {
+    emplace(std::forward<P>(v));
+  }
+
+  /// Push a new element to the end of the queue. Returns `false` and does not
+  /// push the new object if the queue is at capacity; otherwise returns
+  /// `true`.
+  ///
+  /// Uses `try_emplace()` internally.
+  [[nodiscard]] bool try_push(const T& v) noexcept(
+      std::is_nothrow_copy_constructible_v<T>) {
+    static_assert(std::is_copy_constructible_v<T>,
+                  "T must be copy constructible");
+    return try_emplace(v);
+  }
+
+  /// Push a new element to the end of the queue. Returns `false` and does not
+  /// push the new object if the queue is at capacity; otherwise returns
+  /// `true`.
+  ///
+  /// Uses `try_emplace()` internally.
+  template <typename P,
+            Requires<std::is_constructible_v<T, P&&>> = nullptr>
+  [[nodiscard]] bool try_push(P&& v) noexcept(
+      std::is_nothrow_constructible_v<T, P&&>) {
+    return try_emplace(std::forward<P>(v));
+  }
+
+  /// Returns a pointer to the first element of the queue.
+  ///
+  /// \note Returns `nullptr` if the queue is empty.
+  [[nodiscard]] T* front() noexcept {
+    const auto read_index = read_index_.load(std::memory_order_relaxed);
+    if (read_index == write_index_cache_) {
+      write_index_cache_ = write_index_.load(std::memory_order_acquire);
+      if (write_index_cache_ == read_index) {
+        return nullptr;
+      }
+    }
+    return &data_[read_index + padding_];
+  }
+
+  /// Removes the first element from the queue.
+  void pop() {
+    static_assert(std::is_nothrow_destructible_v<T>,
+                  "T must be nothrow destructible");
+    const auto read_index = read_index_.load(std::memory_order_relaxed);
+#ifdef SPECTRE_DEBUG
+    const auto write_index = write_index_.load(std::memory_order_acquire);
+    ASSERT(write_index != read_index,
+           "Can't pop an element from an empty queue. read_index: "
+               << read_index << " write_index: " << write_index);
+#endif  // SPECTRE_DEBUG
+    data_[read_index + padding_].~T();
+    auto next_read_index = read_index + 1;
+    if (next_read_index == capacity_) {
+      next_read_index = 0;
+    }
+    if (read_index == write_index_cache_) {
+      write_index_cache_ = next_read_index;
+    }
+    read_index_.store(next_read_index, std::memory_order_release);
+  }
+
+  /// Returns the number of elements currently in the queue.
+  ///
+  /// Note that while this can be checked in a threadsafe manner, it is up to
+  /// the user to guarantee that another thread does not change the queue
+  /// between when `size()` is called and when the result is used.
+  [[nodiscard]] size_t size() const noexcept {
+    std::ptrdiff_t diff =
+        static_cast<std::ptrdiff_t>(
+            write_index_.load(std::memory_order_acquire)) -
+        static_cast<std::ptrdiff_t>(
+            read_index_.load(std::memory_order_acquire));
+    if (diff < 0) {
+      diff += static_cast<std::ptrdiff_t>(capacity_);
+    }
+    return static_cast<size_t>(diff);
+  }
+
+  /// Returns `true` if the queue is empty, otherwise `false`.
+  ///
+  /// Note that while this can be checked in a threadsafe manner, it is up to
+  /// the user to guarantee that another thread does not change the queue
+  /// between when `empty()` is called and when the result is used.
+  [[nodiscard]] bool empty() const noexcept {
+    return write_index_.load(std::memory_order_acquire) ==
+           read_index_.load(std::memory_order_acquire);
+  }
+
+  /// Returns the capacity of the queue.
+  [[nodiscard]] size_t capacity() const noexcept { return capacity_ - 1; }
+
+ private:
+  static constexpr size_t capacity_ = Capacity + 1;
+  std::array<T, capacity_ + 2 * padding_> data_{};
+
+  // Align to the cache line size in order to avoid false sharing.
+  // read_index_cache_ and write_index_cache_ are used to reduce the amount
+  // of cache coherency traffic.
+  alignas(cache_line_size_) std::atomic<size_t> write_index_{0};
+  alignas(cache_line_size_) size_t read_index_cache_{0};
+  alignas(cache_line_size_) std::atomic<size_t> read_index_{0};
+  alignas(cache_line_size_) size_t write_index_cache_{0};
+
+  // Padding to avoid adjacent allocations from sharing a cache line with
+  // write_index_cache_
+  // NOLINTNEXTLINE(modernize-avoid-c-arrays)
+  char padding_data_[cache_line_size_ - sizeof(write_index_cache_)]{};
+};
+}  // namespace Parallel
diff --git a/tests/Unit/Parallel/CMakeLists.txt b/tests/Unit/Parallel/CMakeLists.txt
index 843f36a1705f..fad00d10b8a8 100644
--- a/tests/Unit/Parallel/CMakeLists.txt
+++ b/tests/Unit/Parallel/CMakeLists.txt
@@ -167,6 +167,7 @@ set(LIBRARY_SOURCES
   Test_ParallelComponentHelpers.cpp
   Test_Phase.cpp
   Test_ResourceInfo.cpp
+  Test_StaticSpscQueue.cpp
   Test_TypeTraits.cpp
   )
 
diff --git a/tests/Unit/Parallel/Test_StaticSpscQueue.cpp b/tests/Unit/Parallel/Test_StaticSpscQueue.cpp
new file mode 100644
index 000000000000..5466c0b93500
--- /dev/null
+++ b/tests/Unit/Parallel/Test_StaticSpscQueue.cpp
@@ -0,0 +1,107 @@
+// Distributed under the MIT License.
+// See LICENSE.txt for details.
+
+#include "Framework/TestingFramework.hpp"
+
+#include "Parallel/StaticSpscQueue.hpp"
+
+SPECTRE_TEST_CASE("Unit.Parallel.StaticSpscQueue", "[Unit][Parallel]") {
+  // We can only test basic functionality; it's difficult to test proper
+  // threadsafety since that requires generating a race condition.
+  Parallel::StaticSpscQueue<int, 5> queue{};
+  CHECK(queue.empty());
+  CHECK(queue.size() == 0);  // NOLINT
+  CHECK(queue.capacity() == 5);
+  queue.emplace(3);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 1);
+  CHECK(queue.capacity() == 5);
+  queue.push(5);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 2);
+  CHECK(queue.capacity() == 5);
+  const int a = 7;
+  queue.push(a);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 3);
+  CHECK(queue.capacity() == 5);
+
+  CHECK(queue.try_emplace(11));
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 4);
+  CHECK(queue.capacity() == 5);
+
+  CHECK(queue.try_push(15));
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 5);
+  CHECK(queue.capacity() == 5);
+
+  CHECK_FALSE(queue.try_push(a));
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 5);
+  CHECK(queue.capacity() == 5);
+
+  CHECK_FALSE(queue.try_push(19));
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 5);
+  CHECK(queue.capacity() == 5);
+
+  CHECK_FALSE(queue.try_emplace(21));
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 5);
+  CHECK(queue.capacity() == 5);
+
+  int* front = queue.front();
+  REQUIRE(front != nullptr);
+  REQUIRE(*front == 3);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 5);
+  CHECK(queue.capacity() == 5);
+  queue.pop();
+  CHECK(queue.size() == 4);
+
+  front = queue.front();
+  REQUIRE(front != nullptr);
+  REQUIRE(*front == 5);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 4);
+  CHECK(queue.capacity() == 5);
+  queue.pop();
+  CHECK(queue.size() == 3);
+
+  front = queue.front();
+  REQUIRE(front != nullptr);
+  REQUIRE(*front == 7);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 3);
+  CHECK(queue.capacity() == 5);
+  queue.pop();
+  CHECK(queue.size() == 2);
+
+  front = queue.front();
+  REQUIRE(front != nullptr);
+  REQUIRE(*front == 11);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 2);
+  CHECK(queue.capacity() == 5);
+  queue.pop();
+  CHECK(queue.size() == 1);
+
+  front = queue.front();
+  REQUIRE(front != nullptr);
+  REQUIRE(*front == 15);
+  CHECK_FALSE(queue.empty());
+  CHECK(queue.size() == 1);
+  CHECK(queue.capacity() == 5);
+  queue.pop();
+  CHECK(queue.empty());
+
+  front = queue.front();
+  REQUIRE(front == nullptr);
+#ifdef SPECTRE_DEBUG
+  CHECK_THROWS_WITH(queue.pop(),
+                    Catch::Matchers::ContainsSubstring(
+                        "Can't pop an element from an empty queue."));
+
+#endif  // SPECTRE_DEBUG
+}
diff --git a/tools/FileTestDefs.sh b/tools/FileTestDefs.sh
index a55bcfc9918d..eb41b2b27e49 100755
--- a/tools/FileTestDefs.sh
+++ b/tools/FileTestDefs.sh
@@ -619,7 +619,9 @@ standard_checks+=(enable_if)
 
 # Check for noexcept
 noexcept() {
-    is_c++ "$1" && \
+    whitelist "$1" \
+              'src/Parallel/StaticSpscQueue.hpp' && \
+    is_c++ "$1" && \
     staged_grep -q 'noexcept ' "$1"
 }
 noexcept_report() {
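
For orientation, here is a minimal sketch of the intended single-producer single-consumer usage. It is illustrative only and not part of the patch; the element type, capacity, and thread bodies are assumptions, and only the `try_push()`, `front()`, and `pop()` calls come from the header added above.

// Illustrative sketch: one producer thread and one consumer thread
// exchanging integers through the queue added in this patch.
#include <thread>

#include "Parallel/StaticSpscQueue.hpp"

int main() {
  Parallel::StaticSpscQueue<int, 64> queue{};

  std::thread producer([&queue]() {
    for (int i = 0; i < 1000; ++i) {
      // try_push never blocks; it returns false while the queue is full.
      while (not queue.try_push(i)) {
      }
    }
  });

  std::thread consumer([&queue]() {
    for (int i = 0; i < 1000; ++i) {
      // front() returns nullptr while the queue is empty.
      int* next = queue.front();
      while (next == nullptr) {
        next = queue.front();
      }
      // Elements arrive in FIFO order, so *next == i here.
      queue.pop();
    }
  });

  producer.join();
  consumer.join();
}

The blocking `push()`/`emplace()` calls could be used instead of the `try_` variants, at the cost of busy-waiting on the producer side whenever the queue is full.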