diff --git a/src/benchmarks/rpp_benchmark.cpp b/src/benchmarks/rpp_benchmark.cpp index 001c40168..40e604426 100644 --- a/src/benchmarks/rpp_benchmark.cpp +++ b/src/benchmarks/rpp_benchmark.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -1044,3 +1045,36 @@ TEST_CASE("trampoline scheduler") }); }; } + +TEST_CASE("single-threaded locks") +{ + int target{}; + BENCHMARK("no-lock increment", i) + { + target += i; + return target; + }; + target = 0; + std::mutex mutex{}; + + BENCHMARK("mutex lock increment", i) + { + { + std::lock_guard lock{mutex}; + target += i; + } + return target; + }; + + target = 0; + rpp::utils::spinlock spinlock{}; + + BENCHMARK("spin-lock increment", i) + { + { + std::lock_guard lock{spinlock}; + target += i; + } + return target; + }; +} \ No newline at end of file diff --git a/src/rpp/rpp/operators/combine_latest.hpp b/src/rpp/rpp/operators/combine_latest.hpp index 45017f713..4d96c9688 100644 --- a/src/rpp/rpp/operators/combine_latest.hpp +++ b/src/rpp/rpp/operators/combine_latest.hpp @@ -19,7 +19,7 @@ #include #include #include - +#include #include // create_subscriber_with_state @@ -71,11 +71,12 @@ using combine_latest_on_error = merge_on_error; using combine_latest_on_completed = merge_on_completed; template -struct combine_latest_state_with_serialized_mutex : combine_latest_state +struct combine_latest_state_with_serialized_spinlock : combine_latest_state { using combine_latest_state::combine_latest_state; - std::mutex mutex{}; + // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next serialized (due to values_mutex), but we have small probability to get error from another observable immediately + utils::spinlock spinlock{}; }; /** @@ -134,9 +135,9 @@ struct combine_latest_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared...>>(m_combiner, in_subscriber.get_subscription()); + auto state = std::make_shared...>>(m_combiner, in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->mutex}); + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); state->count_of_on_completed_needed.store(sizeof...(TOtherObservable) + 1, std::memory_order::relaxed); diff --git a/src/rpp/rpp/operators/concat.hpp b/src/rpp/rpp/operators/concat.hpp index 26a3b421a..377d85da4 100644 --- a/src/rpp/rpp/operators/concat.hpp +++ b/src/rpp/rpp/operators/concat.hpp @@ -20,6 +20,8 @@ #include #include +#include + #include #include @@ -114,11 +116,12 @@ struct concat_on_completed template -struct concat_state_with_serialized_mutex : concat_state +struct concat_state_with_serialized_spinlock : concat_state { using concat_state::concat_state; - std::mutex mutex{}; + // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (only one active observable), but we have small probability to get error from main observable immediately + utils::spinlock spinlock{}; }; template @@ -129,10 +132,10 @@ struct concat_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared>(in_subscriber.get_subscription()); + auto state = std::make_shared>(in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->mutex}); + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); return create_subscriber_with_state(state->source_subscription, concat_on_next_outer{}, diff --git a/src/rpp/rpp/operators/switch_on_next.hpp b/src/rpp/rpp/operators/switch_on_next.hpp index c172ed025..2c9799234 100644 --- a/src/rpp/rpp/operators/switch_on_next.hpp +++ b/src/rpp/rpp/operators/switch_on_next.hpp @@ -16,6 +16,8 @@ #include #include #include +#include + #include #include @@ -75,11 +77,12 @@ struct switch_on_next_on_next using switch_on_next_on_completed_outer = merge_on_completed; -struct switch_on_next_state_with_serialized_mutex : switch_on_next_state +struct switch_on_next_state_with_serialized_spinlock : switch_on_next_state { using switch_on_next_state::switch_on_next_state; - std::mutex mutex{}; + // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (only one active observable), but we have small probability to get error from main observable immediately + utils::spinlock spinlock{}; }; template @@ -90,10 +93,10 @@ struct switch_on_next_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared(in_subscriber.get_subscription()); + auto state = std::make_shared(in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->mutex}); + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); state->count_of_on_completed_needed.fetch_add(1, std::memory_order::relaxed); diff --git a/src/rpp/rpp/operators/take_until.hpp b/src/rpp/rpp/operators/take_until.hpp index bf88eb3e0..aff5050da 100644 --- a/src/rpp/rpp/operators/take_until.hpp +++ b/src/rpp/rpp/operators/take_until.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include // create_subscriber_with_state @@ -48,11 +49,12 @@ struct take_until_throttler_on_next using take_until_throttler_on_error = take_until_on_error; using take_until_throttler_on_completed = take_until_on_completed; -struct take_until_state_with_serialized_mutex : take_until_state +struct take_until_state_with_serialized_spinlock : take_until_state { using take_until_state::take_until_state; - std::mutex mutex{}; + // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (main observable), but we have small probability to get error from "until observable" immediately + utils::spinlock spinlock{}; }; /** * \brief "combine_latest" operator (an OperatorFn used by "lift"). @@ -67,9 +69,9 @@ struct take_until_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared(in_subscriber.get_subscription()); + auto state = std::make_shared(in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->mutex}); + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); // Subscribe to trigger observable m_until_observable.subscribe(create_subscriber_with_state(state->children_subscriptions.make_child(), diff --git a/src/rpp/rpp/operators/with_latest_from.hpp b/src/rpp/rpp/operators/with_latest_from.hpp index dbf03a4b6..8099405c9 100644 --- a/src/rpp/rpp/operators/with_latest_from.hpp +++ b/src/rpp/rpp/operators/with_latest_from.hpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include @@ -105,11 +106,12 @@ struct with_latest_from_on_next_outer }; template -struct with_latest_from_state_with_serialized_mutex : public with_latest_from_state +struct with_latest_from_state_with_serialized_spinlock : public with_latest_from_state { using with_latest_from_state::with_latest_from_state; - std::mutex mutex{}; + // we can use spinlock there because 99.9% of time only one ever thread would send values from on_next (main observable), but we have small probability to get error from inner observables immediately + utils::spinlock spinlock{}; }; template @@ -124,9 +126,9 @@ struct with_latest_from_impl template TSub> auto operator()(TSub&& in_subscriber) const { - auto state = std::make_shared...>>(selector, in_subscriber.get_subscription()); + auto state = std::make_shared...>>(selector, in_subscriber.get_subscription()); // change subscriber to serialized to avoid manual using of mutex - auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->mutex}); + auto subscriber = make_serialized_subscriber(std::forward(in_subscriber), std::shared_ptr{state, &state->spinlock}); with_latest_from_subscribe_observables(std::index_sequence_for{}, state, diff --git a/src/rpp/rpp/utils/spinlock.hpp b/src/rpp/rpp/utils/spinlock.hpp new file mode 100644 index 000000000..f94876e8c --- /dev/null +++ b/src/rpp/rpp/utils/spinlock.hpp @@ -0,0 +1,36 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2022 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus + +#pragma once +#include + +namespace rpp::utils +{ +class spinlock +{ +public: + spinlock() = default; + + void lock() + { + while(m_lock_flag.exchange(true, std::memory_order_acquire)) + { + while(m_lock_flag.load(std::memory_order_relaxed)){}; + } + } + + void unlock() + { + m_lock_flag.store(false, std::memory_order_release); + } + +private: + std::atomic_bool m_lock_flag{false}; +}; +} // namespace rpp::utils