-
Notifications
You must be signed in to change notification settings - Fork 24
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add on_error_resume_next operator (#528)
- Loading branch information
1 parent
09b74b8
commit 151959c
Showing
4 changed files
with
298 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2023 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#pragma once | ||
|
||
#include <rpp/operators/fwd.hpp> | ||
|
||
#include <rpp/defs.hpp> | ||
#include <rpp/operators/details/strategy.hpp> | ||
|
||
namespace rpp::operators::details | ||
{ | ||
template<rpp::constraint::observer TObserver, rpp::constraint::decayed_type Selector> | ||
struct on_error_resume_next_observer_strategy | ||
{ | ||
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; | ||
|
||
RPP_NO_UNIQUE_ADDRESS mutable TObserver observer; | ||
RPP_NO_UNIQUE_ADDRESS Selector selector; | ||
// Manually control disposable to ensure observer is not used after move in on_error emission | ||
mutable rpp::composite_disposable_wrapper disposable = composite_disposable_wrapper::make(); | ||
|
||
RPP_CALL_DURING_CONSTRUCTION( | ||
{ | ||
observer.set_upstream(disposable); | ||
}); | ||
|
||
template<typename T> | ||
void on_next(T&& v) const | ||
{ | ||
observer.on_next(std::forward<T>(v)); | ||
} | ||
|
||
void on_error(const std::exception_ptr& err) const | ||
{ | ||
disposable.dispose(); | ||
selector(err).subscribe(std::move(observer)); | ||
} | ||
|
||
void on_completed() const | ||
{ | ||
disposable.dispose(); | ||
observer.on_completed(); | ||
} | ||
|
||
void set_upstream(const disposable_wrapper& d) | ||
{ | ||
disposable.add(d); | ||
} | ||
|
||
bool is_disposed() const { return disposable.is_disposed(); } | ||
}; | ||
|
||
template<rpp::constraint::decayed_type Selector> | ||
struct on_error_resume_next_t : lift_operator<on_error_resume_next_t<Selector>, Selector> | ||
{ | ||
template<rpp::constraint::decayed_type T> | ||
struct operator_traits | ||
{ | ||
using selector_observable_result_type = | ||
rpp::utils::extract_observable_type_t<std::invoke_result_t<Selector, std::exception_ptr>>; | ||
|
||
static_assert( | ||
rpp::constraint::decayed_same_as<selector_observable_result_type, T>, | ||
"Selector observable result type is not the same as T"); | ||
|
||
using result_type = T; | ||
|
||
template<rpp::constraint::observer_of_type<result_type> TObserver> | ||
using observer_strategy = on_error_resume_next_observer_strategy<TObserver, Selector>; | ||
}; | ||
|
||
template<rpp::details::observables::constraint::disposable_strategy Prev> | ||
using updated_disposable_strategy = rpp::details::observables::atomic_dynamic_disposable_strategy_selector<1>; | ||
}; | ||
} // namespace rpp::operators::details | ||
|
||
namespace rpp::operators | ||
{ | ||
/** | ||
* @brief If an error occurs, take the result from the Selector and subscribe to that instead. | ||
* | ||
* @marble on_error_resume_next | ||
{ | ||
source observable : +-1-x | ||
operator "on_error_resume_next: () => obs(-2-3-|)" : +-1-2-3-| | ||
} | ||
* | ||
* @param selector callable taking a std::exception_ptr and returning observable to continue on | ||
* | ||
* @warning #include <rpp/operators/on_error_resume_next.hpp> | ||
* | ||
* @ingroup error_handling_operators | ||
* @see https://reactivex.io/documentation/operators/catch.html | ||
*/ | ||
template<typename Selector> | ||
requires rpp::constraint::observable<std::invoke_result_t<Selector, std::exception_ptr>> | ||
auto on_error_resume_next(Selector&& selector) | ||
{ | ||
return details::on_error_resume_next_t<std::decay_t<Selector>>{std::forward<Selector>(selector)}; | ||
} | ||
} // namespace rpp::operators |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
// ReactivePlusPlus library | ||
// | ||
// Copyright Aleksey Loginov 2023 - present. | ||
// Distributed under the Boost Software License, Version 1.0. | ||
// (See accompanying file LICENSE_1_0.txt or copy at | ||
// https://www.boost.org/LICENSE_1_0.txt) | ||
// | ||
// Project home: https://github.com/victimsnino/ReactivePlusPlus | ||
// | ||
|
||
#include <snitch/snitch.hpp> | ||
|
||
#include <rpp/operators/on_error_resume_next.hpp> | ||
#include <rpp/schedulers/immediate.hpp> | ||
#include <rpp/sources/create.hpp> | ||
#include <rpp/sources/empty.hpp> | ||
#include <rpp/sources/just.hpp> | ||
|
||
#include "disposable_observable.hpp" | ||
#include "mock_observer.hpp" | ||
|
||
TEMPLATE_TEST_CASE("on_error_resume_next switches observable on error", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) | ||
{ | ||
auto mock = mock_observer_strategy<int>(); | ||
SECTION("observable without error emission") | ||
{ | ||
auto obs = rpp::source::just<TestType>(rpp::schedulers::immediate{}, 1, 2, 3); | ||
SECTION("subscribe") | ||
{ | ||
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); }) | ||
| rpp::ops::subscribe(mock); | ||
SECTION("observer obtains values from observable") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{1, 2, 3}); | ||
CHECK(mock.get_total_on_next_count() == 3); | ||
CHECK(mock.get_on_error_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
|
||
SECTION("observable with one error emission") | ||
{ | ||
auto obs = rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(1); | ||
sub.on_next(2); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
}); | ||
SECTION("subscribe") | ||
{ | ||
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { | ||
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3); | ||
}) | ||
| rpp::ops::subscribe(mock); | ||
SECTION("observer obtains values from both outer and inner observable") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{1, 2, 3}); | ||
CHECK(mock.get_total_on_next_count() == 3); | ||
CHECK(mock.get_on_error_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
|
||
SECTION("observable with two error emissions") | ||
{ | ||
auto obs = rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(1); | ||
sub.on_next(2); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
}); | ||
SECTION("subscribe") | ||
{ | ||
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { | ||
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3); | ||
}) | ||
| rpp::ops::subscribe(mock); | ||
SECTION("observer only receives values from first inner observable") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{1, 2, 3}); | ||
CHECK(mock.get_total_on_next_count() == 3); | ||
CHECK(mock.get_on_error_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
|
||
SECTION("inner observable different emission type") | ||
{ | ||
auto obs = rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(1); | ||
sub.on_next(2); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
}); | ||
SECTION("subscribe") | ||
{ | ||
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { | ||
return rpp::source::just<TestType>(rpp::schedulers::immediate{}, 3); | ||
}) | ||
| rpp::ops::subscribe(mock); | ||
SECTION("observer only receives values from first inner observable") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{1, 2, 3}); | ||
CHECK(mock.get_total_on_next_count() == 3); | ||
CHECK(mock.get_on_error_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
|
||
SECTION("nested on_error_resume_next operators") | ||
{ | ||
auto obs = rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(1); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
}); | ||
SECTION("subscribe") | ||
{ | ||
obs | rpp::operators::on_error_resume_next([](const std::exception_ptr&) { | ||
return rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(2); | ||
sub.on_error(std::make_exception_ptr(std::runtime_error{""})); | ||
}) | ||
| rpp::operators::on_error_resume_next([](const std::exception_ptr&) { | ||
return rpp::source::create<int>([](const auto& sub) { | ||
sub.on_next(3); | ||
sub.on_completed(); | ||
}); | ||
}); | ||
}) | ||
| rpp::ops::subscribe(mock); | ||
SECTION("observer receives values without any errors") | ||
{ | ||
CHECK(mock.get_received_values() == std::vector{1, 2, 3}); | ||
CHECK(mock.get_total_on_next_count() == 3); | ||
CHECK(mock.get_on_error_count() == 0); | ||
CHECK(mock.get_on_completed_count() == 1); | ||
} | ||
} | ||
} | ||
} | ||
|
||
TEST_CASE("on_error_resume_next satisfies disposable contracts") | ||
{ | ||
auto observable_disposable = rpp::composite_disposable_wrapper::make(); | ||
{ | ||
auto observable = observable_with_disposable<int>(observable_disposable); | ||
|
||
test_operator_with_disposable<int>( | ||
rpp::ops::on_error_resume_next([](const std::exception_ptr&) { return rpp::source::empty<int>(); })); | ||
} | ||
|
||
CHECK(observable_disposable.is_disposed() || observable_disposable.lock().use_count() == 2); | ||
} |
151959c
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BENCHMARK RESULTS (AUTOGENERATED)
ci-ubuntu-gcc
General
Sources
Filtering Operators
Schedulers
Transforming Operators
Conditional Operators
Utility Operators
Combining Operators
Subjects
Scenarios
Aggregating Operators
Error Handling Operators
ci-macos
General
Sources
Filtering Operators
Schedulers
Transforming Operators
Conditional Operators
Utility Operators
Combining Operators
Subjects
Scenarios
Aggregating Operators
Error Handling Operators
ci-ubuntu-clang
General
Sources
Filtering Operators
Schedulers
Transforming Operators
Conditional Operators
Utility Operators
Combining Operators
Subjects
Scenarios
Aggregating Operators
Error Handling Operators
ci-windows
General
Sources
Filtering Operators
Schedulers
Transforming Operators
Conditional Operators
Utility Operators
Combining Operators
Subjects
Scenarios
Aggregating Operators
Error Handling Operators