Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update subjects #526

Merged
merged 10 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ DerivePointerAlignment: false
EmptyLineAfterAccessModifier: Never
EmptyLineBeforeAccessModifier: Always
ExperimentalAutoDetectBinPacking: true
# AllowShortCompoundRequirementOnASingleLine: true
RequiresExpressionIndentation: OuterScope
FixNamespaceComments: true
IncludeBlocks: Regroup
IncludeCategories:
Expand Down
10 changes: 5 additions & 5 deletions .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ WarningsAsErrors: '*'
HeaderFilterRegex: './src/.*'
AnalyzeTemporaryDtors: false
FormatStyle: 'file'
HeaderFileExtensions:
- h
- hpp
ImplementationFileExtensions:
- cpp
# HeaderFileExtensions:
# - h
# - hpp
# ImplementationFileExtensions:
# - cpp
CheckOptions:
- { key: readability-identifier-naming.NamespaceCase, value: lower_case }
- { key: readability-identifier-naming.ClassCase, value: lower_case }
Expand Down
2 changes: 1 addition & 1 deletion docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ RPP follows this contract and especially this part. It means, that:
2. Any user-provided callbacks (for operators or observers) can be not thread-safe due to thread-safety of observable is guaranteed. <br>
For example: internal logic of `take` operator doesn't use mutexes or atomics due to underlying observable **MUST** emit items serially
3. When you implement your own operator via `create` be careful to **follow this contract**!
4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_subject instead if can't guarantee serial emissions!
4. It is true **EXCEPT FOR** subjects if they are used manually due to users can use subjects for its own purposes there is potentially place for breaking this concept. Be careful and use serialized_* instead if can't guarantee serial emissions!

It means, that for example:

Expand Down
4 changes: 3 additions & 1 deletion src/rpp/rpp/observables/connectable_observable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ namespace rpp
*
* @ingroup observables
*/
template<rpp::constraint::observable OriginalObservable, rpp::constraint::subject Subject>
template<rpp::constraint::observable OriginalObservable, typename Subject>
class connectable_observable final : public decltype(std::declval<Subject>().get_observable())
{
using base = decltype(std::declval<Subject>().get_observable());

public:
static_assert(rpp::constraint::subject<Subject>);

connectable_observable(const OriginalObservable& original_observable, const Subject& subject = Subject{})
: base{subject.get_observable()}
, m_original_observable{original_observable}
Expand Down
3 changes: 1 addition & 2 deletions src/rpp/rpp/observables/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#pragma once

#include <rpp/observers/fwd.hpp>
#include <rpp/subjects/fwd.hpp>

#include <rpp/observables/details/disposable_strategy.hpp>
#include <rpp/utils/constraints.hpp>
Expand Down Expand Up @@ -70,7 +69,7 @@ namespace rpp::constraint

namespace rpp
{
template<rpp::constraint::observable OriginalObservable, rpp::constraint::subject Subject>
template<rpp::constraint::observable OriginalObservable, typename Subject>
class connectable_observable;
} // namespace rpp

Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/concat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ namespace rpp::operators::details
{
auto queue = get_queue();
if (queue->empty())
return {};
return std::nullopt;
auto observable = queue->front();
queue->pop();
return observable;
Expand Down
32 changes: 17 additions & 15 deletions src/rpp/rpp/operators/details/forwarding_subject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,24 @@

#include <rpp/subjects/fwd.hpp>

#include <rpp/disposables/disposable_wrapper.hpp>
#include <rpp/disposables/refcount_disposable.hpp>
#include <rpp/observers/observer.hpp>
#include <rpp/subjects/details/base_subject.hpp>
#include <rpp/subjects/details/subject_on_subscribe.hpp>
#include <rpp/subjects/details/subject_state.hpp>

#include <memory>

namespace rpp::operators::details
{
template<rpp::constraint::decayed_type Type>
class forwarding_strategy
class forwarding_subject
{
struct observer_strategy
{
using preferred_disposable_strategy = rpp::details::observers::none_disposable_strategy;

std::shared_ptr<subjects::details::subject_state<Type>> state{};
std::shared_ptr<subjects::details::subject_state<Type, false>> state{};

void set_upstream(const disposable_wrapper& d) const noexcept { state->add(d); }

Expand All @@ -41,9 +42,9 @@ namespace rpp::operators::details
};

public:
using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t<subjects::details::subject_state<Type>>::template add<1>;
using expected_disposable_strategy = typename rpp::details::observables::deduce_disposable_strategy_t<subjects::details::subject_state<Type, false>>::template add<1>;

explicit forwarding_strategy(disposable_wrapper_impl<rpp::refcount_disposable> refcount)
explicit forwarding_subject(disposable_wrapper_impl<rpp::refcount_disposable> refcount)
: m_refcount{std::move(refcount)}
{
}
Expand All @@ -53,12 +54,16 @@ namespace rpp::operators::details
return rpp::observer<Type, observer_strategy>{m_state.lock()};
}

template<rpp::constraint::observer_of_type<Type> TObs>
void on_subscribe(TObs&& observer) const
auto get_observable() const
{
if (const auto locked = m_refcount.lock())
observer.set_upstream(locked->add_ref());
m_state.lock()->on_subscribe(std::forward<TObs>(observer));
return subjects::details::create_subject_on_subscribe_observable<Type, expected_disposable_strategy>([state = m_state.as_weak(), refcount = m_refcount]<rpp::constraint::observer_of_type<Type> TObs>(TObs&& observer) {
if (const auto locked_state = state.lock())
{
if (const auto locked = refcount.lock())
observer.set_upstream(locked->add_ref());
locked_state->on_subscribe(std::forward<TObs>(observer));
}
});
}

rpp::composite_disposable_wrapper get_disposable() const
Expand All @@ -67,10 +72,7 @@ namespace rpp::operators::details
}

private:
disposable_wrapper_impl<rpp::refcount_disposable> m_refcount;
disposable_wrapper_impl<subjects::details::subject_state<Type>> m_state = disposable_wrapper_impl<subjects::details::subject_state<Type>>::make();
disposable_wrapper_impl<rpp::refcount_disposable> m_refcount;
disposable_wrapper_impl<subjects::details::subject_state<Type, false>> m_state = disposable_wrapper_impl<subjects::details::subject_state<Type, false>>::make();
};

template<rpp::constraint::decayed_type Type>
using forwarding_subject = subjects::details::base_subject<Type, forwarding_strategy<Type>>;
} // namespace rpp::operators::details
2 changes: 1 addition & 1 deletion src/rpp/rpp/operators/details/strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
static auto apply(Observer&& observer, const Args&... vals)
{
static_assert(rpp::constraint::observer_of_type<std::decay_t<Observer>, typename Operator::template operator_traits<Type>::result_type>);
return rpp::observer<Type, typename Operator::template operator_traits<Type>::template observer_strategy<std::decay_t<Observer>>>{std::forward<Observer>(observer), vals...};
return rpp::observer<Type, typename Operator::template operator_traits<Type>::template observer_strategy<std::decay_t<Observer>>>{std::forward<Observer>(observer), vals...}; // NOLINT

Check warning on line 49 in src/rpp/rpp/operators/details/strategy.hpp

View workflow job for this annotation

GitHub Actions / tests ci-ubuntu-gcc Release

‘*((void*)&<anonymous> +4)’ is used uninitialized in this function [-Wuninitialized]

Check warning on line 49 in src/rpp/rpp/operators/details/strategy.hpp

View workflow job for this annotation

GitHub Actions / tests ci-ubuntu-gcc Release

‘*((void*)&<anonymous> +4)’ may be used uninitialized in this function [-Wmaybe-uninitialized]

Check warning on line 49 in src/rpp/rpp/operators/details/strategy.hpp

View workflow job for this annotation

GitHub Actions / tests ci-ubuntu-gcc Release

‘*((void*)&<anonymous> +4)’ may be used uninitialized in this function [-Wmaybe-uninitialized]
}

private:
Expand Down
3 changes: 2 additions & 1 deletion src/rpp/rpp/sources/create.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ namespace rpp::details
template<constraint::decayed_type Type, constraint::on_subscribe<Type> OnSubscribe>
struct create_strategy
{
using value_type = Type;
using value_type = Type;
using expected_disposable_strategy = rpp::details::observables::deduce_disposable_strategy_t<OnSubscribe>;

RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe;
};
Expand Down
2 changes: 1 addition & 1 deletion src/rpp/rpp/subjects.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@
*/

#include <rpp/subjects/publish_subject.hpp>
#include <rpp/subjects/serialized_subject.hpp>
#include <rpp/subjects/replay_subject.hpp>
62 changes: 0 additions & 62 deletions src/rpp/rpp/subjects/details/base_subject.hpp

This file was deleted.

32 changes: 32 additions & 0 deletions src/rpp/rpp/subjects/details/subject_on_subscribe.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// ReactivePlusPlus library
//
// Copyright Aleksey Loginov 2022 - present.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// https://www.boost.org/LICENSE_1_0.txt)
//
// Project home: https://github.com/victimsnino/ReactivePlusPlus

#pragma once

#include <rpp/sources/fwd.hpp>

#include <rpp/observables/observable.hpp>

namespace rpp::subjects::details
{
template<constraint::decayed_type Type, ::rpp::constraint::on_subscribe<Type> OnSubscribe, typename DisposableStrategy>
struct subject_on_subscribe_strategy
{
using value_type = Type;
using expected_disposable_strategy = DisposableStrategy;

RPP_NO_UNIQUE_ADDRESS OnSubscribe subscribe;
};

template<constraint::decayed_type Type, typename DisposableStrategy, rpp::constraint::on_subscribe<Type> OnSubscribe>
auto create_subject_on_subscribe_observable(OnSubscribe&& on_subscribe)
{
return rpp::observable<Type, subject_on_subscribe_strategy<Type, std::decay_t<OnSubscribe>, DisposableStrategy>>(std::forward<OnSubscribe>(on_subscribe));
}
} // namespace rpp::subjects::details
30 changes: 20 additions & 10 deletions src/rpp/rpp/subjects/details/subject_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ namespace rpp::subjects::details
{
};

template<rpp::constraint::decayed_type Type>
class subject_state : public std::enable_shared_from_this<subject_state<Type>>
, public composite_disposable
template<rpp::constraint::decayed_type Type, bool Serialized>
class subject_state : public composite_disposable
, public rpp::details::enable_wrapper_from_this<subject_state<Type, Serialized>>
{
using shared_observers = std::shared_ptr<std::vector<rpp::dynamic_observer<Type>>>;
using state_t = std::variant<shared_observers, std::exception_ptr, completed, disposed>;

public:
using expected_disposable_strategy = rpp::details::observables::atomic_fixed_disposable_strategy_selector<1>;

subject_state() = default;

template<rpp::constraint::observer_of_type<Type> TObs>
void on_subscribe(TObs&& observer)
{
Expand Down Expand Up @@ -72,21 +74,28 @@ namespace rpp::subjects::details

void on_next(const Type& v)
{
std::lock_guard lock{m_serialized_mutex};
if (const auto observers = extract_observers_under_lock_if_there())
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_next(v); });
}

void on_error(const std::exception_ptr& err)
{
if (const auto observers = exchange_observers_under_lock_if_there(err))
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); });
{
std::lock_guard lock{m_serialized_mutex};
if (const auto observers = exchange_observers_under_lock_if_there(err))
rpp::utils::for_each(*observers, [&](const auto& sub) { sub.on_error(err); });
}
dispose();
}

void on_completed()
{
if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer<Type>::on_completed>{});
{
std::lock_guard lock{m_serialized_mutex};
if (const auto observers = exchange_observers_under_lock_if_there(completed{}))
rpp::utils::for_each(*observers, rpp::utils::static_mem_fn<&dynamic_observer<Type>::on_completed>{});
}
dispose();
}

Expand All @@ -99,7 +108,7 @@ namespace rpp::subjects::details
void set_upstream(rpp::dynamic_observer<Type>& obs)
{
obs.set_upstream(rpp::disposable_wrapper{make_callback_disposable(
[weak = this->weak_from_this()]() noexcept // NOLINT(bugprone-exception-escape)
[weak = this->wrapper_from_this().as_weak()]() noexcept // NOLINT(bugprone-exception-escape)
{
if (const auto shared = weak.lock())
{
Expand Down Expand Up @@ -163,7 +172,8 @@ namespace rpp::subjects::details
}

private:
state_t m_state{};
std::mutex m_mutex{};
state_t m_state{};
std::mutex m_mutex{};
RPP_NO_UNIQUE_ADDRESS std::conditional_t<Serialized, std::mutex, rpp::utils::none_mutex> m_serialized_mutex{};
};
} // namespace rpp::subjects::details
Loading
Loading