From 2e2740d4c575ef18a89660995fbfa703a5d6f314 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 29 Aug 2024 16:48:04 +0100 Subject: [PATCH] feat: Published subscription message counters (#1618) This PR adds counters to track the amount of published messages for each subscription stream. --- src/feed/impl/ProposedTransactionFeed.cpp | 4 ++++ src/feed/impl/ProposedTransactionFeed.hpp | 4 +++- src/feed/impl/SingleFeedBase.cpp | 6 +++++- src/feed/impl/SingleFeedBase.hpp | 2 ++ src/feed/impl/TransactionFeed.cpp | 6 ++++++ src/feed/impl/TransactionFeed.hpp | 3 +++ src/feed/impl/Util.hpp | 11 +++++++++++ 7 files changed, 34 insertions(+), 2 deletions(-) diff --git a/src/feed/impl/ProposedTransactionFeed.cpp b/src/feed/impl/ProposedTransactionFeed.cpp index 09696c310..09160d464 100644 --- a/src/feed/impl/ProposedTransactionFeed.cpp +++ b/src/feed/impl/ProposedTransactionFeed.cpp @@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson) boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() { notified_.clear(); signal_.emit(pubMsg); + // Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts // However, if the same connection subscribe both stream and account, it will still receive the message twice. // notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled // acts like this. notified_.clear(); + for (auto const& account : affectedAccounts) accountSignal_.emit(account, pubMsg); + + ++pubCount_.get(); }); } diff --git a/src/feed/impl/ProposedTransactionFeed.hpp b/src/feed/impl/ProposedTransactionFeed.hpp index 58734d936..0ea3e11f0 100644 --- a/src/feed/impl/ProposedTransactionFeed.hpp +++ b/src/feed/impl/ProposedTransactionFeed.hpp @@ -24,6 +24,7 @@ #include "feed/impl/TrackableSignalMap.hpp" #include "feed/impl/Util.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" #include @@ -54,6 +55,7 @@ class ProposedTransactionFeed { boost::asio::strand strand_; std::reference_wrapper subAllCount_; std::reference_wrapper subAccountCount_; + std::reference_wrapper pubCount_; TrackableSignalMap> accountSignal_; TrackableSignal> signal_; @@ -67,7 +69,7 @@ class ProposedTransactionFeed { : strand_(boost::asio::make_strand(ioContext)) , subAllCount_(getSubscriptionsGaugeInt("tx_proposed")) , subAccountCount_(getSubscriptionsGaugeInt("account_proposed")) - + , pubCount_(getPublishedMessagesCounterInt("tx_proposed")) { } diff --git a/src/feed/impl/SingleFeedBase.cpp b/src/feed/impl/SingleFeedBase.cpp index 60a559b7c..acb4b8d4d 100644 --- a/src/feed/impl/SingleFeedBase.cpp +++ b/src/feed/impl/SingleFeedBase.cpp @@ -36,7 +36,10 @@ namespace feed::impl { SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name) - : strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name) + : strand_(boost::asio::make_strand(ioContext)) + , subCount_(getSubscriptionsGaugeInt(name)) + , pubCount_(getPublishedMessagesCounterInt(name)) + , name_(name) { } @@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable { auto const msgPtr = std::make_shared(std::move(msg)); signal_.emit(msgPtr); + ++pubCount_.get(); }); } diff --git a/src/feed/impl/SingleFeedBase.hpp b/src/feed/impl/SingleFeedBase.hpp index dcd716c2d..d6dde810c 100644 --- a/src/feed/impl/SingleFeedBase.hpp +++ b/src/feed/impl/SingleFeedBase.hpp @@ -22,6 +22,7 @@ #include "feed/Types.hpp" #include "feed/impl/TrackableSignal.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" #include @@ -40,6 +41,7 @@ namespace feed::impl { class SingleFeedBase { boost::asio::strand strand_; std::reference_wrapper subCount_; + std::reference_wrapper pubCount_; TrackableSignal const&> signal_; util::Logger logger_{"Subscriptions"}; std::string name_; diff --git a/src/feed/impl/TransactionFeed.cpp b/src/feed/impl/TransactionFeed.cpp index 3194de2a0..2ea244ba9 100644 --- a/src/feed/impl/TransactionFeed.cpp +++ b/src/feed/impl/TransactionFeed.cpp @@ -284,23 +284,29 @@ TransactionFeed::pub( affectedBooks = std::move(affectedBooks)]() { notified_.clear(); signal_.emit(allVersionsMsgs); + // clear the notified set. If the same connection subscribes both transactions + proposed_transactions, // rippled SENDS the same message twice notified_.clear(); txProposedsignal_.emit(allVersionsMsgs); notified_.clear(); + // check duplicate for account and proposed_account, this prevents sending the same message multiple times // if it affects multiple accounts watched by the same connection for (auto const& account : affectedAccounts) { accountSignal_.emit(account, allVersionsMsgs); accountProposedSignal_.emit(account, allVersionsMsgs); } + notified_.clear(); + // check duplicate for books, this prevents sending the same message multiple times if it affects multiple // books watched by the same connection for (auto const& book : affectedBooks) { bookSignal_.emit(book, allVersionsMsgs); } + + ++pubCount_.get(); } ); } diff --git a/src/feed/impl/TransactionFeed.hpp b/src/feed/impl/TransactionFeed.hpp index 4f614ddb8..0d63ccbce 100644 --- a/src/feed/impl/TransactionFeed.hpp +++ b/src/feed/impl/TransactionFeed.hpp @@ -26,6 +26,7 @@ #include "feed/impl/TrackableSignalMap.hpp" #include "feed/impl/Util.hpp" #include "util/log/Logger.hpp" +#include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" #include @@ -67,6 +68,7 @@ class TransactionFeed { std::reference_wrapper subAllCount_; std::reference_wrapper subAccountCount_; std::reference_wrapper subBookCount_; + std::reference_wrapper pubCount_; TrackableSignalMap accountSignal_; TrackableSignalMap bookSignal_; @@ -89,6 +91,7 @@ class TransactionFeed { , subAllCount_(getSubscriptionsGaugeInt("tx")) , subAccountCount_(getSubscriptionsGaugeInt("account")) , subBookCount_(getSubscriptionsGaugeInt("book")) + , pubCount_(getPublishedMessagesCounterInt("tx")) { } diff --git a/src/feed/impl/Util.hpp b/src/feed/impl/Util.hpp index 14da43560..5a50d383f 100644 --- a/src/feed/impl/Util.hpp +++ b/src/feed/impl/Util.hpp @@ -19,6 +19,7 @@ #pragma once +#include "util/prometheus/Counter.hpp" #include "util/prometheus/Gauge.hpp" #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" @@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName) fmt::format("Current subscribers number on the {} stream", counterName) ); } + +inline util::prometheus::CounterInt& +getPublishedMessagesCounterInt(std::string const& counterName) +{ + return PrometheusService::counterInt( + "subscriptions_published_count", + util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}), + fmt::format("Total published messages on the {} stream", counterName) + ); +} } // namespace feed::impl