Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Forward validations and manifests in reporting mode #3905

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions src/ripple/app/misc/NetworkOPs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,10 @@ class NetworkOPsImp final : public NetworkOPs
void
pubValidation(std::shared_ptr<STValidation> const& val) override;

void
forwardValidation(Json::Value const& jvObj) override;
void
forwardManifest(Json::Value const& jvObj) override;
void
forwardProposedTransaction(Json::Value const& jvObj) override;
void
Expand Down Expand Up @@ -2590,6 +2594,46 @@ NetworkOPsImp::forwardProposedTransaction(Json::Value const& jvObj)
forwardProposedAccountTransaction(jvObj);
}

void
NetworkOPsImp::forwardValidation(Json::Value const& jvObj)
{
std::lock_guard sl(mSubLock);

for (auto i = mStreamMaps[sValidations].begin();
i != mStreamMaps[sValidations].end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
++i;
}
else
{
i = mStreamMaps[sValidations].erase(i);
}
}
}

void
NetworkOPsImp::forwardManifest(Json::Value const& jvObj)
{
std::lock_guard sl(mSubLock);

for (auto i = mStreamMaps[sManifests].begin();
i != mStreamMaps[sManifests].end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
++i;
}
else
{
i = mStreamMaps[sManifests].erase(i);
}
}
}

static void
getAccounts(Json::Value const& jvObj, std::vector<AccountID>& accounts)
{
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/app/misc/NetworkOPs.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ class NetworkOPs : public InfoSub::Source
virtual void
pubValidation(std::shared_ptr<STValidation> const& val) = 0;

virtual void
forwardValidation(Json::Value const& jvObj) = 0;
virtual void
forwardManifest(Json::Value const& jvObj) = 0;
virtual void
forwardProposedTransaction(Json::Value const& jvObj) = 0;
virtual void
Expand Down
23 changes: 20 additions & 3 deletions src/ripple/app/reporting/ETLSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ ETLSource::onHandshake(boost::beast::error_code ec)
jv["streams"].append(ledgerStream);
Json::Value txnStream("transactions_proposed");
jv["streams"].append(txnStream);
Json::Value validationStream("validations");
jv["streams"].append(validationStream);
Json::Value manifestStream("manifests");
jv["streams"].append(manifestStream);
Json::FastWriter fastWriter;

JLOG(journal_.trace()) << "Sending subscribe stream message";
Expand Down Expand Up @@ -352,15 +356,28 @@ ETLSource::handleMessage()
}
else
{
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do the shouldPropagateStream() check once instead of in 3 places. Then, if the check passes, check for each of the stream types and proceed accordingly.

This removes duplicated code and I think makes the intention more clear.

if (response.isMember(jss::transaction))
if (etl_.getETLLoadBalancer().shouldPropagateStream(this))
{
if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
if (response.isMember(jss::transaction))
{
etl_.getApplication().getOPs().forwardProposedTransaction(
response);
}
else if (
response.isMember("type") &&
response["type"] == "validationReceived")
{
etl_.getApplication().getOPs().forwardValidation(response);
}
else if (
response.isMember("type") &&
response["type"] == "manifestReceived")
{
etl_.getApplication().getOPs().forwardManifest(response);
}
}
else

if (response.isMember("type") && response["type"] == "ledgerClosed")
{
JLOG(journal_.debug())
<< __func__ << " : "
Expand Down
8 changes: 4 additions & 4 deletions src/ripple/app/reporting/ETLSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -368,13 +368,13 @@ class ETLLoadBalancer

/// Determine whether messages received on the transactions_proposed stream
/// should be forwarded to subscribing clients. The server subscribes to
/// transactions_proposed on multiple ETLSources, yet only forwards messages
/// from one source at any given time (to avoid sending duplicate messages
/// to clients).
/// transactions_proposed, validations, and manifests on multiple
/// ETLSources, yet only forwards messages from one source at any given time
/// (to avoid sending duplicate messages to clients).
/// @param in ETLSource in question
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update the documentation for this function which currently only describes the transaction stream.

/// @return true if messages should be forwarded
bool
shouldPropagateTxnStream(ETLSource* in) const
shouldPropagateStream(ETLSource* in) const
{
for (auto& src : sources_)
{
Expand Down
4 changes: 2 additions & 2 deletions src/ripple/nodestore/backend/CassandraFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ class CassandraBackend : public Backend
continue;
}

query = {};
query.str("");
query << "SELECT * FROM " << tableName << " LIMIT 1";
statement = makeStatement(query.str().c_str(), 0);
fut = cass_session_execute(session_.get(), statement);
Expand Down Expand Up @@ -433,7 +433,7 @@ class CassandraBackend : public Backend
*/
cass_future_free(prepare_future);

query = {};
query.str("");
query << "SELECT object FROM " << tableName << " WHERE hash = ?";
prepare_future =
cass_session_prepare(session_.get(), query.str().c_str());
Expand Down
4 changes: 0 additions & 4 deletions src/ripple/rpc/handlers/Subscribe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ doSubscribe(RPC::JsonContext& context)
}
else if (streamName == "manifests")
{
if (context.app.config().reporting())
return rpcError(rpcREPORTING_UNSUPPORTED);
context.netOps.subManifests(ispSub);
}
else if (streamName == "transactions")
Expand All @@ -154,8 +152,6 @@ doSubscribe(RPC::JsonContext& context)
}
else if (streamName == "validations")
{
if (context.app.config().reporting())
return rpcError(rpcREPORTING_UNSUPPORTED);
context.netOps.subValidations(ispSub);
}
else if (streamName == "peer_status")
Expand Down