From ba9e6fb563ff59cc5484e6a2d45f6c8202cc65fd Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Aug 2022 14:56:36 +0300 Subject: [PATCH 1/2] Fix memory leaks --- CMakeLists.txt | 2 +- .../observables/connectable_observable.hpp | 43 ++++++++++++------- src/rpp/rpp/operators/ref_count.hpp | 23 +++++++--- .../rpp/subjects/details/subject_state.hpp | 2 +- src/rpp/rpp/subjects/publish_subject.hpp | 5 ++- 5 files changed, 50 insertions(+), 25 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 11762defe..46cb46281 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -26,7 +26,7 @@ include(cmake/prelude.cmake) project( ReactivePlusPlus - VERSION 0.1.0 + VERSION 0.1.1 DESCRIPTION "ReactivePlusPlus is library for building asynchronous event-driven streams of data with help of sequences of primitive operators in the declarative form" HOMEPAGE_URL "https://github.com/victimsnino/ReactivePlusPlus" LANGUAGES CXX diff --git a/src/rpp/rpp/observables/connectable_observable.hpp b/src/rpp/rpp/observables/connectable_observable.hpp index df0674168..37fcea975 100644 --- a/src/rpp/rpp/observables/connectable_observable.hpp +++ b/src/rpp/rpp/observables/connectable_observable.hpp @@ -16,6 +16,8 @@ #include // lifetime #include // RPP_EMPTY_BASES +#include // create_subscriber_with_state + #include #include @@ -42,18 +44,18 @@ class RPP_EMPTY_BASES connectable_observable connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{}) : base{subject.get_observable()} , m_original_observable{original_observable} - , m_subject{subject} {} + , m_state{std::make_shared(subject)} {} connectable_observable(OriginalObservable&& original_observable, const Subject& subject = Subject{}) : base{subject.get_observable()} , m_original_observable{std::move(original_observable)} - , m_subject{subject} {} + , m_state{std::make_shared(subject)} {} composite_subscription connect(const composite_subscription& subscription = composite_subscription{}) const { - auto subscriber = m_subject.get_subscriber(); + auto subscriber = m_state->subject.get_subscriber(); const auto& subscriber_subscription = subscriber.get_subscription(); - + { std::lock_guard lock(m_state->mutex); @@ -64,33 +66,44 @@ class RPP_EMPTY_BASES connectable_observable m_state->sub = subscription; } - subscription.add([state = m_state, subscriber_subscription] + subscription.add([state = std::weak_ptr{m_state}] { - auto current_sub = composite_subscription::empty(); + if (const auto locked = state.lock()) { - std::lock_guard lock(state->mutex); - std::swap(current_sub, state->sub); + auto current_sub = composite_subscription::empty(); + { + std::lock_guard lock(locked->mutex); + std::swap(current_sub, locked->sub); + } + current_sub.unsubscribe(); + locked->subject.get_subscriber().get_subscription().remove(current_sub); } - current_sub.unsubscribe(); - subscriber_subscription.remove(current_sub); }); - m_original_observable.subscribe(m_state->sub, subscriber.get_observer()); + + m_original_observable.subscribe(create_subscriber_with_state(m_state->sub, + utils::forwarding_on_next{}, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + subscriber.get_observer(), + // capture state to be sure that state is alive while ANY subscriber is alive + m_state)); return subscription; } private: - OriginalObservable m_original_observable; - Subject m_subject; - + OriginalObservable m_original_observable; struct state_t { + state_t(const Subject& subj) : subject{subj} {} + + Subject subject; std::mutex mutex{}; composite_subscription sub = composite_subscription::empty(); }; - std::shared_ptr m_state = std::make_shared(); + std::shared_ptr m_state{}; }; template diff --git a/src/rpp/rpp/operators/ref_count.hpp b/src/rpp/rpp/operators/ref_count.hpp index 6874c9b0e..c2348254e 100644 --- a/src/rpp/rpp/operators/ref_count.hpp +++ b/src/rpp/rpp/operators/ref_count.hpp @@ -15,6 +15,9 @@ #include #include +#include // create_subscriber_with_state + + IMPLEMENTATION_FILE(ref_count_tag); namespace rpp::details @@ -53,16 +56,24 @@ struct ref_count_on_subscribe std::shared_ptr state = std::make_shared(); template TSub> - void operator()(const TSub &subscriber) const + void operator()(TSub&& subscriber) const { const bool need_to_connect = state->on_subscribe(); - subscriber.get_subscription().add([state = state] - { - state->on_unsubscribe(); - }); + subscriber.get_subscription().add([state = std::weak_ptr{state}] + { + if (const auto locked = state.lock()) + locked->on_unsubscribe(); + }); - observable.subscribe(subscriber); + auto sub = subscriber.get_subscription(); + observable.subscribe(create_subscriber_with_state(std::move(sub), + utils::forwarding_on_next{}, + utils::forwarding_on_error{}, + utils::forwarding_on_completed{}, + std::forward(subscriber), + // capture state to be sure that state is alive while ANY subscriber is alive + state)); if (need_to_connect) observable.connect(state->get_subscription()); } diff --git a/src/rpp/rpp/subjects/details/subject_state.hpp b/src/rpp/rpp/subjects/details/subject_state.hpp index 8ad0f86d6..dcb603015 100644 --- a/src/rpp/rpp/subjects/details/subject_state.hpp +++ b/src/rpp/rpp/subjects/details/subject_state.hpp @@ -42,7 +42,7 @@ class subject_state : public std::enable_shared_from_this> std::unique_lock lock{m_mutex}; process_state(m_state, - [&](shared_subscribers subs) + [&](const shared_subscribers& subs) { auto new_subs = make_copy_of_subscribed_subs(subs->size() + 1, subs); new_subs->push_back(subscriber); diff --git a/src/rpp/rpp/subjects/publish_subject.hpp b/src/rpp/rpp/subjects/publish_subject.hpp index 83837ea75..0417de5af 100644 --- a/src/rpp/rpp/subjects/publish_subject.hpp +++ b/src/rpp/rpp/subjects/publish_subject.hpp @@ -24,9 +24,10 @@ class publish_strategy publish_strategy(const composite_subscription& sub) : m_sub{sub} { - m_sub.add([state = m_state] + m_sub.add([state = std::weak_ptr{m_state}] { - state->on_unsubscribe(); + if(const auto locked = state.lock()) + locked->on_unsubscribe(); }); } From 6b78aa16cc601f56482507b49dabe90d4367257a Mon Sep 17 00:00:00 2001 From: Aleksey Loginov Date: Tue, 30 Aug 2022 14:57:05 +0300 Subject: [PATCH 2/2] enable CI --- .github/workflows/ci.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6b7a9cac9..4a7c4e7ea 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,8 +86,6 @@ jobs: file: build/coverage/coverage.info sanitize: - # temporarly disabled - if: false name: Sanitizer needs: [download_deps]