Skip to content

Commit

Permalink
Auto dispose disposable in case of destruction if needed (#518)
Browse files Browse the repository at this point in the history
* add auto-dispose

* handle refcount
  • Loading branch information
victimsnino authored Jan 30, 2024
1 parent a1aa575 commit a1f5d1b
Show file tree
Hide file tree
Showing 13 changed files with 51 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/rpp/rpp/disposables/callback_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class callback_disposable final : public details::base_disposable
}

private:
void dispose_impl() noexcept override { std::move(m_fn)(); } // NOLINT(bugprone-exception-escape)
void base_dispose_impl(interface_disposable::Mode) noexcept override { std::move(m_fn)(); } // NOLINT(bugprone-exception-escape)

private:
Fn m_fn;
Expand Down
6 changes: 3 additions & 3 deletions src/rpp/rpp/disposables/composite_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ class composite_disposable_impl : public interface_composite_disposable
return m_current_state.load(std::memory_order::seq_cst) == State::Disposed;
}

void dispose() noexcept final
void dispose_impl(interface_disposable::Mode mode) noexcept final
{
while (true)
{
State expected{State::None};
// need to acquire possible state changing from `add`
if (m_current_state.compare_exchange_strong(expected, State::Disposed, std::memory_order::seq_cst))
{
dispose_impl();
composite_dispose_impl(mode);

m_disposables.dispose();
m_disposables.clear();
Expand Down Expand Up @@ -150,7 +150,7 @@ class composite_disposable_impl : public interface_composite_disposable
}

protected:
virtual void dispose_impl() noexcept {}
virtual void composite_dispose_impl(interface_disposable::Mode) noexcept {}

private:
enum class State : uint8_t
Expand Down
7 changes: 4 additions & 3 deletions src/rpp/rpp/disposables/details/base_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,16 @@ class base_disposable_impl : public BaseInterface
return m_disposed.load(std::memory_order::seq_cst);
}

void dispose() noexcept final
private:
void dispose_impl(interface_disposable::Mode mode) noexcept final
{
// just need atomicity, not guarding anything
if (m_disposed.exchange(true, std::memory_order::seq_cst) == false)
dispose_impl();
base_dispose_impl(mode);
}

protected:
virtual void dispose_impl() noexcept = 0;
virtual void base_dispose_impl(interface_disposable::Mode mode) noexcept = 0;

private:
std::atomic_bool m_disposed{};
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/disposables/disposable_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class auto_dispose_wrapper final

~auto_dispose_wrapper() noexcept
{
// static_cast<interface_disposable&>(m_data).dispose_impl(rpp::interface_disposable::Mode::Destroying);
static_cast<interface_disposable&>(m_data).dispose_impl(rpp::interface_disposable::Mode::Destroying);
}

TDisposable* get() { return &m_data; }
Expand Down
6 changes: 6 additions & 0 deletions src/rpp/rpp/disposables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@

#include <rpp/utils/constraints.hpp>

namespace rpp::details
{
template<rpp::constraint::decayed_type TDisposable>
class auto_dispose_wrapper;
}

namespace rpp
{
struct interface_disposable;
Expand Down
14 changes: 13 additions & 1 deletion src/rpp/rpp/disposables/interface_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@ struct interface_disposable
* @brief Dispose disposable and free any underlying resources and etc.
* @warning This function must be thread-safe
*/
virtual void dispose() noexcept = 0;
void dispose() noexcept { dispose_impl(Mode::Disposing); }

template<rpp::constraint::decayed_type TStrategy>
friend class rpp::details::auto_dispose_wrapper;

protected:
enum class Mode : bool
{
Disposing = 0, // someone called "dispose" method manually
Destroying = 1 // called during destruction -> not needed to clear self in other disposables and etc + not allowed to call `shared_from_this`
};

virtual void dispose_impl(Mode mode) noexcept = 0;
};
}
8 changes: 5 additions & 3 deletions src/rpp/rpp/disposables/refcount_disposable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class refcount_disposable : public rpp::details::enable_wrapper_from_this<refcou
}
}

void dispose_impl() noexcept override
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
m_refcount.store(s_disposed, std::memory_order::seq_cst);
}
Expand All @@ -70,9 +70,11 @@ class refocunt_disposable_inner final : public rpp::composite_disposable, public
refocunt_disposable_inner(disposable_wrapper_impl<refcount_disposable> state)
: m_state{std::move(state)} {}

void dispose_impl() noexcept override
void composite_dispose_impl(interface_disposable::Mode mode) noexcept override
{
m_state.remove(this->wrapper_from_this());
if (mode != interface_disposable::Mode::Destroying)
m_state.remove(this->wrapper_from_this());

if (const auto locked = m_state.lock())
locked->release();
m_state = disposable_wrapper_impl<refcount_disposable>::empty();
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/observables/blocking_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class blocking_disposble final : public base_disposable
m_cv.wait(lock, [this] { return m_completed; });
}

void dispose_impl() noexcept override
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
{
std::lock_guard lock{m_mutex};
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/schedulers/new_thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class new_thread
}

private:
void dispose_impl() noexcept override
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
if (!m_thread.joinable())
return;
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/schedulers/run_loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class run_loop final
return !m_queue.is_empty() && (m_queue.top()->is_disposed() || m_queue.top()->get_timepoint() <= now);
}

void dispose_impl() noexcept override
void base_dispose_impl(interface_disposable::Mode) noexcept override
{
{
std::lock_guard lock{m_mutex};
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/subjects/details/subject_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class subject_state : public std::enable_shared_from_this<subject_state<Type>>
}

private:
void dispose_impl() noexcept override
void composite_dispose_impl(interface_disposable::Mode) noexcept override
{
exchange_observers_under_lock_if_there(disposed{});
}
Expand Down
26 changes: 13 additions & 13 deletions src/tests/rpp/test_disposables.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ struct custom_disposable : public rpp::interface_disposable

bool is_disposed() const noexcept final { return dispose_count > 1; }

void dispose() noexcept final { ++dispose_count; }
void dispose_impl(rpp::interface_disposable::Mode) noexcept final { ++dispose_count; }

size_t dispose_count{};
};
Expand Down Expand Up @@ -134,18 +134,18 @@ TEMPLATE_TEST_CASE("disposable keeps state", "", rpp::details::disposables::dyna
CHECK(other.is_disposed());
}
}
// SECTION("disposable dispose on destruction")
// {
// {
// auto other = rpp::composite_disposable_wrapper::make();
// CHECK(!other.is_disposed());
// CHECK(!d.is_disposed());
// other.add(d);
// CHECK(!other.is_disposed());
// CHECK(!d.is_disposed());
// }
// CHECK(d.is_disposed());
// }
SECTION("disposable dispose on destruction")
{
{
auto other = rpp::composite_disposable_wrapper::make();
CHECK(!other.is_disposed());
CHECK(!d.is_disposed());
other.add(d);
CHECK(!other.is_disposed());
CHECK(!d.is_disposed());
}
CHECK(d.is_disposed());
}

SECTION("add callback_disposable")
{
Expand Down
2 changes: 1 addition & 1 deletion src/tests/utils/test_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class test_scheduler final
}
}

void dispose_impl() noexcept override {}
void base_dispose_impl(interface_disposable::Mode) noexcept override {}

std::vector<rpp::schedulers::time_point> schedulings{};
std::vector<rpp::schedulers::time_point> executions{};
Expand Down

1 comment on commit a1f5d1b

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 333.48 ns 5.36 ns 2.16 ns 2.48
Subscribe empty callbacks to empty observable via pipe operator 307.65 ns 2.22 ns 2.16 ns 1.03

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 753.63 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1050.65 ns 5.34 ns 4.63 ns 1.15
concat_as_source of just(1 immediate) create + subscribe 2269.55 ns 129.74 ns 106.64 ns 1.22
defer from array of 1 - defer + create + subscribe + immediate 749.65 ns 0.31 ns 0.31 ns 1.01
interval - interval + take(3) + subscribe + immediate 2119.38 ns 57.97 ns 57.98 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3068.44 ns 32.58 ns 32.10 ns 1.02

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1144.17 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 903.79 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1018.87 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 864.43 ns 0.31 ns 0.31 ns 1.01
immediate_just(1,2)+first()+subscribe 1306.27 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 915.03 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1190.81 ns 18.52 ns 17.91 ns 1.03

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 264.66 ns 2.16 ns 2.16 ns 1.00
current_thread scheduler create worker + schedule 366.62 ns 7.15 ns 6.18 ns 1.16
current_thread scheduler create worker + schedule + recursive schedule 810.26 ns 63.91 ns 63.69 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 886.64 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 914.10 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2360.00 ns 165.73 ns 148.75 ns 1.11
immediate_just+buffer(2)+subscribe 1546.03 ns 14.19 ns 14.20 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2367.04 ns 1029.60 ns 951.30 ns 1.08

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 843.50 ns - - 0.00
immediate_just+take_while(true)+subscribe 846.91 ns 0.31 ns 0.31 ns 1.01

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1964.80 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3393.67 ns 180.66 ns 164.65 ns 1.10
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 4174.17 ns 176.60 ns 148.56 ns 1.19
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 165.45 ns 128.58 ns 1.29
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3546.83 ns 1032.65 ns 1015.30 ns 1.02

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.56 ns 12.02 ns 11.72 ns 1.03

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1460.47 ns 14.37 ns 14.81 ns 0.97
basic sample with immediate scheduler 1450.93 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 923.35 ns 0.31 ns 0.31 ns 1.01

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1070.83 ns 5.39 ns 4.42 ns 1.22
Subscribe empty callbacks to empty observable via pipe operator 1070.13 ns 5.38 ns 4.93 ns 1.09

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 2508.69 ns 0.44 ns 0.27 ns 1.61
from array of 1 - create + subscribe + current_thread 2961.61 ns 8.89 ns 29.57 ns 0.30
concat_as_source of just(1 immediate) create + subscribe 7031.88 ns 386.07 ns 375.69 ns 1.03
defer from array of 1 - defer + create + subscribe + immediate 2552.79 ns 0.28 ns 0.30 ns 0.94
interval - interval + take(3) + subscribe + immediate 6636.35 ns 90.66 ns 142.62 ns 0.64
interval - interval + take(3) + subscribe + current_thread 7743.43 ns 59.20 ns 127.01 ns 0.47

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3575.58 ns 0.28 ns 0.29 ns 0.96
immediate_just+filter(true)+subscribe 2467.77 ns 0.28 ns 0.28 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 3293.40 ns 0.28 ns 0.28 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 2444.93 ns 0.54 ns 0.55 ns 1.00
immediate_just(1,2)+first()+subscribe 3860.23 ns 0.28 ns 0.27 ns 1.01
immediate_just(1,2)+last()+subscribe 2867.23 ns 0.28 ns 0.28 ns 1.00
immediate_just+take_last(1)+subscribe 3848.25 ns 71.50 ns 83.22 ns 0.86

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1024.72 ns 5.73 ns 5.44 ns 1.05
current_thread scheduler create worker + schedule 1308.68 ns 15.00 ns 43.14 ns 0.35
current_thread scheduler create worker + schedule + recursive schedule 2351.56 ns 157.06 ns 264.99 ns 0.59

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2692.84 ns 0.27 ns 0.27 ns 0.99
immediate_just+scan(10, std::plus)+subscribe 3032.58 ns 0.56 ns 0.56 ns 0.99
immediate_just+flat_map(immediate_just(v*2))+subscribe 6865.28 ns 463.12 ns 499.37 ns 0.93
immediate_just+buffer(2)+subscribe 3007.12 ns 69.10 ns 82.35 ns 0.84
immediate_just+window(2)+subscribe + subscsribe inner 6670.98 ns 2637.71 ns 2642.27 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2713.39 ns - - 0.00
immediate_just+take_while(true)+subscribe 2729.01 ns 0.28 ns 0.28 ns 1.01

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 5937.28 ns 0.28 ns 0.27 ns 1.02

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 9817.57 ns 519.60 ns 521.70 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 10549.04 ns 524.14 ns 550.83 ns 0.95
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 516.38 ns 568.09 ns 0.91
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 10525.20 ns 2346.19 ns 2434.04 ns 0.96

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 96.81 ns 60.46 ns 57.42 ns 1.05

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3387.29 ns 44.50 ns 124.90 ns 0.36
basic sample with immediate scheduler 3288.73 ns 5.69 ns 15.90 ns 0.36

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2864.55 ns 0.28 ns 0.27 ns 1.04

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 266.90 ns 0.88 ns 0.88 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 265.20 ns 0.88 ns 0.88 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 567.20 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 805.76 ns 5.55 ns 5.55 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 1941.08 ns 113.13 ns 109.35 ns 1.03
defer from array of 1 - defer + create + subscribe + immediate 600.05 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 1548.44 ns 57.03 ns 57.24 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2130.02 ns 30.86 ns 30.86 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 971.21 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 693.94 ns 0.31 ns 0.31 ns 0.99
immediate_just(1,2)+skip(1)+subscribe 867.67 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 706.46 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1102.07 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 765.41 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 973.91 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 198.12 ns 0.88 ns 0.89 ns 0.99
current_thread scheduler create worker + schedule 316.19 ns 5.57 ns 5.60 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 639.04 ns 60.46 ns 60.77 ns 0.99

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 699.01 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 724.68 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 1803.44 ns 118.96 ns 118.04 ns 1.01
immediate_just+buffer(2)+subscribe 1382.87 ns 15.12 ns 13.58 ns 1.11
immediate_just+window(2)+subscribe + subscsribe inner 2162.10 ns 762.89 ns 745.91 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 675.02 ns - - 0.00
immediate_just+take_while(true)+subscribe 687.73 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1602.10 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2528.03 ns 125.15 ns 123.78 ns 1.01
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3039.15 ns 120.94 ns 120.15 ns 1.01
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 112.92 ns 109.86 ns 1.03
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 2702.87 ns 723.38 ns 723.82 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 26.32 ns 12.95 ns 13.88 ns 0.93

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1029.45 ns 14.19 ns 13.59 ns 1.04
basic sample with immediate scheduler 1042.23 ns 6.17 ns 6.17 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 757.95 ns 0.31 ns 0.31 ns 1.00

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 573.53 ns 4.01 ns 4.02 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 585.87 ns 4.01 ns 4.01 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1176.46 ns 4.93 ns 5.24 ns 0.94
from array of 1 - create + subscribe + current_thread 1448.92 ns 19.75 ns 20.38 ns 0.97
concat_as_source of just(1 immediate) create + subscribe 4743.87 ns 171.84 ns 177.31 ns 0.97
defer from array of 1 - defer + create + subscribe + immediate 1221.36 ns 4.93 ns 5.24 ns 0.94
interval - interval + take(3) + subscribe + immediate 3182.58 ns 129.44 ns 129.57 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3515.92 ns 60.75 ns 60.75 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1872.03 ns 12.87 ns 12.83 ns 1.00
immediate_just+filter(true)+subscribe 1338.12 ns 12.40 ns 12.35 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1809.75 ns 13.07 ns 13.11 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1679.15 ns 15.94 ns 15.94 ns 1.00
immediate_just(1,2)+first()+subscribe 2443.95 ns 12.95 ns 12.98 ns 1.00
immediate_just(1,2)+last()+subscribe 1503.85 ns 14.10 ns 14.12 ns 1.00
immediate_just+take_last(1)+subscribe 2054.98 ns 59.37 ns 59.27 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 489.99 ns 7.29 ns 7.40 ns 0.98
current_thread scheduler create worker + schedule 668.15 ns 18.20 ns 16.92 ns 1.08
current_thread scheduler create worker + schedule + recursive schedule 1105.76 ns 111.02 ns 109.11 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1335.35 ns 12.34 ns 12.32 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 1444.85 ns 21.27 ns 21.27 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3576.66 ns 231.19 ns 228.89 ns 1.01
immediate_just+buffer(2)+subscribe 2727.25 ns 61.10 ns 60.65 ns 1.01
immediate_just+window(2)+subscribe + subscsribe inner 4095.60 ns 1145.57 ns 1123.25 ns 1.02

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1599.86 ns 11.47 ns 11.47 ns 1.00
immediate_just+take_while(true)+subscribe 1338.46 ns 12.37 ns 12.42 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 3186.67 ns 7.40 ns 7.40 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5296.55 ns 255.12 ns 236.62 ns 1.08
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 6605.13 ns 247.20 ns 234.63 ns 1.05
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 231.73 ns 221.59 ns 1.05
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6393.30 ns 965.34 ns 930.46 ns 1.04

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.65 ns 25.59 ns 26.57 ns 0.96

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1906.85 ns 59.40 ns 59.48 ns 1.00
basic sample with immediate scheduler 1898.70 ns 38.25 ns 34.86 ns 1.10

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1807.56 ns 19.98 ns 19.98 ns 1.00

Please sign in to comment.