diff --git a/src/rpp/rpp/operators.hpp b/src/rpp/rpp/operators.hpp index a21a73327..d91434379 100644 --- a/src/rpp/rpp/operators.hpp +++ b/src/rpp/rpp/operators.hpp @@ -94,6 +94,7 @@ #include #include #include +#include /** * @defgroup connectable_operators Connectable Operators diff --git a/src/rpp/rpp/operators/tap.hpp b/src/rpp/rpp/operators/tap.hpp new file mode 100644 index 000000000..17b44619f --- /dev/null +++ b/src/rpp/rpp/operators/tap.hpp @@ -0,0 +1,161 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#pragma once + +#include + +#include +#include + +namespace rpp::operators::details +{ +template< + rpp::constraint::observer TObserver, + rpp::constraint::decayed_type OnNext, + rpp::constraint::decayed_type OnError, + rpp::constraint::decayed_type OnCompleted> +struct tap_observer_strategy +{ + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + + RPP_NO_UNIQUE_ADDRESS TObserver observer; + RPP_NO_UNIQUE_ADDRESS OnNext onNext; + RPP_NO_UNIQUE_ADDRESS OnError onError; + RPP_NO_UNIQUE_ADDRESS OnCompleted onCompleted; + + template + void on_next(T&& v) const + { + onNext(utils::as_const(v)); + observer.on_next(std::forward(v)); + } + + void on_error(const std::exception_ptr& err) const + { + onError(err); + observer.on_error(err); + } + + void on_completed() const + { + onCompleted(); + observer.on_completed(); + } + + void set_upstream(const disposable_wrapper& d) { observer.set_upstream(d); } + + bool is_disposed() const { return observer.is_disposed(); } +}; + +template< + rpp::constraint::decayed_type OnNext, + rpp::constraint::decayed_type OnError, + rpp::constraint::decayed_type OnCompleted> +struct tap_t : public operators::details::operator_observable_strategy +{ + using operators::details::operator_observable_strategy::operator_observable_strategy; + + template + requires rpp::constraint::invocable_r_v + using result_value = T; + + template + using updated_disposable_strategy = Prev; +}; +} + +namespace rpp::operators +{ +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_error error handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t> +auto tap(OnError&& on_error) +{ + using OnNext = rpp::utils::empty_function_any_t; + using OnCompleted = rpp::utils::empty_function_t<>; + + return details::tap_t, std::decay_t, std::decay_t>{ + OnNext{}, + std::forward(on_error), + OnCompleted{}}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnCompleted&& on_completed) +{ + using OnNext = rpp::utils::empty_function_any_t; + using OnError = rpp::utils::empty_function_t; + + return details::tap_t, std::decay_t, std::decay_t>{ + OnNext{}, + OnError{}, + std::forward(on_completed)}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_next next handler + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnNext&& on_next, + OnCompleted&& on_completed) +{ + using OnError = rpp::utils::empty_function_t; + + return details::tap_t, std::decay_t, std::decay_t>{ + std::forward(on_next), + OnError{}, + std::forward(on_completed)}; +} + +/** + * @brief Register callbacks to inspect observable emissions and perform side-effects + * + * @param on_next next handler + * @param on_error error handler + * @param on_completed completion handler + * + * @ingroup utility_operators + * @see https://reactivex.io/documentation/operators/do.html + */ +template OnError = rpp::utils::empty_function_t, + std::invocable<> OnCompleted = rpp::utils::empty_function_t<>> +auto tap(OnNext&& on_next = {}, + OnError&& on_error = {}, + OnCompleted&& on_completed = {}) +{ + return details::tap_t, std::decay_t, std::decay_t>{ + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed)}; +} +} // namespace rpp::operators \ No newline at end of file diff --git a/src/tests/rpp/test_tap.cpp b/src/tests/rpp/test_tap.cpp new file mode 100644 index 000000000..f08d5e5e0 --- /dev/null +++ b/src/tests/rpp/test_tap.cpp @@ -0,0 +1,95 @@ +// ReactivePlusPlus library +// +// Copyright Aleksey Loginov 2023 - present. +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// https://www.boost.org/LICENSE_1_0.txt) +// +// Project home: https://github.com/victimsnino/ReactivePlusPlus +// + +#include + +#include +#include +#include +#include + +#include "copy_count_tracker.hpp" +#include "disposable_observable.hpp" +#include "mock_observer.hpp" + +TEMPLATE_TEST_CASE("tap observes emissions and doesn't modify them", "", rpp::memory_model::use_stack, rpp::memory_model::use_shared) +{ + auto mock = mock_observer_strategy{}; + + SECTION("observable with error emission") + { + auto obs = + rpp::source::concat(rpp::source::just(1, 2, 3), + rpp::source::error(std::make_exception_ptr(std::runtime_error{""}))); + + SECTION("subscribe") + { + size_t on_next_invoked = 0; + size_t on_error_invoked = 0; + + // clang-format off + obs | rpp::ops::tap( + [&](const int&) { ++on_next_invoked; }, + [&](const std::exception_ptr&) { ++on_error_invoked; }) + | rpp::ops::subscribe(mock); + // clang-format on + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 1); + CHECK(mock.get_on_completed_count() == 0); + + CHECK(on_next_invoked == mock.get_total_on_next_count()); + CHECK(on_error_invoked == mock.get_on_error_count()); + } + } + + SECTION("observable with completed emission") + { + auto obs = rpp::source::just(1, 2, 3); + + SECTION("subscribe") + { + size_t on_next_invoked = 0; + size_t on_completed_invoked = 0; + + // clang-format off + obs | rpp::ops::tap( + [&](const int&) { ++on_next_invoked; }, + [&]() { ++on_completed_invoked; }) + | rpp::ops::subscribe(mock); + // clang-format on + + CHECK(mock.get_received_values() == std::vector{1, 2, 3}); + CHECK(mock.get_on_error_count() == 0); + CHECK(mock.get_on_completed_count() == 1); + + CHECK(on_next_invoked == mock.get_total_on_next_count()); + CHECK(on_completed_invoked == mock.get_on_completed_count()); + } + } +} + +TEST_CASE("tap doesn't produce extra copies") +{ + // clang-format off + copy_count_tracker::test_operator(rpp::ops::tap(), + { + .send_by_copy = { .copy_count = 1, // 1 copy on emission + .move_count = 0 }, + .send_by_move = { .copy_count = 0, + .move_count = 1 } // 1 move on emission + }); + // clang-format on +} + +TEST_CASE("tap satisfies disposable contracts") +{ + test_operator_with_disposable(rpp::ops::tap()); +} \ No newline at end of file