Skip to content

Commit

Permalink
Merge pull request #5659 from nilsdeppe/add_threading_2
Browse files Browse the repository at this point in the history
Add a static single-producer single-consumer queue
  • Loading branch information
kidder authored Dec 8, 2023
2 parents 9c0957e + 2f52e9b commit c16e7aa
Show file tree
Hide file tree
Showing 5 changed files with 378 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/Parallel/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ spectre_target_headers(
ReductionDeclare.hpp
ResourceInfo.hpp
Section.hpp
StaticSpscQueue.hpp
TypeTraits.hpp
)

Expand Down
266 changes: 266 additions & 0 deletions src/Parallel/StaticSpscQueue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
// Distributed under the MIT License.
// See LICENSE.txt for details.

/*
 * The original code is distributed under the following copyright and license:
*
* Copyright (c) 2020 Erik Rigtorp <erik@rigtorp.se>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*
* SXS Modifications:
* 1. Casing to match SpECTRE conventions
* 2. Static capacity
* 3. Storage is std::array
* 4. Switch to west-const
*
*/

#pragma once

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <new>  // Placement new
#include <stdexcept>
#include <type_traits>
#include <utility>

#include "Utilities/ErrorHandling/Assert.hpp"
#include "Utilities/Requires.hpp"

namespace Parallel {
/*!
 * \brief A lockfree single-producer single-consumer queue with fixed
 * (compile-time) capacity and runtime-varying size.
*
 * The queue is threadsafe as long as at most one thread reads and at most
 * one thread writes at any given time. Which thread reads and which thread
 * writes may change over the course of the program; the requirement is only
 * that at no point do two threads read simultaneously, and at no point do
 * two threads write simultaneously.
*
* \note This class is intentionally not serializable since handling
* threadsafety around serialization requires careful thought of the individual
* circumstances.
*/
template <typename T, size_t Capacity>
class StaticSpscQueue {
 private:
#ifdef __cpp_lib_hardware_interference_size
  // Use the implementation-reported cache line size when the standard
  // library provides it, so members below can be padded to avoid false
  // sharing.
  static constexpr size_t cache_line_size_ =
      std::hardware_destructive_interference_size;
#else
  // Fallback: 64 bytes is the cache line size on most mainstream
  // architectures (e.g. x86-64 and most ARM cores).
  static constexpr size_t cache_line_size_ = 64;
#endif

  // Number of T-sized slots needed to cover one cache line, rounded up.
  // data_ carries this many unused slots at each end so the live slots do
  // not share a cache line with adjacent allocations (false sharing).
  static constexpr size_t padding_ = (cache_line_size_ - 1) / sizeof(T) + 1;

 public:
  StaticSpscQueue() = default;

  ~StaticSpscQueue() {
    // Destruct objects remaining in the buffer; pop() runs each element's
    // destructor in turn until front() reports the queue is empty.
    while (front()) {
      pop();
    }
  }

  // Copying or moving the queue cannot be done in a threadsafe manner, so
  // both are disallowed.
  StaticSpscQueue(const StaticSpscQueue&) = delete;
  StaticSpscQueue& operator=(const StaticSpscQueue&) = delete;
  StaticSpscQueue(StaticSpscQueue&&) = delete;
  StaticSpscQueue& operator=(StaticSpscQueue&&) = delete;

  /// Construct a new element at the end of the queue in place.
  ///
  /// Uses placement new for in-place construction.
  ///
  /// \warning If the queue is full this busy-waits until the consumer pops
  /// an element; it never overwrites existing elements. Calling it when no
  /// other thread is consuming would therefore spin forever.
  template <typename... Args>
  void emplace(Args&&... args) noexcept(
      std::is_nothrow_constructible_v<T, Args&&...>) {
    static_assert(std::is_constructible_v<T, Args&&...>,
                  "T must be constructible with Args&&...");
    // Only the producer thread stores to write_index_, so a relaxed load of
    // our own latest store suffices here.
    const auto write_index = write_index_.load(std::memory_order_relaxed);
    auto next_write_index = write_index + 1;
    if (next_write_index == capacity_) {
      // Wrap around the circular buffer.
      next_write_index = 0;
    }
    // Spin while the queue is full. The acquire load synchronizes with the
    // release store of read_index_ in pop(), so the consumer's destruction
    // of the slot is visible before we construct into it.
    while (next_write_index == read_index_cache_) {
      read_index_cache_ = read_index_.load(std::memory_order_acquire);
    }
    new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
    // The release store publishes the newly constructed element to the
    // consumer's acquire load of write_index_ in front().
    write_index_.store(next_write_index, std::memory_order_release);
  }

  /// Construct a new element at the end of the queue in place.
  ///
  /// Uses placement new for in-place construction.
  ///
  /// Returns `true` if the emplacement succeeded and `false` if it did
  /// not. If it failed then the queue is currently full. Unlike
  /// `emplace()`, this never blocks.
  template <typename... Args>
  [[nodiscard]] bool try_emplace(Args&&... args) noexcept(
      std::is_nothrow_constructible_v<T, Args&&...>) {
    static_assert(std::is_constructible_v<T, Args&&...>,
                  "T must be constructible with Args&&...");
    // Relaxed load: only the producer thread stores to write_index_.
    const auto write_index = write_index_.load(std::memory_order_relaxed);
    auto next_write_index = write_index + 1;
    if (next_write_index == capacity_) {
      // Wrap around the circular buffer.
      next_write_index = 0;
    }
    if (next_write_index == read_index_cache_) {
      // The cached read index says "full"; refresh it once (acquire pairs
      // with the release store in pop()) and fail if still full.
      read_index_cache_ = read_index_.load(std::memory_order_acquire);
      if (next_write_index == read_index_cache_) {
        return false;
      }
    }
    new (&data_[write_index + padding_]) T(std::forward<Args>(args)...);
    // Publish the new element to the consumer (see emplace()).
    write_index_.store(next_write_index, std::memory_order_release);
    return true;
  }

  /// Push a new element to the end of the queue.
  ///
  /// Uses `emplace()` internally.
  ///
  /// \warning If the queue is full this busy-waits until the consumer pops
  /// an element; it never overwrites existing elements.
  void push(const T& v) noexcept(std::is_nothrow_copy_constructible_v<T>) {
    static_assert(std::is_copy_constructible_v<T>,
                  "T must be copy constructible");
    emplace(v);
  }

  /// Push a new element to the end of the queue.
  ///
  /// Uses `emplace()` internally.
  ///
  /// \warning If the queue is full this busy-waits until the consumer pops
  /// an element; it never overwrites existing elements.
  template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr>
  void push(P&& v) noexcept(std::is_nothrow_constructible_v<T, P&&>) {
    emplace(std::forward<P>(v));
  }

  /// Push a new element to the end of the queue. Returns `false` if the queue
  /// is at capacity and does not push the new object, otherwise returns `true`.
  ///
  /// Uses `try_emplace()` internally. Never blocks.
  [[nodiscard]] bool try_push(const T& v) noexcept(
      std::is_nothrow_copy_constructible_v<T>) {
    static_assert(std::is_copy_constructible_v<T>,
                  "T must be copy constructible");
    return try_emplace(v);
  }

  /// Push a new element to the end of the queue. Returns `false` if the queue
  /// is at capacity and does not push the new object, otherwise returns `true`.
  ///
  /// Uses `try_emplace()` internally. Never blocks.
  template <typename P, Requires<std::is_constructible_v<T, P&&>> = nullptr>
  [[nodiscard]] bool try_push(P&& v) noexcept(
      std::is_nothrow_constructible_v<T, P&&>) {
    return try_emplace(std::forward<P>(v));
  }

  /// Returns a pointer to the first element of the queue.
  ///
  /// \note Returns `nullptr` if the queue is empty.
  [[nodiscard]] T* front() noexcept {
    // Relaxed load: only the consumer thread stores to read_index_.
    const auto read_index = read_index_.load(std::memory_order_relaxed);
    if (read_index == write_index_cache_) {
      // Cached write index says "empty"; refresh it. The acquire load
      // synchronizes with the producer's release store in emplace(), making
      // the element's construction visible to this thread.
      write_index_cache_ = write_index_.load(std::memory_order_acquire);
      if (write_index_cache_ == read_index) {
        // Genuinely empty.
        return nullptr;
      }
    }
    return &data_[read_index + padding_];
  }

  /// Removes the first element from the queue.
  ///
  /// In debug builds popping from an empty queue triggers an ASSERT.
  void pop() {
    static_assert(std::is_nothrow_destructible_v<T>,
                  "T must be nothrow destructible");
    // Relaxed load: only the consumer thread stores to read_index_.
    const auto read_index = read_index_.load(std::memory_order_relaxed);
#ifdef SPECTRE_DEBUG
    const auto write_index = write_index_.load(std::memory_order_acquire);
    ASSERT(write_index != read_index,
           "Can't pop an element from an empty queue. read_index: "
               << read_index << " write_index " << write_index);
#endif  // SPECTRE_DEBUG
    data_[read_index + padding_].~T();
    auto next_read_index = read_index + 1;
    if (next_read_index == capacity_) {
      // Wrap around the circular buffer.
      next_read_index = 0;
    }
    // If the cached write index pointed at the slot we just consumed it is
    // stale; advance it too so the next front() call reloads write_index_
    // instead of wrongly treating the queue as nonempty.
    if (read_index == write_index_cache_) {
      write_index_cache_ = next_read_index;
    }
    // Release store pairs with the producer's acquire load of read_index_
    // in emplace()/try_emplace(), publishing the slot's destruction.
    read_index_.store(next_read_index, std::memory_order_release);
  }

  /// Returns the size of the queue at a particular hardware state.
  ///
  /// Note that while this can be checked in a threadsafe manner, it is up to
  /// the user to guarantee that another thread does not change the queue
  /// between when `size()` is called and how the result is used.
  [[nodiscard]] size_t size() const noexcept {
    // The difference is negative when write_index_ has wrapped around the
    // circular buffer but read_index_ has not yet; correct by adding the
    // buffer length.
    std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(
                              write_index_.load(std::memory_order_acquire)) -
                          static_cast<std::ptrdiff_t>(
                              read_index_.load(std::memory_order_acquire));
    if (diff < 0) {
      diff += static_cast<std::ptrdiff_t>(capacity_);
    }
    return static_cast<size_t>(diff);
  }

  /// Returns `true` if the queue may be empty, otherwise `false`.
  ///
  /// Note that while this can be checked in a threadsafe manner, it is up to
  /// the user to guarantee that another thread does not change the queue
  /// between when `empty()` is called and how the result is used.
  [[nodiscard]] bool empty() const noexcept {
    return write_index_.load(std::memory_order_acquire) ==
           read_index_.load(std::memory_order_acquire);
  }

  /// Returns the capacity of the queue.
  ///
  /// One slot of the internal buffer is always left unoccupied to
  /// distinguish a full queue from an empty one, hence `capacity_ - 1`.
  [[nodiscard]] size_t capacity() const noexcept { return capacity_ - 1; }

 private:
  // Internal buffer length: one extra slot beyond the requested Capacity,
  // because one slot is always kept unoccupied (full vs. empty sentinel).
  static constexpr size_t capacity_ = Capacity + 1;
  // Element storage, with padding_ unused slots at each end to avoid false
  // sharing with adjacent allocations.
  // NOTE(review): because the storage is std::array, every slot is
  // default-constructed up front and emplace() placement-news over the
  // previous occupant without destroying it first — this assumes T
  // tolerates that (e.g. its default state owns no resources); confirm for
  // non-trivial T.
  std::array<T, capacity_ + 2 * padding_> data_{};

  // Each index lives on its own cache line to avoid false sharing between
  // the producer and consumer threads. The *_cache_ members are
  // non-atomic snapshots, each touched by only one side, used to reduce
  // cache coherency traffic on the atomics.
  //
  // Index of the next slot the producer will write; stored only by the
  // producer, loaded by both sides.
  alignas(cache_line_size_) std::atomic<size_t> write_index_{0};
  // Producer's cached copy of read_index_, refreshed only when the queue
  // looks full.
  alignas(cache_line_size_) size_t read_index_cache_{0};
  // Index of the next slot the consumer will read; stored only by the
  // consumer, loaded by both sides.
  alignas(cache_line_size_) std::atomic<size_t> read_index_{0};
  // Consumer's cached copy of write_index_, refreshed only when the queue
  // looks empty.
  alignas(cache_line_size_) size_t write_index_cache_{0};

  // Padding to avoid adjacent allocations from sharing a cache line with
  // write_index_cache_
  // NOLINTNEXTLINE(modernize-avoid-c-arrays)
  char padding_data_[cache_line_size_ - sizeof(write_index_cache_)]{};
};
} // namespace Parallel
1 change: 1 addition & 0 deletions tests/Unit/Parallel/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ set(LIBRARY_SOURCES
Test_ParallelComponentHelpers.cpp
Test_Phase.cpp
Test_ResourceInfo.cpp
Test_StaticSpscQueue.cpp
Test_TypeTraits.cpp
)

Expand Down
107 changes: 107 additions & 0 deletions tests/Unit/Parallel/Test_StaticSpscQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Distributed under the MIT License.
// See LICENSE.txt for details.

#include "Framework/TestingFramework.hpp"

#include "Parallel/StaticSpscQueue.hpp"

SPECTRE_TEST_CASE("Unit.Parallel.StaticSpscQueue", "[Unit][Parallel]") {
  // We can only test basic functionality, it's difficult to test proper
  // threadsafety since that requires generating a race condition.
  Parallel::StaticSpscQueue<int, 5> queue{};

  // Verifies that the queue is non-empty, holds `expected_size` elements,
  // and reports its fixed capacity.
  const auto check_state = [&queue](const size_t expected_size) {
    CHECK_FALSE(queue.empty());
    CHECK(queue.size() == expected_size);
    CHECK(queue.capacity() == 5);
  };
  // Verifies the value at the front of the queue and the queue state, then
  // pops the front element and checks that the size decreased by one.
  const auto check_front_and_pop = [&queue, &check_state](
                                       const int expected_value,
                                       const size_t expected_size) {
    int* const front = queue.front();
    REQUIRE(front != nullptr);
    REQUIRE(*front == expected_value);
    check_state(expected_size);
    queue.pop();
    CHECK(queue.size() == expected_size - 1);
  };

  // Freshly constructed queue is empty.
  CHECK(queue.empty());
  CHECK(queue.size() == 0);  // NOLINT
  CHECK(queue.capacity() == 5);

  // Fill the queue through each insertion API in turn.
  queue.emplace(3);
  check_state(1);
  queue.push(5);  // rvalue overload
  check_state(2);
  const int a = 7;
  queue.push(a);  // lvalue (copy) overload
  check_state(3);
  CHECK(queue.try_emplace(11));
  check_state(4);
  CHECK(queue.try_push(15));
  check_state(5);

  // The queue is now full: all try_* insertions must fail and leave the
  // queue unchanged.
  CHECK_FALSE(queue.try_push(a));
  check_state(5);
  CHECK_FALSE(queue.try_push(19));
  check_state(5);
  CHECK_FALSE(queue.try_emplace(21));
  check_state(5);

  // Drain the queue, verifying FIFO order.
  check_front_and_pop(3, 5);
  check_front_and_pop(5, 4);
  check_front_and_pop(7, 3);
  check_front_and_pop(11, 2);
  check_front_and_pop(15, 1);
  CHECK(queue.empty());

  // front() on an empty queue returns nullptr; pop() on an empty queue
  // triggers an ASSERT in debug builds.
  REQUIRE(queue.front() == nullptr);
#ifdef SPECTRE_DEBUG
  CHECK_THROWS_WITH(queue.pop(),
                    Catch::Matchers::ContainsSubstring(
                        "Can't pop an element from an empty queue."));
#endif  // SPECTRE_DEBUG
}
Loading

0 comments on commit c16e7aa

Please sign in to comment.