Skip to content

Commit

Permalink
Serialized subscriber (#273)
Browse files Browse the repository at this point in the history
  • Loading branch information
victimsnino authored Sep 11, 2022
1 parent a6db1fd commit ea672e7
Show file tree
Hide file tree
Showing 10 changed files with 192 additions and 159 deletions.
32 changes: 21 additions & 11 deletions src/rpp/rpp/operators/combine_latest.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ struct combine_latest_state : public merge_state
, combiner(combiner) {}

// don't use NO_UNIQUE_ADDRESS there due to issue in MSVC base class becomes invalid
TCombiner combiner;
/*NO_UNIQUE_ADDRESS*/ TCombiner combiner;
std::mutex values_mutex{};
std::tuple<std::optional<Types>...> values{};
};

Expand All @@ -53,9 +54,8 @@ struct combine_latest_on_next
const auto& subscriber,
const std::shared_ptr<combine_latest_state<TCombiner, Types...>>& state) const
{
// mutex need to be locked during changing of values, generating new values and
// sending of values to satisfy observable contract
std::scoped_lock lock{state->mutex};
// mutex need to be locked during changing of values, generating new values and sending of new values due to we can't update value while we are sending old one
std::scoped_lock lock{state->values_mutex};
std::get<I>(state->values) = std::forward<decltype(value)>(value);

std::apply([&](const auto&...cached_values)
Expand All @@ -70,6 +70,14 @@ struct combine_latest_on_next
using combine_latest_on_error = merge_on_error;
using combine_latest_on_completed = merge_on_completed;

template<typename TCombiner, constraint::decayed_type... Types>
struct combine_latest_state_with_serialized_mutex : combine_latest_state<TCombiner,Types...>
{
using combine_latest_state<TCombiner,Types...>::combine_latest_state;

std::mutex mutex{};
};

/**
* \brief "combine_latest" operator (an OperatorFn used by "lift").
*/
Expand All @@ -81,7 +89,6 @@ struct combine_latest_impl

private:
static constexpr size_t s_index_of_source_type = 0;
using State = combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>;

/**
* \brief Templated helper function for subscribing to variadic 'other' observables.
Expand All @@ -93,23 +100,23 @@ struct combine_latest_impl
void subscribe_other_observables(std::index_sequence<I...>,
// Used in compile time for variadic expansion
const auto& subscriber,
const auto& state) const
const std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state) const
{
// +1 because the first element in tuple is the current observable, and you want to subscribe to the 'other' observables.
// (Use variadic expansion to iterate the observables)
(subscribe_observable<I + 1>(std::get<I>(m_other_observables), subscriber, state), ...);
}

template<size_t I, constraint::observable TObservable>
static void subscribe_observable(const TObservable& observable, const auto& subscriber, const auto& state)
static void subscribe_observable(const TObservable& observable, const auto& subscriber, const std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>& state)
{
using ValueType = utils::extract_observable_type_t<TObservable>;
observable.subscribe(create_inner_subscriber<ValueType, I>(subscriber, state));
}

template<typename ValueType, size_t I>
static auto create_inner_subscriber(auto&& subscriber,
std::shared_ptr<State> state)
std::shared_ptr<combine_latest_state<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>> state)
{
auto subscription = state->children_subscriptions.make_child();
return create_subscriber_with_state<ValueType>(std::move(subscription),
Expand All @@ -125,16 +132,19 @@ struct combine_latest_impl
using DownstreamType = utils::decayed_invoke_result_t<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>;

template<constraint::subscriber_of_type<DownstreamType> TSub>
auto operator()(TSub&& subscriber) const
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<State>(m_combiner, subscriber.get_subscription());
auto state = std::make_shared<combine_latest_state_with_serialized_mutex<TCombiner, Type, utils::extract_observable_type_t<TOtherObservable>...>>(m_combiner, in_subscriber.get_subscription());
// change subscriber to serialized to avoid manual using of mutex
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<std::mutex>{state, &state->mutex});

state->count_of_on_completed_needed.store(sizeof...(TOtherObservable) + 1, std::memory_order::relaxed);

// Subscribe to other observables and redirect on_next event to state
subscribe_other_observables(std::index_sequence_for<TOtherObservable...>{}, subscriber, state);

// Redirect values from this observable to the state for value composition
return create_inner_subscriber<Type, s_index_of_source_type>(std::forward<TSub>(subscriber), std::move(state));
return create_inner_subscriber<Type, s_index_of_source_type>(std::move(subscriber), std::move(state));
}
};
} // namespace rpp::details
30 changes: 20 additions & 10 deletions src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ IMPLEMENTATION_FILE(concat_tag);
namespace rpp::details
{
template<constraint::decayed_type ValueType>
struct concat_state : early_unsubscribe_state
struct concat_state : public early_unsubscribe_state
{
concat_state(const composite_subscription& subscription_of_subscriber)
: early_unsubscribe_state{subscription_of_subscriber}
, source_subscription{children_subscriptions.make_child()} {}

std::mutex mutex{};
composite_subscription source_subscription;
std::mutex queue_mutex{};
std::queue<dynamic_observable<ValueType>> observables_to_subscribe{};
Expand All @@ -47,9 +46,10 @@ struct concat_state : early_unsubscribe_state
using concat_on_next_inner = merge_forwarding_on_next;
using concat_on_error = merge_on_error;

template<constraint::decayed_type ValueType>
struct concat_on_next_outer
{
template<constraint::decayed_type ValueType, constraint::observable TObs, constraint::subscriber TSub>
template<constraint::observable TObs, constraint::subscriber TSub>
void operator()(TObs&& new_observable,
const TSub& sub,
const std::shared_ptr<concat_state<ValueType>>& state) const
Expand All @@ -66,7 +66,6 @@ struct concat_on_next_outer
subscribe_inner_subscriber(new_observable, sub, state);
}
private:
template<constraint::decayed_type ValueType>
static void subscribe_inner_subscriber(const auto& observable,
const constraint::subscriber auto& subscriber,
const std::shared_ptr<concat_state<ValueType>>& state)
Expand Down Expand Up @@ -101,9 +100,9 @@ struct concat_on_next_outer
};


template<constraint::decayed_type ValueType>
struct concat_on_completed
{
template<constraint::decayed_type ValueType>
void operator()(const constraint::subscriber auto& sub,
const std::shared_ptr<concat_state<ValueType>>& state) const
{
Expand All @@ -113,22 +112,33 @@ struct concat_on_completed
}
};


template<constraint::decayed_type ValueType>
struct concat_state_with_serialized_mutex : concat_state<ValueType>
{
using concat_state<ValueType>::concat_state;

std::mutex mutex{};
};

template<constraint::decayed_type Type>
struct concat_impl
{
using ValueType = utils::extract_observable_type_t<Type>;

template<constraint::subscriber_of_type<ValueType> TSub>
auto operator()(TSub&& subscriber) const
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<concat_state_with_serialized_mutex<ValueType>>(in_subscriber.get_subscription());

auto state = std::make_shared<concat_state<ValueType>>(subscriber.get_subscription());
// change subscriber to serialized to avoid manual using of mutex
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber), std::shared_ptr<std::mutex>{state, &state->mutex});

return create_subscriber_with_state<Type>(state->source_subscription,
concat_on_next_outer{},
concat_on_next_outer<ValueType>{},
concat_on_error{},
concat_on_completed{},
std::forward<TSub>(subscriber),
concat_on_completed<ValueType>{},
std::move(subscriber),
std::move(state));
}
};
Expand Down
54 changes: 0 additions & 54 deletions src/rpp/rpp/operators/details/combining_utils.hpp

This file was deleted.

24 changes: 23 additions & 1 deletion src/rpp/rpp/operators/details/early_unsubscribe.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,31 @@ namespace rpp::details
{
struct early_unsubscribe_state
{
early_unsubscribe_state(const composite_subscription& subscription_of_subscriber) : children_subscriptions(subscription_of_subscriber.make_child()) {}
early_unsubscribe_state(const composite_subscription& subscription_of_subscriber)
: children_subscriptions(subscription_of_subscriber.make_child()) {}

// use this subscription as source for any child subscription that should be early unsubscribed
composite_subscription children_subscriptions;
};

struct early_unsubscribe_on_error
{
void operator()(const std::exception_ptr& err,
const constraint::subscriber auto& sub,
const std::shared_ptr<early_unsubscribe_state>& state) const
{
state->children_subscriptions.unsubscribe();
sub.on_error(err);
}
};

struct early_unsubscribe_on_completed
{
void operator()(const constraint::subscriber auto& sub,
const std::shared_ptr<early_unsubscribe_state>& state) const
{
state->children_subscriptions.unsubscribe();
sub.on_completed();
}
};
} // namespace rpp::details
65 changes: 65 additions & 0 deletions src/rpp/rpp/operators/details/serialized_subscriber.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/subscribers/constraints.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp>
#include <rpp/subscriptions/composite_subscription.hpp>

#include <memory>
#include <mutex>

namespace rpp::details
{
struct forwarding_on_next_under_lock
{
template<typename T, typename TSerializationPrimitive>
void operator()(T&& v, const auto& subscriber, const std::shared_ptr<TSerializationPrimitive>& primitive) const
{
std::lock_guard lock{*primitive};
subscriber.on_next(std::forward<T>(v));
}
};

struct forwarding_on_error_under_lock
{
template<typename TSerializationPrimitive>
void operator()(const std::exception_ptr& err,
const auto& subscriber,
const std::shared_ptr<TSerializationPrimitive>& primitive) const
{
std::lock_guard lock{*primitive};
subscriber.on_error(err);
}
};

struct forwarding_on_completed_under_lock
{
template<typename TSerializationPrimitive>
void operator()(const auto& subscriber, const std::shared_ptr<TSerializationPrimitive>& primitive) const
{
std::lock_guard lock{*primitive};
subscriber.on_completed();
}
};

template<typename TSerializationPrimitive, constraint::subscriber TSub>
auto make_serialized_subscriber(TSub&& subscriber,
const std::shared_ptr<TSerializationPrimitive>& primitive)
{
auto sub = subscriber.get_subscription();
return create_subscriber_with_state<utils::extract_subscriber_type_t<std::decay_t<TSub>>>(std::move(sub),
forwarding_on_next_under_lock{},
forwarding_on_error_under_lock{},
forwarding_on_completed_under_lock{},
std::forward<TSub>(subscriber),
primitive);
}
} // namespace rpp::details
Loading

1 comment on commit ea672e7

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-clang

Observable construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable construction 0.33ns 0.402382 0.83 0.33ns
Dynamic observable construction 29.15ns 37.7809 0.77 24.49ns
Specific observable construction + as_dynamic 29.14ns 34.9965 0.83 24.47ns

Observable lift

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable lift specific observer 112.72ns 142.439 0.79 292.54ns
Specific observable lift dynamic observer 134.53ns 169.248 0.79 309.76ns
Dynamic observable lift specific observer 186.55ns 229.164 0.81 335.11ns
Dynamic observable lift dynamic observer 198.75ns 245.672 0.81 328.17ns

Observable subscribe

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe specific observer 82.74ns 92.9053 0.89 287.06ns
Specific observable subscribe dynamic observer 94.45ns 114.096 0.83 300.04ns
Dynamic observable subscribe specific observer 154.16ns 192.76 0.80 327.12ns
Dynamic observable subscribe dynamic observer 143.91ns 170.547 0.84 313.84ns

Observable subscribe #2

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe lambda 76.39ns 91.7158 0.83 289.40ns
Dynamic observable subscribe lambda 144.09ns 170.934 0.84 325.67ns
Specific observable subscribe lambda without subscription 76.71ns 92.2063 0.83 285.64ns
Dynamic observable subscribe lambda without subscription 141.97ns 167.897 0.85 326.95ns
Specific observable subscribe specific subscriber 43.11ns 54.4235 0.79 222.83ns
Dynamic observable subscribe specific subscriber 109.54ns 132.008 0.83 266.50ns
Specific observable subscribe dynamic observer 43.64ns 52.9944 0.82 242.41ns
Dynamic observable subscribe dynamic observer 96.34ns 125.914 0.77 259.72ns

Observer construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer construction 0.33ns 0.408298 0.82 0.34ns
Dynamic observer construction 29.13ns 35.0013 0.83 21.00ns
Specific observer construction + as_dynamic 29.25ns 35.0125 0.84 21.05ns

OnNext

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer OnNext 0.67ns 0.804535 0.83 0.67ns
Dynamic observer OnNext 1.67ns 2.64723 0.63 2.34ns

Subscriber construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Make subsriber 33.91ns 40.6776 0.83 64.46ns
Make copy of subscriber 16.75ns 20.1104 0.83 4.78ns
Transform subsriber to dynamic 43.58ns 53.701 0.81 26.28ns

Subscription

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
composite_subscription create 33.89ns 44.2585 0.77 52.51ns
composite_subscription add 50.00ns 61.4293 0.81 93.72ns
composite_subscription unsubscribe 44.20ns 52.4463 0.84 23.57ns

buffer

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
buffer 269.02ns 325.451 0.83 1854.14ns
sending of values from observable via buffer to subscriber 6.46ns 7.6413 0.85 27.42ns

chains creation test

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
long non-state chain creation + subscribe 270.90ns 324.995 0.83 543.72ns
long stateful chain creation + subscribe 401.26ns 480.55 0.84 797.95ns

combine_latest

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
combine_latest construction from observable via dot + subscribe 913.26ns 930.42 0.98 880.53ns
sending of values from observable via combine_latest to subscriber 36.53ns 21.6901 1.68 2.35ns

concat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
concat 1900.22ns 2072.24 0.92 3276.05ns
concat_with 2242.61ns 2375.53 0.94 3629.28ns

distinct_until_changed

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
distinct_until_changed construction from observable via dot + subscribe 127.80ns 154.279 0.83 248.05ns
sending of values from observable via distinct_until_changed to subscriber 2.35ns 2.81519 0.83 1.34ns

first

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
first construction from observable via dot + subscribe 148.93ns 170.622 0.87 574.78ns
sending of values from observable via first to subscriber 0.67ns 0.804328 0.83 0.67ns

foundamental sources

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
empty 84.99ns 102.735 0.83 617.47ns
error 137.58ns 164.905 0.83 747.39ns
never 47.01ns 56.2781 0.84 250.72ns

from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
from vector with int 101.43ns 121.745 0.83 650.81ns

immediate scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 0.67ns 0.805319 0.83 112.67ns
re-schedule 10 times 7.17ns 7.76451 0.92 143.61ns

just

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
just send int 88.41ns 105.314 0.84 628.24ns
just send variadic 118.98ns 142.292 0.84 738.01ns

last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
last construction from observable via dot + subscribe 186.80ns 222.573 0.84 352.42ns
sending of values from observable via last to subscriber 2.54ns 2.92722 0.87 1.68ns

map

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
map construction from observable via dot + subscribe 89.10ns 106.821 0.83 238.21ns
sending of values from observable via map to subscriber 1.00ns 1.2059 0.83 2.35ns

merge

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
merge 1836.68ns 1985.71 0.92 3261.25ns
merge_with 2147.89ns 2292.62 0.94 3546.50ns

observe_on

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
observe_on construction from observable via dot + subscribe 580.46ns 690.921 0.84 2597.71ns
sending of values from observable via observe_on to subscriber 89.19ns 108.676 0.82 195.04ns

publish_subject callbacks

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
on_next 24.12ns 28.9387 0.83 11.11ns
on_error 0.67ns 0.807982 0.83 19.14ns
on_completed 0.67ns 0.808115 0.83 0.67ns

publish_subject routines

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
construct 198.24ns 238.29 0.83 184.26ns
get_observable 29.07ns 34.9398 0.83 50.78ns
get_subscriber 60.55ns 72.6744 0.83 13.50ns

repeat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
repeat construction from observable via dot + subscribe 3911.56ns 4709.74 0.83 3076.55ns

scan

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
scan construction from observable via dot + subscribe 126.38ns 151.949 0.83 299.95ns
sending of values from observable via scan to subscriber 2.01ns 2.40636 0.83 2.34ns

skip

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
skip construction from observable via dot + subscribe 120.42ns 145.327 0.83 483.09ns
sending of values from observable via skip to subscriber 2.01ns 2.41187 0.83 2.01ns

switch_on_next

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
switch_on_next construction from observable via dot + subscribe 2246.27ns 2508.89 0.90 2764.94ns
sending of values from observable via switch_on_next to subscriber 575.50ns 645.174 0.89 643.97ns

take

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take construction from observable via dot + subscribe 187.43ns 222.395 0.84 487.12ns
sending of values from observable via take to subscriber 2.35ns 2.81377 0.83 2.66ns

take_last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_last construction from observable via dot + subscribe 232.58ns 276.018 0.84 541.40ns
sending of values from observable via take_last to subscriber 3.37ns 3.52272 0.96 3.49ns

take_until

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_until construction from observable via dot + subscribe 1064.36ns 1133.28 0.94 1173.02ns
sending of values from observable via take_until to subscriber 18.10ns 21.2883 0.85 1.79ns

trampoline scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 11.73ns 14.0596 0.83 159.44ns
re-schedule 10 times 30.54ns 36.8419 0.83 191.89ns
recursively schedule 10 times 1404.93ns 1627.73 0.86 5478.75ns

window

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
window 2019.34ns 2434.28 0.83 3177.05ns
sending of values from observable via window to subscriber 548.55ns 653.092 0.84 368.56ns

with_latest_from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
with_latest_from construction from observable via dot + subscribe 1053.38ns 1148.04 0.92 1163.92ns
sending of values from observable via with_latest_from to subscriber 35.16ns 41.7622 0.84 3.02ns

ci-ubuntu-gcc

Observable construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable construction 0.39ns 0.392027 0.99 0.38ns
Dynamic observable construction 29.29ns 33.3971 0.88 26.24ns
Specific observable construction + as_dynamic 28.55ns 33.3669 0.86 25.19ns

Observable lift

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable lift specific observer 132.93ns 135.243 0.98 343.96ns
Specific observable lift dynamic observer 168.10ns 173.972 0.97 410.11ns
Dynamic observable lift specific observer 287.36ns 228.427 1.26 416.84ns
Dynamic observable lift dynamic observer 253.84ns 241.065 1.05 399.23ns

Observable subscribe

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe specific observer 90.04ns 109.333 0.82 346.74ns
Specific observable subscribe dynamic observer 112.67ns 109.909 1.03 342.12ns
Dynamic observable subscribe specific observer 157.57ns 182.123 0.87 375.85ns
Dynamic observable subscribe dynamic observer 164.45ns 167.219 0.98 359.83ns

Observable subscribe #2

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe lambda 91.44ns 94.032 0.97 330.99ns
Dynamic observable subscribe lambda 276.08ns 169.622 1.63 371.22ns
Specific observable subscribe lambda without subscription 84.19ns 93.7881 0.90 355.53ns
Dynamic observable subscribe lambda without subscription 175.45ns 171.308 1.02 432.66ns
Specific observable subscribe specific subscriber 52.38ns 57.7295 0.91 337.31ns
Dynamic observable subscribe specific subscriber 133.74ns 135.73 0.99 354.73ns
Specific observable subscribe dynamic observer 66.99ns 58.2667 1.15 329.38ns
Dynamic observable subscribe dynamic observer 100.74ns 126.628 0.80 356.65ns

Observer construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer construction 0.34ns 0.373755 0.91 0.40ns
Dynamic observer construction 31.49ns 35.7627 0.88 23.37ns
Specific observer construction + as_dynamic 32.38ns 34.2005 0.95 26.23ns

OnNext

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer OnNext 0.43ns 0.397011 1.08 0.37ns
Dynamic observer OnNext 2.39ns 2.38892 1.00 2.06ns

Subscriber construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Make subsriber 36.77ns 39.0954 0.94 76.69ns
Make copy of subscriber 22.11ns 19.3567 1.14 6.07ns
Transform subsriber to dynamic 42.32ns 50.9164 0.83 24.23ns

Subscription

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
composite_subscription create 39.24ns 39.534 0.99 56.07ns
composite_subscription add 60.85ns 49.9737 1.22 88.07ns
composite_subscription unsubscribe 45.79ns 48.3208 0.95 23.33ns

buffer

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
buffer 354.53ns 295.81 1.20 2320.91ns
sending of values from observable via buffer to subscriber 10.47ns 7.80814 1.34 34.35ns

chains creation test

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
long non-state chain creation + subscribe 349.28ns 353.34 0.99 728.22ns
long stateful chain creation + subscribe 468.82ns 901.073 0.52 1228.49ns

combine_latest

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
combine_latest construction from observable via dot + subscribe 975.97ns 979.685 1.00 1349.86ns
sending of values from observable via combine_latest to subscriber 44.89ns 20.1483 2.23 1.81ns

concat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
concat 2222.16ns 2039.28 1.09 4726.78ns
concat_with 2715.86ns 2318.02 1.17 5163.03ns

distinct_until_changed

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
distinct_until_changed construction from observable via dot + subscribe 161.31ns 157.051 1.03 438.43ns
sending of values from observable via distinct_until_changed to subscriber 4.18ns 3.33104 1.26 1.43ns

first

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
first construction from observable via dot + subscribe 172.91ns 237.033 0.73 865.92ns
sending of values from observable via first to subscriber 0.69ns 0.772704 0.90 0.50ns

foundamental sources

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
empty 98.55ns 93.6273 1.05 808.70ns
error 156.89ns 157.052 1.00 935.21ns
never 59.21ns 58.6496 1.01 292.68ns

from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
from vector with int 151.30ns 123.651 1.22 905.92ns

immediate scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 1.10ns 1.08544 1.01 138.98ns
re-schedule 10 times 22.10ns 21.2824 1.04 160.10ns

just

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
just send int 95.52ns 109.945 0.87 912.55ns
just send variadic 134.16ns 186.184 0.72 1460.53ns

last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
last construction from observable via dot + subscribe 217.66ns 231.339 0.94 476.55ns
sending of values from observable via last to subscriber 3.28ns 3.49952 0.94 1.30ns

map

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
map construction from observable via dot + subscribe 126.71ns 115.557 1.10 363.59ns
sending of values from observable via map to subscriber 1.00ns 0.99361 1.00 1.69ns

merge

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
merge 2017.95ns 2173.94 0.93 4299.40ns
merge_with 2651.02ns 2376.71 1.12 4258.17ns

observe_on

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
observe_on construction from observable via dot + subscribe 655.74ns 744.153 0.88 3270.12ns
sending of values from observable via observe_on to subscriber 89.96ns 99.4019 0.91 269.14ns

publish_subject callbacks

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
on_next 25.80ns 32.7514 0.79 11.08ns
on_error 0.75ns 0.7522 0.99 18.39ns
on_completed 0.75ns 0.776605 0.97 0.75ns

publish_subject routines

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
construct 219.33ns 311.746 0.70 192.97ns
get_observable 27.73ns 35.2327 0.79 47.54ns
get_subscriber 63.47ns 61.8948 1.03 22.73ns

repeat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
repeat construction from observable via dot + subscribe 4449.03ns 4700.18 0.95 3639.02ns

scan

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
scan construction from observable via dot + subscribe 178.28ns 157.086 1.13 440.99ns
sending of values from observable via scan to subscriber 4.25ns 3.99694 1.06 1.77ns

skip

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
skip construction from observable via dot + subscribe 156.15ns 156.273 1.00 628.58ns
sending of values from observable via skip to subscriber 3.28ns 4.28091 0.77 2.23ns

switch_on_next

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
switch_on_next construction from observable via dot + subscribe 2903.02ns 2538.19 1.14 4755.53ns
sending of values from observable via switch_on_next to subscriber 639.11ns 644.94 0.99 1338.69ns

take

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take construction from observable via dot + subscribe 208.08ns 260.916 0.80 710.09ns
sending of values from observable via take to subscriber 4.71ns 6.50712 0.72 3.60ns

take_last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_last construction from observable via dot + subscribe 252.17ns 295.475 0.85 793.38ns
sending of values from observable via take_last to subscriber 4.00ns 3.84424 1.04 6.72ns

take_until

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_until construction from observable via dot + subscribe 1261.97ns 1132.86 1.11 1816.42ns
sending of values from observable via take_until to subscriber 26.42ns 20.8374 1.27 2.14ns

trampoline scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 24.32ns 25.1154 0.97 193.98ns
re-schedule 10 times 55.59ns 51.4066 1.08 257.35ns
recursively schedule 10 times 1472.06ns 1618.54 0.91 7469.64ns

window

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
window 2412.69ns 2332.63 1.03 4922.60ns
sending of values from observable via window to subscriber 637.63ns 720.761 0.88 524.39ns

with_latest_from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
with_latest_from construction from observable via dot + subscribe 1257.63ns 1133.92 1.11 1617.77ns
sending of values from observable via with_latest_from to subscriber 43.12ns 41.5899 1.04 3.82ns

ci-windows

Observable construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable construction 1.50ns 1.50321 1.00 0.67ns
Dynamic observable construction 80.20ns 80.619 0.99 138.07ns
Specific observable construction + as_dynamic 79.97ns 80.3292 1.00 122.53ns

Observable lift

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable lift specific observer 173.64ns 174.544 0.99 1205.23ns
Specific observable lift dynamic observer 200.86ns 204.272 0.98 1263.76ns
Dynamic observable lift specific observer 301.42ns 303.459 0.99 1415.95ns
Dynamic observable lift dynamic observer 261.22ns 262.536 0.99 1288.15ns

Observable subscribe

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe specific observer 141.77ns 138.408 1.02 1180.59ns
Specific observable subscribe dynamic observer 152.26ns 153.281 0.99 1205.59ns
Dynamic observable subscribe specific observer 254.93ns 257.7 0.99 1341.84ns
Dynamic observable subscribe dynamic observer 201.63ns 202.704 0.99 1235.81ns

Observable subscribe #2

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observable subscribe lambda 139.54ns 139.07 1.00 1168.55ns
Dynamic observable subscribe lambda 255.44ns 257.388 0.99 1342.53ns
Specific observable subscribe lambda without subscription 138.76ns 138.93 1.00 1163.77ns
Dynamic observable subscribe lambda without subscription 255.61ns 256.6 1.00 1353.05ns
Specific observable subscribe specific subscriber 56.81ns 49.1712 1.16 841.03ns
Dynamic observable subscribe specific subscriber 165.04ns 164.781 1.00 1011.46ns
Specific observable subscribe dynamic observer 52.27ns 52.1803 1.00 876.37ns
Dynamic observable subscribe dynamic observer 101.20ns 101.726 0.99 907.86ns

Observer construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer construction 1.51ns 1.56348 0.96 1.61ns
Dynamic observer construction 81.85ns 81.887 1.00 118.02ns
Specific observer construction + as_dynamic 82.10ns 81.7636 1.00 113.65ns

OnNext

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Specific observer OnNext 0.67ns 0.668657 1.00 0.67ns
Dynamic observer OnNext 2.04ns 2.00901 1.01 2.01ns

Subscriber construction

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
Make subsriber 84.98ns 86.1781 0.99 346.92ns
Make copy of subscriber 16.71ns 16.713 1.00 31.60ns
Transform subsriber to dynamic 96.19ns 96.3561 1.00 149.30ns

Subscription

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
composite_subscription create 84.83ns 85.9094 0.99 340.84ns
composite_subscription add 71.16ns 68.3295 1.04 168.06ns
composite_subscription unsubscribe 63.75ns 63.1552 1.01 121.98ns

buffer

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
buffer 379.18ns 379.742 1.00 4428.67ns
sending of values from observable via buffer to subscriber 7.43ns 6.71659 1.11 91.72ns

chains creation test

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
long non-state chain creation + subscribe 286.24ns 298.076 0.96 1714.13ns
long stateful chain creation + subscribe 694.31ns 682.222 1.02 3399.25ns

combine_latest

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
combine_latest construction from observable via dot + subscribe 1680.36ns 1482.0 1.13 3415.12ns
sending of values from observable via combine_latest to subscriber 61.45ns 36.2511 1.70 3.84ns

concat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
concat 2818.89ns 3045.1 0.93 10518.70ns
concat_with 3529.88ns 3295.25 1.07 13346.70ns

distinct_until_changed

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
distinct_until_changed construction from observable via dot + subscribe 202.10ns 204.904 0.99 1027.12ns
sending of values from observable via distinct_until_changed to subscriber 4.28ns 4.91462 0.87 3.62ns

first

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
first construction from observable via dot + subscribe 147.06ns 147.699 1.00 2604.00ns
sending of values from observable via first to subscriber 2.54ns 2.34642 1.08 1.77ns

foundamental sources

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
empty 82.00ns 82.4644 0.99 2382.64ns
error 138.63ns 153.864 0.90 2457.09ns
never 51.05ns 51.7255 0.99 878.46ns

from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
from vector with int 179.40ns 157.843 1.14 2444.36ns

immediate scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 1.84ns 1.67929 1.10 405.24ns
re-schedule 10 times 97.73ns 97.8779 1.00 435.54ns

just

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
just send int 88.67ns 88.8894 1.00 2391.55ns
just send variadic 129.99ns 128.709 1.01 2444.09ns

last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
last construction from observable via dot + subscribe 243.14ns 241.656 1.01 1433.41ns
sending of values from observable via last to subscriber 3.20ns 3.20135 1.00 3.36ns

map

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
map construction from observable via dot + subscribe 106.50ns 104.362 1.02 993.12ns
sending of values from observable via map to subscriber 4.01ns 3.73086 1.08 7.41ns

merge

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
merge 2690.10ns 2468.82 1.09 10882.00ns
merge_with 3413.50ns 3118.11 1.09 11698.30ns

observe_on

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
observe_on construction from observable via dot + subscribe 828.07ns 840.867 0.98 5620.60ns
sending of values from observable via observe_on to subscriber 85.29ns 84.8433 1.01 848.42ns

publish_subject callbacks

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
on_next 20.11ns 19.8808 1.01 33.03ns
on_error 3.05ns 2.82096 1.08 18.40ns
on_completed 2.45ns 2.36775 1.03 0.68ns

publish_subject routines

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
construct 376.79ns 362.0 1.04 591.24ns
get_observable 26.08ns 240.14 0.11 163.66ns
get_subscriber 50.17ns 50.1723 1.00 91.48ns

repeat

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
repeat construction from observable via dot + subscribe 6107.00ns 6072.4 1.01 11183.30ns

scan

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
scan construction from observable via dot + subscribe 206.97ns 206.597 1.00 1256.75ns
sending of values from observable via scan to subscriber 6.02ns 5.36625 1.12 8.88ns

skip

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
skip construction from observable via dot + subscribe 194.70ns 193.538 1.01 1555.75ns
sending of values from observable via skip to subscriber 3.73ns 3.67917 1.01 3.37ns

switch_on_next

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
switch_on_next construction from observable via dot + subscribe 3626.00ns 3418.88 1.06 11998.00ns
sending of values from observable via switch_on_next to subscriber 870.67ns 818.344 1.06 3080.00ns

take

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take construction from observable via dot + subscribe 248.58ns 249.914 0.99 2157.50ns
sending of values from observable via take to subscriber 6.12ns 5.35744 1.14 6.13ns

take_last

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_last construction from observable via dot + subscribe 343.59ns 344.791 1.00 2432.18ns
sending of values from observable via take_last to subscriber 4.45ns 4.25169 1.05 20.38ns

take_until

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
take_until construction from observable via dot + subscribe 1637.94ns 3246.88 0.50 5198.20ns
sending of values from observable via take_until to subscriber 29.64ns 28.8348 1.03 4.74ns

trampoline scheduler

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
no any re-schedule 21.52ns 19.1845 1.12 606.79ns
re-schedule 10 times 121.02ns 120.981 1.00 638.90ns
recursively schedule 10 times 2628.80ns 2684.75 0.98 18624.50ns

window

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
window 2989.67ns 2958.11 1.01 9720.00ns
sending of values from observable via window to subscriber 842.81ns 836.742 1.01 1623.31ns

with_latest_from

Table
Test Name Current, ns Prev, ns Ratio RxCpp current, ns
with_latest_from construction from observable via dot + subscribe 2022.67ns 1960.38 1.03 3745.50ns
sending of values from observable via with_latest_from to subscriber 56.90ns 54.8678 1.04 6.66ns

Please sign in to comment.