Skip to content

Commit

Permalink
Support early-unsubscribing.
Browse files Browse the repository at this point in the history
  • Loading branch information
tcw165 committed Sep 8, 2022
1 parent 736ea8d commit d6be308
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions src/rpp/rpp/operators/take_until.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#pragma once

#include <rpp/defs.hpp>
#include <rpp/operators/details/early_unsubscribe.hpp>
#include <rpp/operators/fwd/take_until.hpp>
#include <rpp/subscribers/constraints.hpp>
#include <rpp/utils/functors.hpp>
Expand All @@ -25,8 +26,10 @@ IMPLEMENTATION_FILE(take_until_tag);
namespace rpp::details
{

struct take_until_state
struct take_until_state : early_unsubscribe_state
{
using early_unsubscribe_state::early_unsubscribe_state;

std::mutex mutex;
bool is_stopped{false};
};
Expand Down Expand Up @@ -56,6 +59,9 @@ struct take_until_on_error
const auto &subscriber,
const std::shared_ptr<take_until_state> &state) const
{
// Early unsubscribe the sub-subscription tree for the streams of this and above. This early-unsubscribing prevents the race-condition in between on_next and on_error events.
state->childs_subscriptions.unsubscribe();

std::lock_guard lock{state->mutex};

state->is_stopped = true;
Expand All @@ -71,6 +77,9 @@ struct take_until_on_completed
void operator()(const auto& subscriber,
const std::shared_ptr<take_until_state>& state) const
{
// Early unsubscribe the sub-subscription tree for the streams of this and above. This early-unsubscribing prevents the race-condition in between on_next and on_completed events.
state->childs_subscriptions.unsubscribe();

std::lock_guard lock{state->mutex};

state->is_stopped = true;
Expand All @@ -87,6 +96,9 @@ struct take_until_throttler_on_next {
const auto &subscriber,
const std::shared_ptr<take_until_state> &state) const
{
// Early unsubscribe the sub-subscription tree for the streams of this and above. This early-unsubscribing prevents the race-condition in between on_next and on_completed events.
state->childs_subscriptions.unsubscribe();

std::lock_guard lock{state->mutex};

state->is_stopped = true;
Expand Down Expand Up @@ -117,19 +129,19 @@ struct take_until_impl
template<constraint::subscriber_of_type<Type> TSub>
auto operator()(TSub&& subscriber) const
{
auto state = std::make_shared<take_until_state>();
auto state = std::make_shared<take_until_state>(subscriber.get_subscription());

// Subscribe to trigger observable
auto child_subscription = subscriber.get_subscription().make_child();
auto until_subscription = state->childs_subscriptions.make_child();
m_until_observable.subscribe(
create_subscriber_with_state<TriggerType>(std::move(child_subscription),
create_subscriber_with_state<TriggerType>(std::move(until_subscription),
take_until_throttler_on_next<TriggerType>{},
take_until_throttler_on_error{},
take_until_throttler_on_completed{},
std::forward<decltype(subscriber)>(subscriber),
state));

auto subscription = subscriber.get_subscription();
auto subscription = state->childs_subscriptions.make_child();
return create_subscriber_with_state<Type>(std::move(subscription),
take_until_on_next<Type>{},
take_until_on_error{},
Expand Down

0 comments on commit d6be308

Please sign in to comment.