Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix memory leaks #245

Merged
merged 3 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ jobs:
file: build/coverage/coverage.info

sanitize:
# temporarly disabled
if: false
name: Sanitizer

needs: [download_deps]
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ include(cmake/prelude.cmake)

project(
ReactivePlusPlus
VERSION 0.1.0
VERSION 0.1.1
DESCRIPTION "ReactivePlusPlus is library for building asynchronous event-driven streams of data with help of sequences of primitive operators in the declarative form"
HOMEPAGE_URL "https://github.com/victimsnino/ReactivePlusPlus"
LANGUAGES CXX
Expand Down
43 changes: 28 additions & 15 deletions src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
#include <rpp/subscriptions/composite_subscription.hpp> // lifetime
#include <rpp/defs.hpp> // RPP_EMPTY_BASES

#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state


#include <memory>
#include <mutex>
Expand All @@ -42,18 +44,18 @@ class RPP_EMPTY_BASES connectable_observable
connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{})
: base{subject.get_observable()}
, m_original_observable{original_observable}
, m_subject{subject} {}
, m_state{std::make_shared<state_t>(subject)} {}

connectable_observable(OriginalObservable&& original_observable, const Subject& subject = Subject{})
: base{subject.get_observable()}
, m_original_observable{std::move(original_observable)}
, m_subject{subject} {}
, m_state{std::make_shared<state_t>(subject)} {}

composite_subscription connect(const composite_subscription& subscription = composite_subscription{}) const
{
auto subscriber = m_subject.get_subscriber();
auto subscriber = m_state->subject.get_subscriber();
const auto& subscriber_subscription = subscriber.get_subscription();

{
std::lock_guard lock(m_state->mutex);

Expand All @@ -64,33 +66,44 @@ class RPP_EMPTY_BASES connectable_observable
m_state->sub = subscription;
}

subscription.add([state = m_state, subscriber_subscription]
subscription.add([state = std::weak_ptr{m_state}]
{
auto current_sub = composite_subscription::empty();
if (const auto locked = state.lock())
{
std::lock_guard lock(state->mutex);
std::swap(current_sub, state->sub);
auto current_sub = composite_subscription::empty();
{
std::lock_guard lock(locked->mutex);
std::swap(current_sub, locked->sub);
}
current_sub.unsubscribe();
locked->subject.get_subscriber().get_subscription().remove(current_sub);
}
current_sub.unsubscribe();
subscriber_subscription.remove(current_sub);
});

m_original_observable.subscribe(m_state->sub, subscriber.get_observer());

m_original_observable.subscribe(create_subscriber_with_state<Type>(m_state->sub,
utils::forwarding_on_next{},
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
subscriber.get_observer(),
// capture state to be sure that state is alive while ANY subscriber is alive
m_state));

return subscription;
}

private:
OriginalObservable m_original_observable;
Subject m_subject;

OriginalObservable m_original_observable;
struct state_t
{
state_t(const Subject& subj) : subject{subj} {}

Subject subject;
std::mutex mutex{};
composite_subscription sub = composite_subscription::empty();
};

std::shared_ptr<state_t> m_state = std::make_shared<state_t>();
std::shared_ptr<state_t> m_state{};
};

template<constraint::observable OriginalObservable, subjects::constraint::subject Subject>
Expand Down
23 changes: 17 additions & 6 deletions src/rpp/rpp/operators/ref_count.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#include <rpp/subscribers/constraints.hpp>
#include <rpp/sources/create.hpp>

#include <rpp/operators/details/subscriber_with_state.hpp> // create_subscriber_with_state


IMPLEMENTATION_FILE(ref_count_tag);

namespace rpp::details
Expand Down Expand Up @@ -53,16 +56,24 @@ struct ref_count_on_subscribe
std::shared_ptr<ref_count_state_t> state = std::make_shared<ref_count_state_t>();

template<constraint::subscriber_of_type<Type> TSub>
void operator()(const TSub &subscriber) const
void operator()(TSub&& subscriber) const
{
const bool need_to_connect = state->on_subscribe();

subscriber.get_subscription().add([state = state]
{
state->on_unsubscribe();
});
subscriber.get_subscription().add([state = std::weak_ptr{state}]
{
if (const auto locked = state.lock())
locked->on_unsubscribe();
});

observable.subscribe(subscriber);
auto sub = subscriber.get_subscription();
observable.subscribe(create_subscriber_with_state<Type>(std::move(sub),
utils::forwarding_on_next{},
utils::forwarding_on_error{},
utils::forwarding_on_completed{},
std::forward<TSub>(subscriber),
// capture state to be sure that state is alive while ANY subscriber is alive
state));
if (need_to_connect)
observable.connect(state->get_subscription());
}
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/subjects/details/subject_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class subject_state : public std::enable_shared_from_this<subject_state<T>>
std::unique_lock lock{m_mutex};

process_state(m_state,
[&](shared_subscribers subs)
[&](const shared_subscribers& subs)
{
auto new_subs = make_copy_of_subscribed_subs(subs->size() + 1, subs);
new_subs->push_back(subscriber);
Expand Down
5 changes: 3 additions & 2 deletions src/rpp/rpp/subjects/publish_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ class publish_strategy
publish_strategy(const composite_subscription& sub)
: m_sub{sub}
{
m_sub.add([state = m_state]
m_sub.add([state = std::weak_ptr{m_state}]
{
state->on_unsubscribe();
if(const auto locked = state.lock())
locked->on_unsubscribe();
});
}

Expand Down