Skip to content

Commit

Permalink
Document dynamic_ versions (#534)
Browse files Browse the repository at this point in the history
* Document dynamic_

* update

* fix comment
  • Loading branch information
victimsnino authored Feb 21, 2024
1 parent 9bcdcde commit 7bae02c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 1 deletion.
46 changes: 46 additions & 0 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -397,3 +397,49 @@ rpp::source::just<rpp::memory_model::use_shared>(my_custom_variable);
makes only 1 copy/move to shared_ptr and then uses it instead.

As a a result, users can select preferable way of handling of their types.

## ReactivePlusPlus specific
### dynamic_* versions to keep classes as variables

Most of the classes inside rpp library including `observable`, `observer` and others are heavy-templated classes. It means, it could has a lot of template params. In most cases you shouldn't worry about it due to it is purely internal problem.

But in some cases you want to keep observable or observer inside your classes or return it from function. In most cases I strongly recommend you to use `auto` to deduce type automatically. But in some cases it is not possible (for example, to keep observable as member variable). For such an usage you could use `dynamic_observable` and `dynamic_observer`:
- they are type-erased wrappers over regular observable/observer with goal to hide all unnecessary stuff from user's code. For example, you can easily use it as:
```cpp
#include <rpp/rpp.hpp>

#include <iostream>

struct some_data
{
rpp::dynamic_observable<int> observable;
rpp::dynamic_observer<int> observer;
};

int main() {
some_data v{rpp::source::just(1,2,3),
rpp::make_lambda_observer([](int value){
std::cout << value << std::endl;
})};

v.observable.subscribe(v.observer);
}
```
- to convert observable/observer to dynamic_* version you could manually call `as_dynamic()` member function or just pass them to ctor
- actually they are similar to rxcpp's `observer<T>` and `observable<T>` but provides EXPLICIT definition of `dynamic` fact
- due to type-erasure mechanism `dynamic_` provides some minor performance penalties due to extra usage of `shared_ptr` to keep internal state + indirect calls. It is not critical in case of storing it as member function, but could be important in case of using it on hot paths like this:
```cpp
rpp::source::just(1,2,3)
| rpp::ops::map([](int v) { return rpp::source::just(v); })
| rpp::ops::flat_map([](rpp::dynamic_observable<int> observable) {
return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
});
```
^^^ while it is fully valid code, `flat_map` have to convert observable to dynamic version via extra heap, but it is unnecessary. It is better to use `auto` in this case.
```cpp
rpp::source::just(1,2,3)
| rpp::ops::map([](int v) { return rpp::source::just(v); })
| rpp::ops::flat_map([](const rpp::constraint::observable_of_type<int> auto& observable) { // or just `const auto& observable`
return observable | rpp::ops::filter([](int v){ return v % 2 == 0;});
});
```
2 changes: 2 additions & 0 deletions src/rpp/rpp/observables/observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ namespace rpp
* @warning Actually observable "doesn't emit nothing", it only **invokes Strategy!** Strategy COULD emit emissions immediately OR place observer to some queue or something like this to obtain emissions later (for example subjects)
* @warning Expected that observable's strategy would work with observer in serialized way
*
* @note In case of you are need to keep some "abstract" observable of `Type`, you can use type-erased version: `rpp::dynamic_observable`
*
* @tparam Type of value this observable would provide. Only observers of same type can be subscribed to this observable.
* @tparam Strategy used to provide logic over observable's callbacks.
*
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/observers/observer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ namespace rpp
public:
template<constraint::observer_strategy<Type> TStrategy>
requires (!std::same_as<TStrategy, rpp::details::observers::dynamic_strategy<Type>>)
explicit observer(observer<Type, TStrategy>&& other)
observer(observer<Type, TStrategy>&& other)
: details::observer_impl<Type, rpp::details::observers::dynamic_strategy<Type>, details::observers::none_disposable_strategy>{details::observers::none_disposable_strategy{}, std::move(other)}
{
}
Expand Down

1 comment on commit 7bae02c

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BENCHMARK RESULTS (AUTOGENERATED)

ci-ubuntu-gcc

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 305.40 ns 2.16 ns 2.16 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 308.15 ns 2.18 ns 2.16 ns 1.01

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 768.00 ns 0.31 ns 0.31 ns 1.00
from array of 1 - create + subscribe + current_thread 1074.59 ns 5.25 ns 5.25 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 2323.36 ns 113.31 ns 115.89 ns 0.98
defer from array of 1 - defer + create + subscribe + immediate 737.18 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 2175.84 ns 58.02 ns 57.99 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3348.50 ns 33.16 ns 32.12 ns 1.03

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1079.11 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 833.01 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 996.21 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 879.40 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+first()+subscribe 1252.45 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+last()+subscribe 920.81 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 1097.52 ns 17.28 ns 17.29 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 275.86 ns 2.25 ns 2.16 ns 1.04
current_thread scheduler create worker + schedule 385.00 ns 7.67 ns 7.41 ns 1.04
current_thread scheduler create worker + schedule + recursive schedule 836.95 ns 72.79 ns 64.64 ns 1.13

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1056.15 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 951.39 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 2408.19 ns 155.83 ns 153.20 ns 1.02
immediate_just+buffer(2)+subscribe 1520.86 ns 13.59 ns 13.58 ns 1.00
immediate_just+window(2)+subscribe + subscsribe inner 2364.67 ns 1057.69 ns 1133.98 ns 0.93

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 829.68 ns - - 0.00
immediate_just+take_while(true)+subscribe 931.02 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1950.48 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 3893.22 ns 175.02 ns 175.15 ns 1.00
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3840.41 ns 162.81 ns 164.32 ns 0.99
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 138.80 ns 138.62 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 3940.47 ns 1226.89 ns 1169.27 ns 1.05
immediate_just(1) + zip(immediate_just(2)) + subscribe 2226.10 ns 222.55 ns 208.36 ns 1.07

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 34.58 ns 12.02 ns 11.72 ns 1.03

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1333.26 ns 16.35 ns 16.37 ns 1.00
basic sample with immediate scheduler 1436.05 ns 5.55 ns 5.55 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 909.74 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1043.28 ns 127.92 ns 123.53 ns 1.04

ci-macos

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 1707.32 ns 7.61 ns 5.00 ns 1.52
Subscribe empty callbacks to empty observable via pipe operator 1777.18 ns 4.79 ns 5.06 ns 0.95

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 3086.42 ns 0.33 ns 0.26 ns 1.23
from array of 1 - create + subscribe + current_thread 3225.66 ns 39.10 ns 28.76 ns 1.36
concat_as_source of just(1 immediate) create + subscribe 6701.93 ns 437.09 ns 399.75 ns 1.09
defer from array of 1 - defer + create + subscribe + immediate 3487.00 ns 0.29 ns 0.33 ns 0.86
interval - interval + take(3) + subscribe + immediate 6186.89 ns 140.14 ns 143.57 ns 0.98
interval - interval + take(3) + subscribe + current_thread 7798.41 ns 147.39 ns 129.66 ns 1.14

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 3502.53 ns 0.29 ns 0.27 ns 1.05
immediate_just+filter(true)+subscribe 2850.55 ns 0.31 ns 0.27 ns 1.15
immediate_just(1,2)+skip(1)+subscribe 3424.12 ns 0.29 ns 0.28 ns 1.02
immediate_just(1,1,2)+distinct_until_changed()+subscribe 3189.64 ns 0.59 ns 0.63 ns 0.95
immediate_just(1,2)+first()+subscribe 3921.58 ns 0.29 ns 0.27 ns 1.05
immediate_just(1,2)+last()+subscribe 2955.90 ns 0.29 ns 0.27 ns 1.06
immediate_just+take_last(1)+subscribe 3690.46 ns 85.66 ns 80.66 ns 1.06

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 1111.65 ns 5.10 ns 5.10 ns 1.00
current_thread scheduler create worker + schedule 1554.93 ns 50.31 ns 45.52 ns 1.11
current_thread scheduler create worker + schedule + recursive schedule 2713.03 ns 295.61 ns 263.42 ns 1.12

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 2614.82 ns 0.29 ns 0.28 ns 1.02
immediate_just+scan(10, std::plus)+subscribe 2884.96 ns 0.57 ns 0.56 ns 1.03
immediate_just+flat_map(immediate_just(v*2))+subscribe 6618.75 ns 515.01 ns 509.02 ns 1.01
immediate_just+buffer(2)+subscribe 3070.55 ns 84.24 ns 83.08 ns 1.01
immediate_just+window(2)+subscribe + subscsribe inner 6542.01 ns 2836.31 ns 2651.26 ns 1.07

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 2592.61 ns - - 0.00
immediate_just+take_while(true)+subscribe 2607.05 ns 0.29 ns 0.28 ns 1.04

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 6099.73 ns 0.29 ns 0.28 ns 1.03

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 13285.60 ns 645.90 ns 546.35 ns 1.18
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 13013.85 ns 663.80 ns 536.50 ns 1.24
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 773.93 ns 541.11 ns 1.43
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 12483.28 ns 2855.61 ns 2201.15 ns 1.30
immediate_just(1) + zip(immediate_just(2)) + subscribe 6379.35 ns 1377.25 ns 916.73 ns 1.50

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 94.24 ns 60.66 ns 61.28 ns 0.99

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 3437.03 ns 124.22 ns 119.97 ns 1.04
basic sample with immediate scheduler 4562.33 ns 18.54 ns 18.00 ns 1.03

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 2979.68 ns 0.29 ns 0.28 ns 1.03

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 8167.92 ns 5082.78 ns 4875.22 ns 1.04

ci-ubuntu-clang

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 276.30 ns 1.58 ns 1.54 ns 1.03
Subscribe empty callbacks to empty observable via pipe operator 271.59 ns 1.56 ns 1.54 ns 1.01

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 561.16 ns 0.31 ns 0.31 ns 0.99
from array of 1 - create + subscribe + current_thread 802.98 ns 5.55 ns 5.56 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 1927.33 ns 113.55 ns 112.48 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 585.17 ns 0.31 ns 0.31 ns 1.00
interval - interval + take(3) + subscribe + immediate 1501.08 ns 57.08 ns 57.09 ns 1.00
interval - interval + take(3) + subscribe + current_thread 2087.31 ns 30.88 ns 30.88 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 914.56 ns 0.31 ns 0.31 ns 1.00
immediate_just+filter(true)+subscribe 663.05 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 855.04 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 678.30 ns 0.62 ns 0.62 ns 1.00
immediate_just(1,2)+first()+subscribe 1057.50 ns 0.31 ns 0.31 ns 1.00
immediate_just(1,2)+last()+subscribe 728.03 ns 0.31 ns 0.31 ns 1.00
immediate_just+take_last(1)+subscribe 947.94 ns 0.31 ns 0.31 ns 1.00

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 197.91 ns 1.56 ns 1.54 ns 1.01
current_thread scheduler create worker + schedule 306.74 ns 5.57 ns 5.57 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 620.33 ns 58.50 ns 58.50 ns 1.00

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 668.67 ns 0.31 ns 0.31 ns 1.00
immediate_just+scan(10, std::plus)+subscribe 714.87 ns 0.31 ns 0.31 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 1784.88 ns 122.23 ns 121.86 ns 1.00
immediate_just+buffer(2)+subscribe 1358.10 ns 14.81 ns 14.51 ns 1.02
immediate_just+window(2)+subscribe + subscsribe inner 2195.82 ns 817.37 ns 814.28 ns 1.00

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 647.63 ns - - 0.00
immediate_just+take_while(true)+subscribe 648.87 ns 0.31 ns 0.31 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 1590.36 ns 0.31 ns 0.31 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 2543.78 ns 126.14 ns 127.12 ns 0.99
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 3027.58 ns 120.79 ns 120.70 ns 1.00
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 112.43 ns 112.34 ns 1.00
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 2593.04 ns 724.95 ns 729.64 ns 0.99
immediate_just(1) + zip(immediate_just(2)) + subscribe 1801.15 ns 177.20 ns 175.98 ns 1.01

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 22.89 ns 14.18 ns 14.81 ns 0.96

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1028.48 ns 13.58 ns 13.58 ns 1.00
basic sample with immediate scheduler 1031.29 ns 5.86 ns 5.86 ns 1.00

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 736.72 ns 0.31 ns 0.31 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 861.02 ns 126.31 ns 133.45 ns 0.95

ci-windows

General

name rxcpp rpp prev rpp ratio
Subscribe empty callbacks to empty observable 589.76 ns 4.93 ns 4.94 ns 1.00
Subscribe empty callbacks to empty observable via pipe operator 605.51 ns 4.93 ns 4.94 ns 1.00

Sources

name rxcpp rpp prev rpp ratio
from array of 1 - create + subscribe + immediate 1193.53 ns 5.55 ns 5.55 ns 1.00
from array of 1 - create + subscribe + current_thread 1468.65 ns 18.50 ns 18.51 ns 1.00
concat_as_source of just(1 immediate) create + subscribe 4771.36 ns 176.36 ns 175.46 ns 1.01
defer from array of 1 - defer + create + subscribe + immediate 1235.50 ns 5.86 ns 5.86 ns 1.00
interval - interval + take(3) + subscribe + immediate 3161.23 ns 133.62 ns 133.69 ns 1.00
interval - interval + take(3) + subscribe + current_thread 3510.58 ns 58.78 ns 58.72 ns 1.00

Filtering Operators

name rxcpp rpp prev rpp ratio
immediate_just+take(1)+subscribe 1874.09 ns 12.87 ns 12.87 ns 1.00
immediate_just+filter(true)+subscribe 1711.31 ns 11.72 ns 11.71 ns 1.00
immediate_just(1,2)+skip(1)+subscribe 1809.19 ns 13.21 ns 13.21 ns 1.00
immediate_just(1,1,2)+distinct_until_changed()+subscribe 1375.38 ns 15.77 ns 15.77 ns 1.00
immediate_just(1,2)+first()+subscribe 2101.05 ns 12.96 ns 20.83 ns 0.62
immediate_just(1,2)+last()+subscribe 1496.08 ns 14.18 ns 22.17 ns 0.64
immediate_just+take_last(1)+subscribe 2071.25 ns 58.58 ns 59.02 ns 0.99

Schedulers

name rxcpp rpp prev rpp ratio
immediate scheduler create worker + schedule 504.09 ns 6.17 ns 6.18 ns 1.00
current_thread scheduler create worker + schedule 688.33 ns 18.20 ns 18.20 ns 1.00
current_thread scheduler create worker + schedule + recursive schedule 1123.09 ns 110.38 ns 108.66 ns 1.02

Transforming Operators

name rxcpp rpp prev rpp ratio
immediate_just+map(v*2)+subscribe 1342.92 ns 11.67 ns 11.11 ns 1.05
immediate_just+scan(10, std::plus)+subscribe 1455.85 ns 21.58 ns 21.58 ns 1.00
immediate_just+flat_map(immediate_just(v*2))+subscribe 3964.14 ns 227.35 ns 232.45 ns 0.98
immediate_just+buffer(2)+subscribe 2363.24 ns 57.68 ns 58.17 ns 0.99
immediate_just+window(2)+subscribe + subscsribe inner 4117.02 ns 1568.36 ns 1586.78 ns 0.99

Conditional Operators

name rxcpp rpp prev rpp ratio
immediate_just+take_while(false)+subscribe 1345.10 ns 11.45 ns 11.45 ns 1.00
immediate_just+take_while(true)+subscribe 1361.35 ns 11.71 ns 11.72 ns 1.00

Utility Operators

name rxcpp rpp prev rpp ratio
immediate_just(1)+subscribe_on(immediate)+subscribe 4218.88 ns 8.02 ns 8.02 ns 1.00

Combining Operators

name rxcpp rpp prev rpp ratio
immediate_just(immediate_just(1), immediate_just(1)) + merge() + subscribe 5253.77 ns 252.18 ns 247.07 ns 1.02
immediate_just(1) + merge_with(immediate_just(2)) + subscribe 6464.67 ns 239.40 ns 233.09 ns 1.03
immediate_just(1) + with_latest_from(immediate_just(2)) + subscribe - 244.57 ns 236.08 ns 1.04
immediate_just(immediate_just(1),immediate_just(1)) + switch_on_next() + subscribe 6309.89 ns 964.21 ns 962.35 ns 1.00
immediate_just(1) + zip(immediate_just(2)) + subscribe 3934.71 ns 545.31 ns 545.03 ns 1.00

Subjects

name rxcpp rpp prev rpp ratio
publish_subject with 1 observer - on_next 36.57 ns 25.90 ns 26.20 ns 0.99

Scenarios

name rxcpp rpp prev rpp ratio
basic sample 1887.99 ns 59.74 ns 66.96 ns 0.89
basic sample with immediate scheduler 1892.78 ns 37.00 ns 37.56 ns 0.98

Aggregating Operators

name rxcpp rpp prev rpp ratio
immediate_just+reduce(10, std::plus)+subscribe 1490.27 ns 19.95 ns 19.96 ns 1.00

Error Handling Operators

name rxcpp rpp prev rpp ratio
create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe 1995.33 ns 345.57 ns 341.80 ns 1.01

Please sign in to comment.