diff --git a/src/rpp/rpp/observables/observable.hpp b/src/rpp/rpp/observables/observable.hpp index eef06500d..2a01affcc 100644 --- a/src/rpp/rpp/observables/observable.hpp +++ b/src/rpp/rpp/observables/observable.hpp @@ -142,7 +142,7 @@ class observable [[nodiscard("Use returned disposable or use subscribe(observer) instead")]] composite_disposable_wrapper subscribe_with_disposable(observer&& observer) const { if (!observer.is_disposed()) - return m_strategy.subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); + return subscribe(rpp::composite_disposable_wrapper{std::make_shared>()}, std::move(observer)); return {}; } @@ -264,10 +264,10 @@ class observable composite_disposable_wrapper subscribe(const composite_disposable_wrapper& d, OnNext&& on_next, OnError&& on_error = {}, OnCompleted&& on_completed = {}) const { if (!d.is_disposed()) - m_strategy.subscribe(make_lambda_observer(d, - std::forward(on_next), - std::forward(on_error), - std::forward(on_completed))); + subscribe(make_lambda_observer(d, + std::forward(on_next), + std::forward(on_error), + std::forward(on_completed))); return d; } diff --git a/src/rpp/rpp/operators/details/forwarding_subject.hpp b/src/rpp/rpp/operators/details/forwarding_subject.hpp index 6a285e918..de40f9eaa 100644 --- a/src/rpp/rpp/operators/details/forwarding_subject.hpp +++ b/src/rpp/rpp/operators/details/forwarding_subject.hpp @@ -25,6 +25,8 @@ class forwarding_strategy { struct observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -48,7 +50,7 @@ class forwarding_strategy auto get_observer() const { - return rpp::observer>{composite_disposable_wrapper::from_shared(m_state), observer_strategy{m_state}}; + return rpp::observer{observer_strategy{m_state}}; } template TObs> diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 656e32f52..6dbd0636c 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -79,12 +79,14 @@ class subject_state : public std::enable_shared_from_this> { if (const auto observers = exchange_observers_under_lock_if_there(err)) rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); }); + dispose(); } void on_completed() { if (const auto observers = exchange_observers_under_lock_if_there(completed{})) rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer::on_completed>{}); + dispose(); } private: diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index bbdb170d9..15258f9ab 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -25,6 +25,8 @@ class publish_strategy { struct observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr> state{}; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -44,7 +46,7 @@ class publish_strategy auto get_observer() const { - return rpp::observer>{composite_disposable_wrapper{m_state}, observer_strategy{m_state}}; + return rpp::observer{observer_strategy{m_state}}; } template TObs> diff --git a/src/rpp/rpp/subjects/serialized_subject.hpp b/src/rpp/rpp/subjects/serialized_subject.hpp index 56bd3220d..07cad6eaa 100644 --- a/src/rpp/rpp/subjects/serialized_subject.hpp +++ b/src/rpp/rpp/subjects/serialized_subject.hpp @@ -30,6 +30,8 @@ class serialized_strategy struct observer_strategy { + using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy; + std::shared_ptr state{}; void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); } @@ -64,7 +66,7 @@ class serialized_strategy auto get_observer() const { - return rpp::observer>{get_disposable(), observer_strategy{m_state}}; + return rpp::observer{observer_strategy{m_state}}; } template TObs>