Skip to content

Commit

Permalink
fix: Fix logging in SubscriptionSource (#1617)
Browse files Browse the repository at this point in the history
For #1616. Later should be ported to develop as well.
  • Loading branch information
kuznetsss authored Aug 29, 2024
1 parent 665fab1 commit 5004dc4
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
1 change: 1 addition & 0 deletions src/etl/LoadBalancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ LoadBalancer::getETLState() noexcept
void
LoadBalancer::chooseForwardingSource()
{
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
hasForwardingSource_ = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {
Expand Down
2 changes: 1 addition & 1 deletion src/etl/impl/GrpcSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
namespace etl::impl {

GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
: log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
{
try {
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};
Expand Down
22 changes: 14 additions & 8 deletions src/etl/impl/SubscriptionSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource(
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
Expand Down Expand Up @@ -133,6 +133,7 @@ void
SubscriptionSource::setForwarding(bool isForwarding)
{
isForwarding_ = isForwarding;
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
}

std::chrono::steady_clock::time_point
Expand Down Expand Up @@ -168,6 +169,7 @@ SubscriptionSource::subscribe()
wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";

auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
Expand Down Expand Up @@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers));
}
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;

} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) {
ledgerIndex = object.at(JS(ledger_index)).as_int64();
}
Expand All @@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message)
// 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
Expand All @@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message)

return std::nullopt;
} catch (std::exception const& e) {
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
LOG(log_.error()) << "Exception in handleMessage: " << e.what();
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
}
}
Expand All @@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
isForwarding_ = false;
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
}

if (wsConnection_ != nullptr) {
auto const err = wsConnection_->close(yield);
if (err) {
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
}
wsConnection_->close(yield);
wsConnection_.reset();
}

Expand Down

0 comments on commit 5004dc4

Please sign in to comment.