Skip to content

Commit

Permalink
Add Pubsub Metrics Tracer (#12178)
Browse files Browse the repository at this point in the history
* add tracer

* gaz

* preston's review

* preston's review
  • Loading branch information
nisdas authored Mar 24, 2023
1 parent 797cc36 commit 76c729f
Show file tree
Hide file tree
Showing 4 changed files with 200 additions and 14 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"options.go",
"pubsub.go",
"pubsub_filter.go",
"pubsub_tracer.go",
"rpc_topic_mappings.go",
"sender.go",
"service.go",
Expand Down Expand Up @@ -84,6 +85,7 @@ go_library(
"@com_github_libp2p_go_libp2p//core/host:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//core/peerstore:go_default_library",
"@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/muxer/mplex:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
Expand Down
108 changes: 94 additions & 14 deletions beacon-chain/p2p/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
Expand Down Expand Up @@ -63,6 +64,80 @@ var (
Name: "p2p_sync_committee_subnet_attempted_broadcasts",
Help: "The number of sync committee that were attempted to be broadcast.",
})

// Gossip Tracer Metrics
pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "p2p_pubsub_topic_active",
Help: "The topics that the peer is participating in gossipsub.",
},
[]string{"topic"})
pubsubTopicsGraft = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_graft_total",
Help: "The number of graft messages sent for a particular topic",
},
[]string{"topic"})
pubsubTopicsPrune = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_prune_total",
Help: "The number of prune messages sent for a particular topic",
},
[]string{"topic"})
pubsubMessageDeliver = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_deliver_total",
Help: "The number of messages received for delivery of a particular topic",
},
[]string{"topic"})
pubsubMessageUndeliverable = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_undeliverable_total",
Help: "The number of messages received which weren't able to be delivered of a particular topic",
},
[]string{"topic"})
pubsubMessageValidate = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_validate_total",
Help: "The number of messages received for validation of a particular topic",
},
[]string{"topic"})
pubsubMessageDuplicate = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_duplicate_total",
Help: "The number of duplicate messages sent for a particular topic",
},
[]string{"topic"})
pubsubMessageReject = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_reject_total",
Help: "The number of messages rejected of a particular topic",
},
[]string{"topic"})
pubsubPeerThrottle = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_throttle_total",
Help: "The number of times a peer has been throttled for a particular topic",
},
[]string{"topic"})
pubsubRPCRecv = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_total",
Help: "The number of messages received via rpc for a particular topic",
},
[]string{"control_message"})
pubsubRPCSubRecv = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_recv_sub_total",
Help: "The number of subscription messages received via rpc",
})
pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_total",
Help: "The number of messages dropped via rpc for a particular topic",
},
[]string{"control_message"})
pubsubRPCSubDrop = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_drop_sub_total",
Help: "The number of subscription messages dropped via rpc",
})
pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_total",
Help: "The number of messages sent via rpc for a particular topic",
},
[]string{"control_message"})
pubsubRPCSubSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "p2p_pubsub_rpc_sent_sub_total",
Help: "The number of subscription messages sent via rpc",
})
)

func (s *Service) updateMetrics() {
Expand All @@ -84,20 +159,7 @@ func (s *Service) updateMetrics() {
continue
}

// Get the agent data.
rawAgent, err := store.Get(pid, "AgentVersion")
agent, ok := rawAgent.(string)
if err != nil || !ok {
agent = "unknown"
}
foundName := "unknown"
for _, knownAgent := range knownAgentVersions {
// If the agent string matches one of our known agents, we set
// the value to our own, sanitized string.
if strings.Contains(strings.ToLower(agent), knownAgent) {
foundName = knownAgent
}
}
foundName := agentFromPid(pid, store)
numConnectedPeersByClient[foundName] += 1

// Get peer scoring data.
Expand All @@ -123,3 +185,21 @@ func average(xs []float64) float64 {
}
return total / float64(len(xs))
}

func agentFromPid(pid peer.ID, store peerstore.Peerstore) string {
// Get the agent data.
rawAgent, err := store.Get(pid, "AgentVersion")
agent, ok := rawAgent.(string)
if err != nil || !ok {
return "unknown"
}
foundName := "unknown"
for _, knownAgent := range knownAgentVersions {
// If the agent string matches one of our known agents, we set
// the value to our own, sanitized string.
if strings.Contains(strings.ToLower(agent), knownAgent) {
foundName = knownAgent
}
}
return foundName
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
}
return psOpts
}
Expand Down
103 changes: 103 additions & 0 deletions beacon-chain/p2p/pubsub_tracer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package p2p

import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/prometheus/client_golang/prometheus"
)

var _ = pubsub.RawTracer(gossipTracer{})

// This tracer is used to implement metrics collection for messages received
// and broadcasted through gossipsub.
type gossipTracer struct {
host host.Host
}

// AddPeer .
func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
// no-op
}

// RemovePeer .
func (g gossipTracer) RemovePeer(p peer.ID) {
// no-op
}

// Join .
func (g gossipTracer) Join(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(1)
}

// Leave .
func (g gossipTracer) Leave(topic string) {
pubsubTopicsActive.WithLabelValues(topic).Set(0)
}

// Graft .
func (g gossipTracer) Graft(p peer.ID, topic string) {
pubsubTopicsGraft.WithLabelValues(topic).Inc()
}

// Prune .
func (g gossipTracer) Prune(p peer.ID, topic string) {
pubsubTopicsPrune.WithLabelValues(topic).Inc()
}

// ValidateMessage .
func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
pubsubMessageValidate.WithLabelValues(*msg.Topic).Inc()
}

// DeliverMessage .
func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
pubsubMessageDeliver.WithLabelValues(*msg.Topic).Inc()
}

// RejectMessage .
func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
pubsubMessageReject.WithLabelValues(*msg.Topic).Inc()
}

// DuplicateMessage .
func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
pubsubMessageDuplicate.WithLabelValues(*msg.Topic).Inc()
}

// UndeliverableMessage .
func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
pubsubMessageUndeliverable.WithLabelValues(*msg.Topic).Inc()
}

// ThrottlePeer .
func (g gossipTracer) ThrottlePeer(p peer.ID) {
agent := agentFromPid(p, g.host.Peerstore())
pubsubPeerThrottle.WithLabelValues(agent).Inc()
}

// RecvRPC .
func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
setMetricFromRPC(pubsubRPCSubRecv, pubsubRPCRecv, rpc)
}

// SendRPC .
func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
setMetricFromRPC(pubsubRPCSubSent, pubsubRPCSent, rpc)
}

// DropRPC .
func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
setMetricFromRPC(pubsubRPCSubDrop, pubsubRPCDrop, rpc)
}

func setMetricFromRPC(ctr prometheus.Counter, gauge *prometheus.CounterVec, rpc *pubsub.RPC) {
ctr.Add(float64(len(rpc.Subscriptions)))
if rpc.Control != nil {
gauge.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
gauge.WithLabelValues("prune").Add(float64(len(rpc.Control.Prune)))
gauge.WithLabelValues("ihave").Add(float64(len(rpc.Control.Ihave)))
gauge.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
}
}

0 comments on commit 76c729f

Please sign in to comment.