Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Subscription source bugs fix (#1626) #1633

Merged
merged 1 commit into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/etl/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,13 @@
validatedLedgers,
forwardingTimeout,
[this]() {
if (not hasForwardingSource_)
if (not hasForwardingSource_.lock().get())
chooseForwardingSource();
},
[this](bool wasForwarding) {

Check warning on line 117 in src/etl/LoadBalancer.cpp

View check run for this annotation

Codecov / codecov/patch

src/etl/LoadBalancer.cpp#L117

Added line #L117 was not covered by tests
if (wasForwarding)
chooseForwardingSource();
},
[this]() { chooseForwardingSource(); },
[this]() {
if (forwardingCache_.has_value())
forwardingCache_->invalidate();
Expand Down Expand Up @@ -323,11 +326,12 @@
LoadBalancer::chooseForwardingSource()
{
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
hasForwardingSource_ = false;
auto hasForwardingSourceLock = hasForwardingSource_.lock();
hasForwardingSourceLock.get() = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {
if (not hasForwardingSourceLock.get() and source->isConnected()) {
source->setForwarding(true);
hasForwardingSource_ = true;
hasForwardingSourceLock.get() = true;
} else {
source->setForwarding(false);
}
Expand Down
8 changes: 5 additions & 3 deletions src/etl/LoadBalancer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/Mutex.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"

Expand All @@ -39,7 +39,6 @@
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <expected>
Expand Down Expand Up @@ -76,7 +75,10 @@ class LoadBalancer {
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};

// Using mutext instead of atomic_bool because choosing a new source to
// forward messages should be done with a mutual exclusion otherwise there will be a race condition
util::Mutex<bool> hasForwardingSource_{false};

public:
/**
Expand Down
2 changes: 1 addition & 1 deletion src/etl/Source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace etl {
class SourceBase {
public:
using OnConnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void(bool)>;
using OnLedgerClosedHook = std::function<void()>;

virtual ~SourceBase() = default;
Expand Down
35 changes: 23 additions & 12 deletions src/etl/impl/SubscriptionSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include "rpc/JS.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "util/requests/Types.hpp"

#include <boost/algorithm/string/classification.hpp>
Expand Down Expand Up @@ -66,22 +68,28 @@ SubscriptionSource::SubscriptionSource(
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const wsTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
"subscription_source_last_message_time",
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
"Seconds since epoch of the last message received from rippled subscription streams"
))
{
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"})
.setConnectionTimeout(connectionTimeout);
.setConnectionTimeout(wsTimeout_);
}

SubscriptionSource::~SubscriptionSource()
Expand Down Expand Up @@ -167,21 +175,22 @@ SubscriptionSource::subscribe()
}

wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";

auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}

isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();

retry_.reset();

while (!stop_) {
auto const message = wsConnection_->read(yield);
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
Expand Down Expand Up @@ -256,8 +265,6 @@ SubscriptionSource::handleMessage(std::string const& message)
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
Expand All @@ -278,10 +285,10 @@ void
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
{
isConnected_ = false;
isForwarding_ = false;
bool const wasForwarding = isForwarding_.exchange(false);
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
onDisconnect_(wasForwarding);
}

if (wsConnection_ != nullptr) {
Expand Down Expand Up @@ -312,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
void
SubscriptionSource::setLastMessageTime()
{
lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
lastMessageTimeSecondsSinceEpoch_.get().set(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
);
auto lock = lastMessageTime_.lock();
lock.get() = std::chrono::steady_clock::now();
}

void
Expand Down
12 changes: 9 additions & 3 deletions src/etl/impl/SubscriptionSource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"

Expand All @@ -37,6 +38,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -71,6 +73,8 @@ class SubscriptionSource {

boost::asio::strand<boost::asio::io_context::executor_type> strand_;

std::chrono::steady_clock::duration wsTimeout_;

util::Retry retry_;

OnConnectHook onConnect_;
Expand All @@ -83,9 +87,11 @@ class SubscriptionSource {

util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;

std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;

std::future<void> runFuture_;

static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1};

Expand All @@ -103,7 +109,7 @@ class SubscriptionSource {
* @param onDisconnect The onDisconnect hook. Called when the connection is lost
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
SubscriptionSource(
Expand All @@ -115,7 +121,7 @@ class SubscriptionSource {
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
);

Expand Down
11 changes: 9 additions & 2 deletions src/util/requests/impl/WsConnectionImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
#include <boost/beast/websocket/stream_base.hpp>
#include <boost/system/errc.hpp>

#include <atomic>
#include <chrono>
#include <expected>
#include <memory>
#include <optional>
#include <string>
#include <utility>
Expand Down Expand Up @@ -123,15 +125,20 @@ class WsConnectionImpl : public WsConnection {
static void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{
auto isCompleted = std::make_shared<bool>(false);
boost::asio::cancellation_signal cancellationSignal;
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);

boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
if (!errorCode)

// The timer below can be called with no error code even if the operation is completed before the timeout, so we
// need an additional flag here
timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
if (!errorCode and not *isCompleted)
cancellationSignal.emit(boost::asio::cancellation_type::terminal);
});
operation(cyield);
*isCompleted = true;
}

static boost::system::error_code
Expand Down
34 changes: 11 additions & 23 deletions tests/unit/etl/LoadBalancerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,15 +280,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);
}

TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect(false);
}

TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
Expand All @@ -297,29 +294,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);

sourceFactory_.callbacksAt(0).onConnect();
}

TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();

sourceFactory_.callbacksAt(1).onDisconnect(false);
sourceFactory_.callbacksAt(1).onConnect();
}

TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2);
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect();
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(1).onDisconnect(false);

EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
Expand Down Expand Up @@ -362,12 +355,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate)
sourceFactory_.callbacksAt(1).onConnect();

// Source 0 got disconnected
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false)); // only source 1 must be forwarding
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(false);
}

struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests {
Expand Down
Loading
Loading