fix: Subscription source bugs fix (#1626)
For #1620.

- Add timeouts for websocket operations on connections to rippled.
Without these timeouts, if a connection hangs for any reason, clio
has no way of detecting that the connection is hanging.
- Fix a potential data race when choosing the new subscription source
that forwards messages to users.
- Optimise switching between subscription sources (a simplified sketch of
the new disconnect handling follows below).
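
The gist of the switching change, as a simplified sketch (the `Source`, `pickForwardingSource`, and `onDisconnect` names below are illustrative only, not clio's actual types): the disconnect hook now reports whether the dropped connection was the forwarding one, and a new forwarding source is chosen only in that case.

```cpp
// Illustrative sketch only; names are hypothetical and simplified compared to
// clio's LoadBalancer/Source classes.
#include <iostream>
#include <vector>

struct Source {
    bool connected = false;
    bool forwarding = false;
};

// Pick the first connected source as the forwarding one; clear the flag on the rest.
void pickForwardingSource(std::vector<Source>& sources)
{
    bool chosen = false;
    for (auto& source : sources) {
        source.forwarding = not chosen and source.connected;
        chosen = chosen or source.forwarding;
    }
}

// The hook now receives whether the disconnected source was forwarding,
// so re-selection happens only when the forwarding source is actually lost.
void onDisconnect(std::vector<Source>& sources, bool wasForwarding)
{
    if (wasForwarding)
        pickForwardingSource(sources);
}

int main()
{
    std::vector<Source> sources{{true, false}, {true, true}};
    sources[1].connected = false;                // the forwarding source drops
    onDisconnect(sources, /*wasForwarding=*/true);
    std::cout << sources[0].forwarding << '\n';  // prints 1: source 0 took over
}
```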
kuznetsss authored Sep 5, 2024
1 parent 2e2740d commit 9fe9e7c
Showing 8 changed files with 147 additions and 84 deletions.
14 changes: 9 additions & 5 deletions src/etl/LoadBalancer.cpp
@@ -109,10 +109,13 @@ LoadBalancer::LoadBalancer(
validatedLedgers,
forwardingTimeout,
[this]() {
if (not hasForwardingSource_)
if (not hasForwardingSource_.lock().get())
chooseForwardingSource();
},
[this](bool wasForwarding) {
if (wasForwarding)
chooseForwardingSource();
},
[this]() { chooseForwardingSource(); },
[this]() {
if (forwardingCache_.has_value())
forwardingCache_->invalidate();
@@ -315,11 +318,12 @@ void
LoadBalancer::chooseForwardingSource()
{
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
hasForwardingSource_ = false;
auto hasForwardingSourceLock = hasForwardingSource_.lock();
hasForwardingSourceLock.get() = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {
if (not hasForwardingSourceLock.get() and source->isConnected()) {
source->setForwarding(true);
hasForwardingSource_ = true;
hasForwardingSourceLock.get() = true;
} else {
source->setForwarding(false);
}
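
The race this fixes: with the previous `std::atomic_bool`, two concurrent invocations of `chooseForwardingSource()` could interleave their reads and writes of the flag, so two sources could briefly both (or neither) be marked as forwarding. Holding a single lock across the whole read-pick-write loop serializes the decision. A minimal sketch of that idea, using `std::mutex` directly rather than clio's `util::Mutex` wrapper, with a stand-in `Source` type:

```cpp
// Minimal sketch of the serialization idea; Source is a stand-in type and this
// does not mirror clio's LoadBalancer exactly.
#include <memory>
#include <mutex>
#include <vector>

struct Source {
    bool connected = true;
    void setForwarding(bool) {}
    bool isConnected() const { return connected; }
};

class ForwardingSelector {
    std::mutex mutex_;
    bool hasForwardingSource_ = false;
    std::vector<std::shared_ptr<Source>> sources_;

public:
    explicit ForwardingSelector(std::vector<std::shared_ptr<Source>> sources) : sources_(std::move(sources))
    {
    }

    // The whole read-pick-write sequence runs under one lock, so two threads
    // can no longer interleave their checks of hasForwardingSource_.
    void chooseForwardingSource()
    {
        std::lock_guard<std::mutex> const lock{mutex_};
        hasForwardingSource_ = false;
        for (auto& source : sources_) {
            if (not hasForwardingSource_ and source->isConnected()) {
                source->setForwarding(true);
                hasForwardingSource_ = true;
            } else {
                source->setForwarding(false);
            }
        }
    }
};
```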
7 changes: 5 additions & 2 deletions src/etl/LoadBalancer.hpp
@@ -25,6 +25,7 @@
#include "etl/Source.hpp"
#include "etl/impl/ForwardingCache.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"

@@ -38,7 +39,6 @@
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
@@ -74,7 +74,10 @@ class LoadBalancer {
std::optional<ETLState> etlState_;
std::uint32_t downloadRanges_ =
DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
std::atomic_bool hasForwardingSource_{false};

// Using a mutex instead of atomic_bool because choosing a new source to
// forward messages must be done under mutual exclusion; otherwise there would be a race condition
util::Mutex<bool> hasForwardingSource_{false};

public:
/**
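
The new member relies on `util::Mutex<bool>` from `util/Mutex.hpp`. Judging by the call sites above (`hasForwardingSource_.lock().get()`), it bundles a value with its mutex and hands out an RAII handle. A rough approximation of such a wrapper, for orientation only (clio's actual implementation may differ in interface and guarantees):

```cpp
// Rough approximation of a value-with-mutex wrapper in the spirit of
// util::Mutex<T>; the real util/Mutex.hpp may differ.
#include <mutex>
#include <utility>

template <typename T>
class Mutex {
    std::mutex mutex_;
    T data_;

public:
    explicit Mutex(T data = T{}) : data_(std::move(data)) {}

    // RAII handle: the mutex stays locked for the handle's lifetime and
    // get() exposes a reference to the protected value.
    class Lock {
        std::unique_lock<std::mutex> lock_;
        T& data_;

    public:
        Lock(std::mutex& mutex, T& data) : lock_(mutex), data_(data) {}

        T& get() { return data_; }
    };

    Lock lock() { return Lock{mutex_, data_}; }
};

// Usage mirroring the diff above: keep the handle alive for the whole
// selection loop so the flag cannot change underneath it.
// Mutex<bool> hasForwardingSource{false};
// auto guard = hasForwardingSource.lock();
// guard.get() = false;
```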
2 changes: 1 addition & 1 deletion src/etl/Source.hpp
@@ -51,7 +51,7 @@ namespace etl {
class SourceBase {
public:
using OnConnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void()>;
using OnDisconnectHook = std::function<void(bool)>;
using OnLedgerClosedHook = std::function<void()>;

virtual ~SourceBase() = default;
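
The only change here is the hook signature. A tiny sketch of how the new flag flows from a source into the hook (simplified, hypothetical wiring; `FakeSource` is not a clio type):

```cpp
// Simplified, hypothetical wiring of the new disconnect hook signature.
#include <functional>

using OnDisconnectHook = std::function<void(bool)>;  // bool: was this source forwarding?

struct FakeSource {
    OnDisconnectHook onDisconnect_;
    bool isForwarding_ = false;

    void handleError()
    {
        bool const wasForwarding = isForwarding_;
        isForwarding_ = false;
        onDisconnect_(wasForwarding);  // the balancer decides whether to re-select
    }
};
```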
35 changes: 23 additions & 12 deletions src/etl/impl/SubscriptionSource.cpp
@@ -24,6 +24,8 @@
#include "rpc/JS.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "util/requests/Types.hpp"

#include <boost/algorithm/string/classification.hpp>
@@ -66,22 +68,28 @@ SubscriptionSource::SubscriptionSource(
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const wsTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
, strand_(boost::asio::make_strand(ioContext))
, wsTimeout_(wsTimeout)
, retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
, onConnect_(std::move(onConnect))
, onDisconnect_(std::move(onDisconnect))
, onLedgerClosed_(std::move(onLedgerClosed))
, lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
"subscription_source_last_message_time",
util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
"Seconds since epoch of the last message received from rippled subscription streams"
))
{
wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
.addHeader({"X-User", "clio-client"})
.setConnectionTimeout(connectionTimeout);
.setConnectionTimeout(wsTimeout_);
}

SubscriptionSource::~SubscriptionSource()
@@ -167,21 +175,22 @@ SubscriptionSource::subscribe()
}

wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";

auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}

isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();

retry_.reset();

while (!stop_) {
auto const message = wsConnection_->read(yield);
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
@@ -256,8 +265,6 @@ SubscriptionSource::handleMessage(std::string const& message)
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
@@ -278,10 +285,10 @@ void
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
{
isConnected_ = false;
isForwarding_ = false;
bool const wasForwarding = isForwarding_.exchange(false);
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
onDisconnect_(wasForwarding);
}

if (wsConnection_ != nullptr) {
@@ -312,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
void
SubscriptionSource::setLastMessageTime()
{
lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
lastMessageTimeSecondsSinceEpoch_.get().set(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
);
auto lock = lastMessageTime_.lock();
lock.get() = std::chrono::steady_clock::now();
}

void
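
The new `subscription_source_last_message_time` gauge exports wall-clock seconds since epoch, while the existing `lastMessageTime_` keeps using `steady_clock` for in-process staleness checks: a value published to a metrics backend needs a time base that is meaningful outside the process, hence `system_clock`. A minimal sketch of computing that value (`setGauge` below is a placeholder, not clio's Prometheus API):

```cpp
// Minimal sketch; setGauge() is a placeholder for a metrics backend and is not
// clio's PrometheusService API.
#include <chrono>
#include <cstdint>
#include <iostream>

void setGauge(std::int64_t value)
{
    std::cout << "subscription_source_last_message_time " << value << '\n';
}

int main()
{
    // Wall-clock seconds since epoch: comparable across hosts and restarts,
    // which is what an exported "last message time" metric needs.
    auto const secondsSinceEpoch =
        std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    setGauge(secondsSinceEpoch);

    // steady_clock, by contrast, is monotonic with an arbitrary epoch, so it
    // remains suitable only for internal checks like lastMessageTime_.
    auto const lastMessageTime = std::chrono::steady_clock::now();
    (void)lastMessageTime;
}
```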
14 changes: 10 additions & 4 deletions src/etl/impl/SubscriptionSource.hpp
@@ -19,12 +19,13 @@

#pragma once

#include "etl/ETLHelpers.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"

@@ -37,6 +38,7 @@
#include <atomic>
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
@@ -71,6 +73,8 @@ class SubscriptionSource {

boost::asio::strand<boost::asio::io_context::executor_type> strand_;

std::chrono::steady_clock::duration wsTimeout_;

util::Retry retry_;

OnConnectHook onConnect_;
@@ -83,9 +87,11 @@

util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;

std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;

std::future<void> runFuture_;

static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
static constexpr std::chrono::seconds WS_TIMEOUT{30};
static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
static constexpr std::chrono::seconds RETRY_DELAY{1};

@@ -103,7 +109,7 @@
* @param onNewLedger The onNewLedger hook. Called when a new ledger is received
* @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
* forwarding
* @param connectionTimeout The connection timeout. Defaults to 30 seconds
* @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
* @param retryDelay The retry delay. Defaults to 1 second
*/
SubscriptionSource(
@@ -115,7 +121,7 @@
OnConnectHook onConnect,
OnDisconnectHook onDisconnect,
OnLedgerClosedHook onLedgerClosed,
std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
);

11 changes: 9 additions & 2 deletions src/util/requests/impl/WsConnectionImpl.hpp
@@ -39,8 +39,10 @@
#include <boost/beast/websocket/stream_base.hpp>
#include <boost/system/errc.hpp>

#include <atomic>
#include <chrono>
#include <expected>
#include <memory>
#include <optional>
#include <string>
#include <utility>
@@ -123,15 +125,20 @@ class WsConnectionImpl : public WsConnection {
static void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{
auto isCompleted = std::make_shared<bool>(false);
boost::asio::cancellation_signal cancellationSignal;
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);

boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
if (!errorCode)

// The timer handler below can be invoked with no error code even if the operation completed before the timeout,
// so we need an additional flag here
timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
if (!errorCode and not *isCompleted)
cancellationSignal.emit(boost::asio::cancellation_type::terminal);
});
operation(cyield);
*isCompleted = true;
}

static boost::system::error_code
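
A self-contained illustration of the timeout pattern above, runnable on its own (assuming Boost >= 1.80 for the `spawn` overload taking a completion token; the 5-second `slowOp` timer stands in for a websocket read or write):

```cpp
// Self-contained illustration of the withTimeout() pattern; slowOp stands in
// for a websocket operation. Assumes Boost >= 1.80.
#include <boost/asio.hpp>
#include <boost/asio/spawn.hpp>

#include <chrono>
#include <iostream>
#include <memory>

template <typename Operation>
void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{
    auto isCompleted = std::make_shared<bool>(false);
    boost::asio::cancellation_signal cancellationSignal;
    auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);

    boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};

    // The handler may run with no error even after the operation has finished,
    // so the isCompleted flag keeps it from emitting a stale cancellation.
    timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
        if (!errorCode and not *isCompleted)
            cancellationSignal.emit(boost::asio::cancellation_type::terminal);
    });

    operation(cyield);  // the wrapped async call observes the bound cancellation slot
    *isCompleted = true;
}

int
main()
{
    boost::asio::io_context ctx;
    boost::asio::spawn(
        ctx,
        [&ctx](boost::asio::yield_context yield) {
            boost::asio::steady_timer slowOp{ctx, std::chrono::seconds(5)};
            try {
                // Times out after 100ms: the cancellation signal aborts slowOp's wait.
                withTimeout(
                    [&slowOp](auto cyield) { slowOp.async_wait(cyield); }, yield, std::chrono::milliseconds(100)
                );
                std::cout << "operation finished before the timeout\n";
            } catch (boost::system::system_error const& error) {
                std::cout << "operation aborted: " << error.code().message() << '\n';
            }
        },
        boost::asio::detached
    );
    ctx.run();
}
```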
34 changes: 11 additions & 23 deletions tests/unit/etl/LoadBalancerTests.cpp
@@ -271,15 +271,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);
}

TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect(false);
}

TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
@@ -288,29 +285,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(true);

sourceFactory_.callbacksAt(0).onConnect();
}

TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(1).onDisconnect();

sourceFactory_.callbacksAt(1).onDisconnect(false);
sourceFactory_.callbacksAt(1).onConnect();
}

TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2);
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(1).onDisconnect();
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
sourceFactory_.callbacksAt(0).onDisconnect(true);
sourceFactory_.callbacksAt(1).onDisconnect(false);

EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
@@ -353,12 +346,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate)
sourceFactory_.callbacksAt(1).onConnect();

// Source 0 got disconnected
EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false)); // only source 1 must be forwarding
sourceFactory_.callbacksAt(0).onDisconnect();
sourceFactory_.callbacksAt(0).onDisconnect(false);
}

struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests {
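
For completeness, a trimmed-down GoogleMock-style illustration (not the real clio fixture or mocks) of the behaviour the updated tests encode: a disconnect with `wasForwarding == false` must not trigger any re-selection calls.

```cpp
// Trimmed-down, hypothetical illustration of the expectation the updated tests
// encode; it does not use clio's real mocks or LoadBalancer fixture.
#include <gmock/gmock.h>
#include <gtest/gtest.h>

struct MockSource {
    MOCK_METHOD(bool, isConnected, (), (const));
    MOCK_METHOD(void, setForwarding, (bool), ());
};

TEST(ForwardingSelectionSketch, NonForwardingDisconnectTriggersNoReselection)
{
    testing::StrictMock<MockSource> source;

    auto const onDisconnect = [&source](bool wasForwarding) {
        if (wasForwarding)
            source.setForwarding(source.isConnected());
    };

    // StrictMock turns any unexpected call into a failure, so this passes only
    // because no selection happens when the dropped source was not forwarding.
    onDisconnect(false);
}
```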