From 151959c1d683134551c8812aee957158c4586439 Mon Sep 17 00:00:00 2001 From: Corentin B <32176761+CorentinBT@users.noreply.github.com> Date: Sun, 18 Feb 2024 20:27:52 +0100 Subject: [PATCH] Add on_error_resume_next operator (#528) --- src/benchmarks/benchmarks.cpp | 24 +++ src/rpp/rpp/operators.hpp | 9 + .../rpp/operators/on_error_resume_next.hpp | 109 ++++++++++++ src/tests/rpp/test_on_error_resume_next.cpp | 156 ++++++++++++++++++ 4 files changed, 298 insertions(+) create mode 100644 src/rpp/rpp/operators/on_error_resume_next.hpp create mode 100644 src/tests/rpp/test_on_error_resume_next.cpp diff --git a/src/benchmarks/benchmarks.cpp b/src/benchmarks/benchmarks.cpp index af3ca7490..55b0f4f62 100644 --- a/src/benchmarks/benchmarks.cpp +++ b/src/benchmarks/benchmarks.cpp @@ -581,6 +581,30 @@ int main(int argc, char* argv[]) // NOLINT(bugprone-exception-escape) } } // BENCHMARK("Aggregating Operators") + BENCHMARK("Error Handling Operators") + { + SECTION("create(on_next(1), on_error())+on_error_resume_next(immediate_just(2)))+subscribe") + { + TEST_RPP([&]() { + rpp::source::create([&](auto&& observer) { + observer.on_next(1); + observer.on_error(std::make_exception_ptr(std::runtime_error{""})); + }) + | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rpp::immediate_just(2); }) + | rpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + + TEST_RXCPP([&]() { + rxcpp::observable<>::create([&](auto&& observer) { + observer.on_next(1); + observer.on_error(std::make_exception_ptr(std::runtime_error{""})); + }) + | rxcpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rxcpp::immediate_just(2); }) + | rxcpp::operators::subscribe([](int v) { ankerl::nanobench::doNotOptimizeAway(v); }); + }); + } + } // BENCHMARK("Error Handling Operators") + BENCHMARK("Subjects") { SECTION("publish_subject with 1 observer - on_next") diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index 3ec4ec6ba..d95860c95 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -117,3 +117,12 @@ #include #include + +/** + * @defgroup error_handling_operators Error Handling Operators + * @brief Operators that help to recover from error notifications from an Observable + * @see https://reactivex.io/documentation/operators.html#error + * @ingroup operators + */ + +#include diff --git a/src/rpp/rpp/operators/on_error_resume_next.hpp b/src/rpp/rpp/operators/on_error_resume_next.hpp new file mode 100644 index 000000000..f6449d6c9 --- /dev/null +++ b/src/rpp/rpp/operators/on_error_resume_next.hpp @@ -0,0 +1,109 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ + template + struct on_error_resume_next_observer_strategy + { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; + RPP_NO_UNIQUE_ADDRESS Selector selector; + // Manually control disposable to ensure observer is not used after move in on_error emission + mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); + + RPP_CALL_DURING_CONSTRUCTION( + { + observer.set_upstream(disposable); + }); + + template + void on_next(T&& v) const + { + observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + disposable.dispose(); + selector(err).subscribe(std::move(observer)); + } + + void on_completed() const + { + disposable.dispose(); + observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) + { + disposable.add(d); + } + + bool is_disposed() const { return disposable.is_disposed(); } + }; + + template + struct on_error_resume_next_t : lift_operator, Selector> + { + template + struct operator_traits + { + using selector_observable_result_type = + rpp::utils::extract_observable_type_t>; + + static_assert( + rpp::constraint::decayed_same_as, + "Selector observable result type is not the same as T"); + + using result_type = T; + + template TObserver> + using observer_strategy = on_error_resume_next_observer_strategy; + }; + + template + using updated_disposable_strategy = rpp::details::observables::atomic_dynamic_disposable_strategy_selector<1>; + }; +} // namespace rpp::operators::details + +namespace rpp::operators +{ + /** + * @brief If an error occurs, take the result from the Selector and subscribe to that instead. + * + * @marble on_error_resume_next + { + source observable : +-1-x + operator "on_error_resume_next: () => obs(-2-3-|)" : +-1-2-3-| + } + * + * @param selector callable taking a std::exception_ptr and returning observable to continue on + * + * @warning #include + * + * @ingroup error_handling_operators + * @see https://reactivex.io/documentation/operators/catch.html + */ + template + requires rpp::constraint::observable> + auto on_error_resume_next(Selector&& selector) + { + return details::on_error_resume_next_t>{std::forward(selector)}; + } +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_on_error_resume_next.cpp b/src/tests/rpp/test_on_error_resume_next.cpp new file mode 100644 index 000000000..c30719665 --- /dev/null +++ b/src/tests/rpp/test_on_error_resume_next.cpp @@ -0,0 +1,156 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include +#include + +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEMPLATE_TEST_CASE("on_error_resume_next switches observable on error", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy(); + SECTION("observable without error emission") + { + auto obs = rpp::source::just(rpp::schedulers::immediate{}, 1, 2, 3); + SECTION("subscribe") + { + obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from observable") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("observable with one error emission") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_next(2); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + SECTION("subscribe") + { + obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { + return rpp::source::just(rpp::schedulers::immediate{}, 3); + }) + | rpp::ops::subscribe(mock); + SECTION("observer obtains values from both outer and inner observable") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("observable with two error emissions") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_next(2); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + SECTION("subscribe") + { + obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { + return rpp::source::just(rpp::schedulers::immediate{}, 3); + }) + | rpp::ops::subscribe(mock); + SECTION("observer only receives values from first inner observable") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("inner observable different emission type") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_next(2); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + SECTION("subscribe") + { + obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { + return rpp::source::just(rpp::schedulers::immediate{}, 3); + }) + | rpp::ops::subscribe(mock); + SECTION("observer only receives values from first inner observable") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } + + SECTION("nested on_error_resume_next operators") + { + auto obs = rpp::source::create([](const auto& sub) { + sub.on_next(1); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }); + SECTION("subscribe") + { + obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { + return rpp::source::create([](const auto& sub) { + sub.on_next(2); + sub.on_error(std::make_exception_ptr(std::runtime_error{""})); + }) + | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { + return rpp::source::create([](const auto& sub) { + sub.on_next(3); + sub.on_completed(); + }); + }); + }) + | rpp::ops::subscribe(mock); + SECTION("observer receives values without any errors") + { + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_total_on_next_count() == 3); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + } + } + } +} + +TEST_CASE("on_error_resume_next satisfies disposable contracts") +{ + auto observable_disposable = rpp::composite_disposable_wrapper::make(); + { + auto observable = observable_with_disposable(observable_disposable); + + test_operator_with_disposable( + rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty(); })); + } + + CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); +} \ No newline at end of file