Skip to content

Commit

Permalink
Add message type frequency to peer metrics, change ports
Browse files Browse the repository at this point in the history
  • Loading branch information
gregtatcam committed Jan 4, 2024
1 parent 4822170 commit d6a0031
Show file tree
Hide file tree
Showing 8 changed files with 191 additions and 74 deletions.
1 change: 1 addition & 0 deletions Builds/levelization/results/ordering.txt
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ test.nodestore > test.unit_test
test.overlay > ripple.app
test.overlay > ripple.basics
test.overlay > ripple.beast
test.overlay > ripple.core
test.overlay > ripple.overlay
test.overlay > ripple.peerfinder
test.overlay > ripple.protocol
Expand Down
23 changes: 22 additions & 1 deletion src/ripple/overlay/impl/PeerImp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@ PeerImp::json()
std::to_string(metrics_.recv.average_bytes());
ret[jss::metrics][jss::avg_bps_sent] =
std::to_string(metrics_.sent.average_bytes());
for (auto const& [t, n] : metrics_.sent.mtype())
ret[jss::metrics][jss::message_type_sent][std::to_string(t)] =
to_string(n);
for (auto const& [t, n] : metrics_.recv.mtype())
ret[jss::metrics][jss::message_type_recv][std::to_string(t)] =
to_string(n);

return ret;
}
Expand Down Expand Up @@ -905,7 +911,8 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
while (read_buffer_.size() > 0)
{
std::size_t bytes_consumed;
std::tie(bytes_consumed, ec) =
std::uint16_t message_type;
std::tie(bytes_consumed, message_type, ec) =
invokeProtocolMessage(read_buffer_.data(), *this, hint);
if (ec)
return fail("onReadMessage", ec);
Expand All @@ -916,6 +923,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (bytes_consumed == 0)
break;
read_buffer_.consume(bytes_consumed);
metrics_.recv.add_message_type(message_type);
}

// Timeout on writes only
Expand Down Expand Up @@ -954,6 +962,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
metrics_.sent.add_message(bytes_transferred);

assert(!send_queue_.empty());
metrics_.sent.add_message_type(send_queue_.front()->getType());
if (send_queue_.front()->getType() == protocol::mtGRACEFUL_CLOSE)
{
close();
Expand Down Expand Up @@ -3754,6 +3763,12 @@ PeerImp::Metrics::add_message(std::uint64_t bytes)
}
}

void
PeerImp::Metrics::add_message_type(std::uint16_t type)
{
mtype_[type]++;
}

std::uint64_t
PeerImp::Metrics::average_bytes() const
{
Expand All @@ -3768,4 +3783,10 @@ PeerImp::Metrics::total_bytes() const
return totalBytes_;
}

std::unordered_map<std::uint16_t, std::uint64_t> const&
PeerImp::Metrics::mtype() const
{
return mtype_;
}

} // namespace ripple
5 changes: 5 additions & 0 deletions src/ripple/overlay/impl/PeerImp.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,14 @@ class PeerImp : public Peer,

void
add_message(std::uint64_t bytes);
void
add_message_type(std::uint16_t mtype);
std::uint64_t
average_bytes() const;
std::uint64_t
total_bytes() const;
std::unordered_map<std::uint16_t, std::uint64_t> const&
mtype() const;

private:
boost::shared_mutex mutable mutex_;
Expand All @@ -210,6 +214,7 @@ class PeerImp : public Peer,
std::uint64_t totalBytes_{0};
std::uint64_t accumBytes_{0};
std::uint64_t rollingAvgBytes_{0};
std::unordered_map<std::uint16_t, std::uint64_t> mtype_;
};

struct
Expand Down
20 changes: 13 additions & 7 deletions src/ripple/overlay/impl/ProtocolMessage.h
Original file line number Diff line number Diff line change
Expand Up @@ -342,20 +342,22 @@ invoke(MessageHeader const& header, Buffers const& buffers, Handler& handler)
@return The number of bytes consumed, or the error code if any.
*/
template <class Buffers, class Handler>
std::pair<std::size_t, boost::system::error_code>
std::tuple<std::size_t, std::uint16_t, boost::system::error_code>
invokeProtocolMessage(
Buffers const& buffers,
Handler& handler,
std::size_t& hint)
{
std::pair<std::size_t, boost::system::error_code> result = {0, {}};
std::tuple<std::size_t, std::uint16_t, boost::system::error_code> result = {
0, 0, {}};

auto const size = boost::asio::buffer_size(buffers);

if (size == 0)
return result;

auto header = detail::parseMessageHeader(result.second, buffers, size);
auto header =
detail::parseMessageHeader(std::get<2>(result), buffers, size);

// If we can't parse the header then it may be that we don't have enough
// bytes yet, or because the message was cut off (if error_code is success).
Expand All @@ -372,15 +374,17 @@ invokeProtocolMessage(
if (header->payload_wire_size > maximiumMessageSize ||
header->uncompressed_size > maximiumMessageSize)
{
result.second = make_error_code(boost::system::errc::message_size);
std::get<2>(result) =
make_error_code(boost::system::errc::message_size);
return result;
}

// We requested uncompressed messages from the peer but received compressed.
if (!handler.compressionEnabled() &&
header->algorithm != compression::Algorithm::None)
{
result.second = make_error_code(boost::system::errc::protocol_error);
std::get<2>(result) =
make_error_code(boost::system::errc::protocol_error);
return result;
}

Expand Down Expand Up @@ -510,10 +514,12 @@ invokeProtocolMessage(
break;
}

result.first = header->total_wire_size;
std::get<0>(result) = header->total_wire_size;
if (protocolMessageName(header->message_type) != "unknown")
std::get<1>(result) = header->message_type;

if (!success)
result.second = make_error_code(boost::system::errc::bad_message);
std::get<2>(result) = make_error_code(boost::system::errc::bad_message);

return result;
}
Expand Down
4 changes: 3 additions & 1 deletion src/ripple/overlay/impl/ProtocolVersion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ negotiateProtocolVersion(
std::string const&
supportedProtocolVersions()
{
return toProtocolVersionStr(supportedProtocolList);
static std::string const supported =
toProtocolVersionStr(supportedProtocolList);
return supported;
}

bool
Expand Down
24 changes: 10 additions & 14 deletions src/ripple/overlay/impl/ProtocolVersion.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,22 +79,18 @@ std::string const&
supportedProtocolVersions();

template <typename T>
std::string const&
std::string
toProtocolVersionStr(T const& list)
{
static std::string const supported = [&]() {
std::string ret;
for (auto const& v : list)
{
if (!ret.empty())
ret += ", ";
ret += to_string(v);
}

return ret;
}();

return supported;
std::string ret = "";
for (auto const& v : list)
{
if (!ret.empty())
ret += ", ";
ret += to_string(v);
}

return ret;
}

/** Determine whether we support a specific protocol version. */
Expand Down
2 changes: 2 additions & 0 deletions src/ripple/protocol/jss.h
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,8 @@ JSS(max_spend_drops_total); // out: AccountInfo
JSS(median_fee); // out: TxQ
JSS(median_level); // out: TxQ
JSS(message); // error.
JSS(message_type_sent); // out: Peers
JSS(message_type_recv); // out: Peers
JSS(meta); // out: NetworkOPs, AccountTx*, Tx
JSS(meta_blob); // out: NetworkOPs, AccountTx*, Tx
JSS(metaData);
Expand Down
Loading

0 comments on commit d6a0031

Please sign in to comment.