Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout operator #287

Merged
merged 8 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/Implementation Status.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@
- [x] do_on_next
- [x] do_on_error
- [x] do_on_completed
- [ ] timeout
- [x] timeout

### Connectable

Expand Down
21 changes: 21 additions & 0 deletions src/benchmarks/rpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <rpp/observers/specific_observer.hpp>
#include <rpp/operators.hpp>
#include <rpp/subjects.hpp>
#include <rpp/schedulers/run_loop_scheduler.hpp>
#include <rpp/schedulers/trampoline_scheduler.hpp>
#include <rpp/utils/spinlock.hpp>

Expand Down Expand Up @@ -859,6 +860,26 @@ TEST_CASE("skip")
};
}

TEST_CASE("timeout")
{

BENCHMARK_ADVANCED("timeout construction from observable via dot + subscribe with run_loop")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rpp::observable::create<int>([](const auto& sub) { sub.on_next(1); });
auto sub = rpp::specific_subscriber{[](const int&) {}};
rpp::schedulers::run_loop rl{};
meter.measure([&] { return obs.timeout(std::chrono::days{30}, rl).subscribe(sub); });
};

BENCHMARK_ADVANCED("sending of values from observable via timeout to subscriber with unreachable timeout interval with run_loop")(Catch::Benchmark::Chronometer meter)
{
rpp::schedulers::run_loop rl{};
rpp::source::create<int>([&](const auto& sub) { meter.measure([&] { sub.on_next(1); }); })
.timeout(std::chrono::days{30}, rl)
.subscribe([](const auto&) {});
};
}

TEST_CASE("chains creation test")
{
BENCHMARK_ADVANCED("long non-state chain creation + subscribe")(Catch::Benchmark::Chronometer meter)
Expand Down
19 changes: 19 additions & 0 deletions src/benchmarks/rxcpp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,25 @@ TEST_CASE("skip")
};
}

TEST_CASE("timeout")
{
BENCHMARK_ADVANCED("timeout construction from observable via dot + subscribe with run_loop")(Catch::Benchmark::Chronometer meter)
{
const auto obs = rxcpp::sources::create<int>([](const auto& sub) { sub.on_next(1); });
auto sub = rxcpp::make_subscriber<int>([](const int&) {});
rxcpp::schedulers::run_loop rl{};
meter.measure([&] { return obs.timeout(std::chrono::days{30}, rxcpp::observe_on_run_loop(rl)).subscribe(sub); });
};

BENCHMARK_ADVANCED("sending of values from observable via timeout to subscriber with unreachable timeout interval with run_loop")(Catch::Benchmark::Chronometer meter)
{
rxcpp::schedulers::run_loop rl{};
rxcpp::sources::create<int>([&](const auto& sub) { meter.measure([&] { sub.on_next(1); }); })
.timeout(std::chrono::days{30}, rxcpp::observe_on_run_loop(rl))
.subscribe([](const auto&) {});
};
}

TEST_CASE("chains creation test")
{
BENCHMARK_ADVANCED("long non-state chain creation + subscribe")(Catch::Benchmark::Chronometer meter)
Expand Down
42 changes: 42 additions & 0 deletions src/examples/doxygen/timeout.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <rpp/rpp.hpp>

#include <iostream>

/**
* \example timeout.cpp
**/
int main()
{
//! [timeout]
rpp::subjects::publish_subject<int> subj{};
subj.get_observable()
.timeout(std::chrono::milliseconds{450}, rpp::schedulers::new_thread{})
.subscribe([](int v) { std::cout << "new value " << v << std::endl; },
[](std::exception_ptr err)
{
try
{
std::rethrow_exception(err);
}
catch (const std::exception& exc)
{
std::cout << "ERR: " << exc.what() << std::endl;
}
},
[]() { std::cout << "completed" << std::endl; });
for (int i = 0; i < 10; ++i)
{
std::this_thread::sleep_for(std::chrono::milliseconds{i * 100});
subj.get_subscriber().on_next(i);
}

// Output:
// new value 0
// new value 1
// new value 2
// new value 3
// new value 4
// ERR : Timeout reached
//! [timeout]
return 0;
}
1 change: 1 addition & 0 deletions src/rpp/rpp/observables/interface_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ struct RPP_EMPTY_BASES interface_observable
, details::member_overload<Type, SpecificObservable, details::take_tag>
, details::member_overload<Type, SpecificObservable, details::take_until_tag>
, details::member_overload<Type, SpecificObservable, details::take_while_tag>
, details::member_overload<Type, SpecificObservable, details::timeout_tag>
, details::member_overload<Type, SpecificObservable, details::window_tag>
, details::member_overload<Type, SpecificObservable, details::with_latest_from_tag>
{
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
#include <rpp/operators/observe_on.hpp>
#include <rpp/operators/repeat.hpp>
#include <rpp/operators/subscribe_on.hpp>
#include <rpp/operators/timeout.hpp>

/**
* \defgroup connectable_operators Connectable Operators
Expand Down
1 change: 1 addition & 0 deletions src/rpp/rpp/operators/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@
#include <rpp/operators/fwd/take_last.hpp>
#include <rpp/operators/fwd/take_until.hpp>
#include <rpp/operators/fwd/take_while.hpp>
#include <rpp/operators/fwd/timeout.hpp>
#include <rpp/operators/fwd/window.hpp>
#include <rpp/operators/fwd/with_latest_from.hpp>
61 changes: 61 additions & 0 deletions src/rpp/rpp/operators/fwd/timeout.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/schedulers/constraints.hpp>

#include <rpp/observables/details/member_overload.hpp>

namespace rpp::details
{
struct timeout_tag;
}

namespace rpp::details
{
template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct timeout_impl;

template<constraint::decayed_type Type, typename SpecificObservable>
struct member_overload<Type, SpecificObservable, timeout_tag>
{
/**
* \brief Forwards emissions from original observable, but emit error if no any events during specified period of time (since last emission)
*
* \marble timeout
{
source observable : +--1-2-3-4------5-|
operator "timeout(4)" : +--1-2-3-4----#
}
* \param period is maximum duration between emitted items before a timeout occurs
* \param scheduler is scheduler used to run timer for timeout
* \return new specific_observable with the timeout operator as most recent operator.
* \warning #include <rpp/operators/timeout.hpp>
*
* \par Example
* \snippet timeout.cpp timeout
*
* \ingroup utility_operators
* \see https://reactivex.io/documentation/operators/timeout.html
*/
template<schedulers::constraint::scheduler TScheduler>
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) const & requires is_header_included<timeout_tag, TScheduler>
{
return static_cast<const SpecificObservable*>(this)->template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler});
}

template<schedulers::constraint::scheduler TScheduler>
auto timeout(schedulers::duration period, const TScheduler& scheduler = TScheduler{}) && requires is_header_included<timeout_tag, TScheduler>
{
return std::move(*static_cast<SpecificObservable*>(this)).template lift<Type>(timeout_impl<Type, TScheduler>{period, scheduler});
}
};
} // namespace rpp::details
119 changes: 119 additions & 0 deletions src/rpp/rpp/operators/timeout.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus
//

#pragma once

#include <rpp/operators/lift.hpp> // required due to operator uses lift
#include <rpp/operators/details/early_unsubscribe.hpp>
#include <rpp/operators/details/serialized_subscriber.hpp>
#include <rpp/operators/details/subscriber_with_state.hpp>
#include <rpp/operators/fwd/timeout.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/exceptions.hpp>

#include <rpp/utils/spinlock.hpp>

#include <atomic>

IMPLEMENTATION_FILE(timeout_tag);

namespace rpp::details
{
struct timeout_state : early_unsubscribe_state
{
using early_unsubscribe_state::early_unsubscribe_state;

std::atomic<schedulers::time_point> last_emission_time{};

static constexpr schedulers::time_point s_timeout_reached = schedulers::time_point::min();
};

template<typename Worker>
struct timeout_on_next
{
template<typename Value>
void operator()(Value&& v, const auto& subscriber, const std::shared_ptr<timeout_state>& state) const
{
if (state->last_emission_time.exchange(Worker::now(), std::memory_order_acq_rel) != timeout_state::s_timeout_reached)
subscriber.on_next(std::forward<Value>(v));
}
};

using timeout_on_error = early_unsubscribe_on_error;
using timeout_on_completed = early_unsubscribe_on_completed;

struct timeout_state_with_serialized_spinlock : timeout_state
{
using timeout_state::timeout_state;

// spinlock because most part of time there is only one thread would be active
utils::spinlock spinlock{};
};

template<constraint::decayed_type Type, schedulers::constraint::scheduler TScheduler>
struct timeout_impl
{
schedulers::duration period;
TScheduler scheduler;

template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& in_subscriber) const
{
auto state = std::make_shared<timeout_state_with_serialized_spinlock>(in_subscriber.get_subscription());
// change subscriber to serialized to avoid manual using of mutex
auto subscriber = make_serialized_subscriber(std::forward<TSub>(in_subscriber),
std::shared_ptr<utils::spinlock>{state, &state->spinlock});

const auto worker = scheduler.create_worker(state->children_subscriptions);
state->last_emission_time.store(worker.now(), std::memory_order_relaxed);

const auto last_emission_time = state->last_emission_time.load(std::memory_order_relaxed);
worker.schedule(last_emission_time + period,
[period = period, prev_emission_time = last_emission_time, subscriber, state]() mutable -> schedulers::optional_duration
{
while (true)
{
// last emission time still same value -> timeout reached, else -> prev_emission_time
// would be update to actual emission time
if (state->last_emission_time.compare_exchange_strong(prev_emission_time,
timeout_state::s_timeout_reached,
std::memory_order_acq_rel))
return time_is_out(state, subscriber);

// if we still need to wait a bit more -> let's wait
if (const auto diff_to_schedule = (prev_emission_time + period) - decltype(worker)::now();
diff_to_schedule > rpp::schedulers::duration{0})
return diff_to_schedule;

// okay, we here because:
// 1) last_emission_time was not equal to prev_emission_time
// 2) last_emission_time + period before now -> we are still in timeout state
// 3) prev_emission_time updated to last_emission_time
// So we can return to begin
}
});

return create_subscriber_with_state<Type>(state->children_subscriptions,
timeout_on_next<decltype(worker)>{},
timeout_on_error{},
timeout_on_completed{},
std::move(subscriber),
std::move(state));
}

private:
static schedulers::optional_duration time_is_out(const auto& state, const auto& subscriber)
{
state->children_subscriptions.unsubscribe();
subscriber.on_error(std::make_exception_ptr(utils::timeout{"Timeout reached"}));
return std::nullopt;
}
};
} // namespace rpp::details
2 changes: 2 additions & 0 deletions src/rpp/rpp/schedulers/details/worker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class worker final : public details::worker_tag
m_strategy.defer_at(time_point, std::forward<decltype(fn)>(fn));
}

static time_point now() { return Strategy::now(); }

private:
Strategy m_strategy;
};
Expand Down
5 changes: 4 additions & 1 deletion src/rpp/rpp/utils/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@

namespace rpp::utils
{

struct not_enough_emissions : std::runtime_error
{
using std::runtime_error::runtime_error;
};

struct timeout : std::runtime_error
{
using std::runtime_error::runtime_error;
};
} // namespace rpp::utils
2 changes: 1 addition & 1 deletion src/tests/test_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

#include <rpp/schedulers.hpp>

static rpp::schedulers::time_point s_current_time{std::chrono::seconds{0}};
static rpp::schedulers::time_point s_current_time{std::chrono::seconds{10}};

class test_scheduler final : public rpp::schedulers::details::scheduler_tag
{
Expand Down
Loading