Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p: Add algod_network_p2p_* traffic metrics #6105

Merged
merged 16 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/algod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ var startupConfigCheckFields = []string{
"TxPoolExponentialIncreaseFactor",
"TxPoolSize",
"VerifiedTranscationsCacheSize",
"EnableP2P",
"EnableP2PHybridMode",
}

func resolveDataDir() string {
Expand Down
206 changes: 206 additions & 0 deletions network/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
p2proto "github.com/libp2p/go-libp2p/core/protocol"

"github.com/algorand/go-algorand/network/p2p"
"github.com/algorand/go-algorand/protocol"
"github.com/algorand/go-algorand/util/metrics"
)

// init creates the per-tag traffic counters (both the websocket-network ones
// and their algod_network_p2p_* counterparts) and stores them in the
// package-level *ByTag variables.
func init() {
	// all tags are tracked by ws net
	tags := make([]string, len(protocol.TagList))
	for i, tag := range protocol.TagList {
		tags[i] = string(tag)
	}

	// byTag builds a filtered tag counter limited to the protocol tag list.
	// "UNK" is passed as the fourth argument; presumably it is the bucket for
	// tags outside that list — confirm against metrics.NewTagCounterFiltered.
	byTag := func(name, desc string) *metrics.TagCounter {
		return metrics.NewTagCounterFiltered(name, desc, tags, "UNK")
	}

	networkSentBytesByTag = byTag("algod_network_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
	networkReceivedBytesByTag = byTag("algod_network_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
	networkMessageReceivedByTag = byTag("algod_network_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
	networkMessageSentByTag = byTag("algod_network_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
	networkHandleCountByTag = byTag("algod_network_rx_handle_countbytag_{TAG}", "count of handler calls in the receive thread for {TAG} messages")
	networkHandleMicrosByTag = byTag("algod_network_rx_handle_microsbytag_{TAG}", "microseconds spent by protocol handlers in the receive thread for {TAG} messages")

	networkP2PSentBytesByTag = byTag("algod_network_p2p_sent_bytes_{TAG}", "Number of bytes that were sent over the network for {TAG} messages")
	networkP2PReceivedBytesByTag = byTag("algod_network_p2p_received_bytes_{TAG}", "Number of bytes that were received from the network for {TAG} messages")
	networkP2PMessageReceivedByTag = byTag("algod_network_p2p_message_received_{TAG}", "Number of complete messages that were received from the network for {TAG} messages")
	networkP2PMessageSentByTag = byTag("algod_network_p2p_message_sent_{TAG}", "Number of complete messages that were sent to the network for {TAG} messages")
}

// Byte counters for sent and received traffic. Each metric has a plain and a
// P2P variant; the *ByTag counters are only declared here and are assigned in init().
var networkSentBytesTotal = metrics.MakeCounter(metrics.NetworkSentBytesTotal)
var networkP2PSentBytesTotal = metrics.MakeCounter(metrics.NetworkP2PSentBytesTotal)
var networkSentBytesByTag *metrics.TagCounter
var networkP2PSentBytesByTag *metrics.TagCounter
var networkReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkReceivedBytesTotal)
var networkP2PReceivedBytesTotal = metrics.MakeCounter(metrics.NetworkP2PReceivedBytesTotal)
var networkReceivedBytesByTag *metrics.TagCounter
var networkP2PReceivedBytesByTag *metrics.TagCounter

// Message counters for sent and received traffic, again in plain and P2P
// variants; the *ByTag counters are assigned in init().
var networkMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkMessageReceivedTotal)
var networkP2PMessageReceivedTotal = metrics.MakeCounter(metrics.NetworkP2PMessageReceivedTotal)
var networkMessageReceivedByTag *metrics.TagCounter
var networkP2PMessageReceivedByTag *metrics.TagCounter
var networkMessageSentTotal = metrics.MakeCounter(metrics.NetworkMessageSentTotal)
var networkP2PMessageSentTotal = metrics.MakeCounter(metrics.NetworkP2PMessageSentTotal)
var networkMessageSentByTag *metrics.TagCounter
var networkP2PMessageSentByTag *metrics.TagCounter

// Per-tag receive-handler time and call counters; assigned in init().
var networkHandleMicrosByTag *metrics.TagCounter
var networkHandleCountByTag *metrics.TagCounter

// Dropped connections and time messages spent waiting in the send queue.
var networkConnectionsDroppedTotal = metrics.MakeCounter(metrics.NetworkConnectionsDroppedTotal)
var networkMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_message_sent_queue_micros_total", Description: "Total microseconds message spent waiting in queue to be sent"})
var networkP2PMessageQueueMicrosTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_message_sent_queue_micros_total", Description: "Total microseconds p2p message spent waiting in queue to be sent"})

// Duplicate, filtered-out and unknown-tag message counters.
var duplicateNetworkMessageReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedTotal)
var duplicateNetworkMessageReceivedBytesTotal = metrics.MakeCounter(metrics.DuplicateNetworkMessageReceivedBytesTotal)
var duplicateNetworkFilterReceivedTotal = metrics.MakeCounter(metrics.DuplicateNetworkFilterReceivedTotal)
var outgoingNetworkMessageFilteredOutTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutTotal)
var outgoingNetworkMessageFilteredOutBytesTotal = metrics.MakeCounter(metrics.OutgoingNetworkMessageFilteredOutBytesTotal)
var unknownProtocolTagMessagesTotal = metrics.MakeCounter(metrics.UnknownProtocolTagMessagesTotal)

// Connection gauges.
var networkIncomingConnections = metrics.MakeGauge(metrics.NetworkIncomingConnections)
var networkOutgoingConnections = metrics.MakeGauge(metrics.NetworkOutgoingConnections)

// Receive-side timing counters.
var networkIncomingBufferMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_buffer_micros_total", Description: "microseconds spent by incoming messages on the receive buffer"})
var networkHandleMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_rx_handle_micros_total", Description: "microseconds spent by protocol handlers in the receive thread"})

// Broadcast counters: operations, queueing, send time and drops.
var networkBroadcasts = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcasts_total", Description: "number of broadcast operations"})
var networkBroadcastQueueFull = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_full_total", Description: "number of messages that were drops due to full broadcast queue"})
var networkBroadcastQueueMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_queue_micros_total", Description: "microseconds broadcast requests sit on queue"})
var networkBroadcastSendMicros = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_broadcast_send_micros_total", Description: "microseconds spent broadcasting"})
var networkBroadcastsDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_broadcasts_dropped_total", Description: "number of broadcast messages not sent to any peer"})
var networkPeerBroadcastDropped = metrics.MakeCounter(metrics.MetricName{Name: "algod_peer_broadcast_dropped_total", Description: "number of broadcast messages not sent to some peer"})

// Peer identity challenge and already-closed peer counters.
var networkPeerIdentityDisconnect = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_duplicate", Description: "number of times identity challenge cause us to disconnect a peer"})
var networkPeerIdentityError = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_identity_error", Description: "number of times an error occurs (besides expected) when processing identity challenges"})
var networkPeerAlreadyClosed = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_peer_already_closed", Description: "number of times a peer would be added but the peer connection is already closed"})

// Counters for peers dropped for being slow or idle.
var networkSlowPeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_slow_drops_total", Description: "number of peers dropped for being slow to send to"})
var networkIdlePeerDrops = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_idle_drops_total", Description: "number of peers dropped due to idle connection"})

// Active peer gauges.
var peers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_peers", Description: "Number of active peers."})
var incomingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_incoming_peers", Description: "Number of active incoming peers."})
var outgoingPeers = metrics.MakeGauge(metrics.MetricName{Name: "algod_network_outgoing_peers", Description: "Number of active outgoing peers."})

// Counters updated by pubsubMetricsTracer for pubsub message outcomes.
// The reject counter is a TagCounter so rejections can be split by reason.
// NOTE(review): the identifier below is spelled "Underdeliverable" while the
// metric it wraps is named "Undeliverable"; renaming needs a matching change at its use site.
var transactionMessagesP2PRejectMessage = metrics.NewTagCounter(metrics.TransactionMessagesP2PRejectMessage.Name, metrics.TransactionMessagesP2PRejectMessage.Description)
var transactionMessagesP2PDuplicateMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDuplicateMessage)
var transactionMessagesP2PDeliverMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PDeliverMessage)
var transactionMessagesP2PUnderdeliverableMessage = metrics.MakeCounter(metrics.TransactionMessagesP2PUndeliverableMessage)

// Total bytes carried by gossipsub RPCs, as reported by rpc.Size() in SendRPC/RecvRPC.
var networkP2PGossipSubSentBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_sent_bytes_total", Description: "Total number of bytes sent through gossipsub"})
var networkP2PGossipSubReceivedBytesTotal = metrics.MakeCounter(metrics.MetricName{Name: "algod_network_p2p_gs_received_bytes_total", Description: "Total number of bytes received through gossipsub"})

// Compile-time check that pubsubMetricsTracer satisfies pubsub.RawTracer.
var _ = pubsub.RawTracer(pubsubMetricsTracer{})

// pubsubMetricsTracer is a tracer for pubsub events used to track metrics.
// It carries no state; its methods only update package-level counters.
type pubsubMetricsTracer struct{}

// AddPeer is invoked when a new peer is added.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) AddPeer(p peer.ID, proto p2proto.ID) {}

// RemovePeer is invoked when a peer is removed.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) RemovePeer(p peer.ID) {}

// Join is invoked when a new topic is joined.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) Join(topic string) {}

// Leave is invoked when a topic is abandoned.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) Leave(topic string) {}

Check warning on line 127 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L127

Added line #L127 was not covered by tests

// Graft is invoked when a new peer is grafted on the mesh (gossipsub).
// No metric is recorded for this event.
func (t pubsubMetricsTracer) Graft(p peer.ID, topic string) {}

// Prune is invoked when a peer is pruned from the mesh (gossipsub).
// No metric is recorded for this event.
func (t pubsubMetricsTracer) Prune(p peer.ID, topic string) {}

Check warning on line 133 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L133

Added line #L133 was not covered by tests

// ValidateMessage is invoked when a message first enters the validation pipeline.
// Messages on the TX topic are accounted as received TxnTag traffic: payload
// bytes go to the total and per-tag byte counters, and the per-tag message
// counter is bumped by one. Nil messages and messages without a topic are ignored.
func (t pubsubMetricsTracer) ValidateMessage(msg *pubsub.Message) {
	if msg == nil || msg.Topic == nil {
		return
	}

	// Keep this a switch with one case per topic:
	// TestMetrics_PubsubTracer_TagList counts the case clauses in this function.
	switch *msg.Topic {
	case p2p.TXTopicName:
		size := uint64(len(msg.Data))
		tag := string(protocol.TxnTag)
		networkP2PReceivedBytesTotal.AddUint64(size, nil)
		networkP2PReceivedBytesByTag.Add(tag, size)
		networkP2PMessageReceivedByTag.Add(tag, 1)
	}
}

// DeliverMessage is invoked when a message is delivered.
// It increments the delivered P2P transaction message counter; msg itself is not inspected.
func (t pubsubMetricsTracer) DeliverMessage(msg *pubsub.Message) {
transactionMessagesP2PDeliverMessage.Inc(nil)
}

// RejectMessage is invoked when a message is Rejected or Ignored.
// The reason argument can be one of the named strings Reject*.
func (t pubsubMetricsTracer) RejectMessage(msg *pubsub.Message, reason string) {

Check warning on line 154 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L154

Added line #L154 was not covered by tests
// TagCounter cannot handle tags with spaces so pubsub.Reject* cannot be used directly.
// Since Go's strings are immutable, char replacement is a new allocation so that stick to string literals.
switch reason {
case pubsub.RejectValidationThrottled:
transactionMessagesP2PRejectMessage.Add("throttled", 1)
case pubsub.RejectValidationQueueFull:
transactionMessagesP2PRejectMessage.Add("full", 1)
case pubsub.RejectValidationFailed:
transactionMessagesP2PRejectMessage.Add("failed", 1)
case pubsub.RejectValidationIgnored:
transactionMessagesP2PRejectMessage.Add("ignored", 1)
default:
transactionMessagesP2PRejectMessage.Add("other", 1)

Check warning on line 167 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L157-L167

Added lines #L157 - L167 were not covered by tests
}
}

// DuplicateMessage is invoked when a duplicate message is dropped.
// It increments the duplicate P2P transaction message counter; msg itself is not inspected.
func (t pubsubMetricsTracer) DuplicateMessage(msg *pubsub.Message) {
transactionMessagesP2PDuplicateMessage.Inc(nil)

Check warning on line 173 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L172-L173

Added lines #L172 - L173 were not covered by tests
}

// ThrottlePeer is invoked when a peer is throttled by the peer gater.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) ThrottlePeer(p peer.ID) {}

Check warning on line 177 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L177

Added line #L177 was not covered by tests

// RecvRPC is invoked when an incoming RPC is received.
// It adds the value reported by rpc.Size() to the gossipsub received-bytes counter.
func (t pubsubMetricsTracer) RecvRPC(rpc *pubsub.RPC) {
networkP2PGossipSubReceivedBytesTotal.AddUint64(uint64(rpc.Size()), nil)
}

// SendRPC is invoked when a RPC is sent.
// The value reported by rpc.Size() is added to the gossipsub sent-bytes counter.
// In addition, every published message on the TX topic is accounted as sent
// TxnTag traffic: its payload bytes go to the per-tag and total byte counters,
// and the per-tag message counter is bumped by one.
func (t pubsubMetricsTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
	networkP2PGossipSubSentBytesTotal.AddUint64(uint64(rpc.Size()), nil)

	for _, published := range rpc.GetPublish() {
		// entries without a message or topic cannot be attributed to a tag
		if published == nil || published.Topic == nil {
			continue
		}

		// Keep this a switch with one case per topic:
		// TestMetrics_PubsubTracer_TagList counts the case clauses in this function.
		switch *published.Topic {
		case p2p.TXTopicName:
			size := uint64(len(published.Data))
			tag := string(protocol.TxnTag)
			networkP2PSentBytesByTag.Add(tag, size)
			networkP2PSentBytesTotal.AddUint64(size, nil)
			networkP2PMessageSentByTag.Add(tag, 1)
		}
	}
}

// DropRPC is invoked when an outbound RPC is dropped, typically because of a full queue.
// No metric is recorded for this event.
func (t pubsubMetricsTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {}

Check warning on line 200 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L200

Added line #L200 was not covered by tests

// UndeliverableMessage is invoked when the consumer of Subscribe is not reading messages fast enough and
// the pressure release mechanism triggers, dropping messages.
// It increments the undeliverable P2P transaction message counter; msg itself is not inspected.
func (t pubsubMetricsTracer) UndeliverableMessage(msg *pubsub.Message) {
transactionMessagesP2PUnderdeliverableMessage.Inc(nil)

Check warning on line 205 in network/metrics.go

View check run for this annotation

Codecov / codecov/patch

network/metrics.go#L204-L205

Added lines #L204 - L205 were not covered by tests
}
76 changes: 76 additions & 0 deletions network/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package network

import (
"go/ast"
"go/parser"
"go/token"
"testing"

"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/test/partitiontest"
)

// TestMetrics_PubsubTracer_TagList makes sure pubsubMetricsTracer traces pubsub messages
// for every gossipsub tag. It parses metrics.go, counts the switch cases in
// SendRPC and ValidateMessage, and requires both counts to equal len(gossipSubTags).
func TestMetrics_PubsubTracer_TagList(t *testing.T) {
	t.Parallel()
	partitiontest.PartitionTest(t)

	fset := token.NewFileSet()
	f, err := parser.ParseFile(fset, "metrics.go", nil, 0)
	require.NoError(t, err)

	// countCases walks body and returns the number of case clauses that sit
	// directly under any switch statement found in it.
	countCases := func(body *ast.BlockStmt) int {
		total := 0
		ast.Inspect(body, func(n ast.Node) bool {
			sw, ok := n.(*ast.SwitchStmt)
			if !ok {
				return true
			}
			for _, clause := range sw.Body.List {
				if _, isCase := clause.(*ast.CaseClause); isCase {
					total++
				}
			}
			return true
		})
		return total
	}

	// Find the SendRPC/ValidateMessage functions and count the switch cases
	var sendCaseCount, recvCaseCount int
	ast.Inspect(f, func(n ast.Node) bool {
		fn, ok := n.(*ast.FuncDecl)
		if !ok {
			return true
		}
		switch fn.Name.Name {
		case "SendRPC":
			sendCaseCount += countCases(fn.Body)
		case "ValidateMessage":
			recvCaseCount += countCases(fn.Body)
		}
		return true
	})

	require.Equal(t, len(gossipSubTags), sendCaseCount)
	require.Equal(t, len(gossipSubTags), recvCaseCount)
}
4 changes: 2 additions & 2 deletions network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@
}

// MakeService creates a P2P service instance
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler) (*serviceImpl, error) {
func MakeService(ctx context.Context, log logging.Logger, cfg config.Local, h host.Host, listenAddr string, wsStreamHandler StreamHandler, metricsTracer pubsub.RawTracer) (*serviceImpl, error) {

Check warning on line 179 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L179

Added line #L179 was not covered by tests

sm := makeStreamManager(ctx, log, h, wsStreamHandler, cfg.EnableGossipService)
h.Network().Notify(sm)
Expand All @@ -188,7 +188,7 @@
telemetryProtoInfo := formatPeerTelemetryInfoProtocolName(telemetryID, telemetryInstance)
h.SetStreamHandler(protocol.ID(telemetryProtoInfo), func(s network.Stream) { s.Close() })

ps, err := makePubSub(ctx, cfg, h)
ps, err := makePubSub(ctx, cfg, h, metricsTracer)

Check warning on line 191 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L191

Added line #L191 was not covered by tests
if err != nil {
return nil, err
}
Expand Down
12 changes: 9 additions & 3 deletions network/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@
)

// TXTopicName defines a pubsub topic for TX messages
const TXTopicName = "/algo/tx/0.1.0"
// There is a micro-optimization for constant string comparison:
// an 8-byte constant string requires only a single x86-64 CMPQ instruction.
// Naming convention: "algo" + 2-byte protocol tag + 2-byte version
const TXTopicName = "algotx01"

const incomingThreads = 20 // matches to number wsNetwork workers

func makePubSub(ctx context.Context, cfg config.Local, host host.Host) (*pubsub.PubSub, error) {
func makePubSub(ctx context.Context, cfg config.Local, host host.Host, metricsTracer pubsub.RawTracer) (*pubsub.PubSub, error) {

Check warning on line 61 in network/p2p/pubsub.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsub.go#L61

Added line #L61 was not covered by tests
//defaultParams := pubsub.DefaultGossipSubParams()

options := []pubsub.Option{
Expand Down Expand Up @@ -98,7 +101,10 @@
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
// pubsub.WithValidateThrottle(cfg.TxBacklogSize),
pubsub.WithValidateWorkers(incomingThreads),
pubsub.WithRawTracer(pubsubTracer{}),
}

if metricsTracer != nil {
options = append(options, pubsub.WithRawTracer(metricsTracer))

Check warning on line 107 in network/p2p/pubsub.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/pubsub.go#L106-L107

Added lines #L106 - L107 were not covered by tests
}

return pubsub.NewGossipSub(ctx, host, options...)
Expand Down
Loading
Loading