diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel
index ef1525a33948..4058bf3ccf62 100644
--- a/beacon-chain/p2p/BUILD.bazel
+++ b/beacon-chain/p2p/BUILD.bazel
@@ -24,6 +24,7 @@ go_library(
         "options.go",
         "pubsub.go",
         "pubsub_filter.go",
+        "pubsub_tracer.go",
         "rpc_topic_mappings.go",
         "sender.go",
         "service.go",
@@ -84,6 +85,7 @@ go_library(
         "@com_github_libp2p_go_libp2p//core/host:go_default_library",
         "@com_github_libp2p_go_libp2p//core/network:go_default_library",
         "@com_github_libp2p_go_libp2p//core/peer:go_default_library",
+        "@com_github_libp2p_go_libp2p//core/peerstore:go_default_library",
         "@com_github_libp2p_go_libp2p//core/protocol:go_default_library",
         "@com_github_libp2p_go_libp2p//p2p/muxer/mplex:go_default_library",
         "@com_github_libp2p_go_libp2p//p2p/security/noise:go_default_library",
diff --git a/beacon-chain/p2p/monitoring.go b/beacon-chain/p2p/monitoring.go
index 989fdb6adf47..8bd0c5d526ec 100644
--- a/beacon-chain/p2p/monitoring.go
+++ b/beacon-chain/p2p/monitoring.go
@@ -4,6 +4,7 @@ import (
 	"strings"
 
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/peerstore"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 )
@@ -63,6 +64,80 @@ var (
 		Name: "p2p_sync_committee_subnet_attempted_broadcasts",
 		Help: "The number of sync committee that were attempted to be broadcast.",
 	})
+
+	// Gossip tracer metrics, updated by the gossipTracer callbacks.
+	pubsubTopicsActive = promauto.NewGaugeVec(prometheus.GaugeOpts{
+		Name: "p2p_pubsub_topic_active",
+		Help: "The topics that the peer is participating in gossipsub.",
+	},
+		[]string{"topic"})
+	pubsubTopicsGraft = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_graft_total",
+		Help: "The number of graft messages sent for a particular topic",
+	},
+		[]string{"topic"})
+	pubsubTopicsPrune = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_prune_total",
+		Help: "The number of prune messages sent for a particular topic",
+	},
+		[]string{"topic"})
+	pubsubMessageDeliver = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_deliver_total",
+		Help: "The number of messages received for delivery of a particular topic",
+	},
+		[]string{"topic"})
+	pubsubMessageUndeliverable = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_undeliverable_total",
+		Help: "The number of messages received which weren't able to be delivered of a particular topic",
+	},
+		[]string{"topic"})
+	pubsubMessageValidate = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_validate_total",
+		Help: "The number of messages received for validation of a particular topic",
+	},
+		[]string{"topic"})
+	pubsubMessageDuplicate = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_duplicate_total",
+		Help: "The number of duplicate messages sent for a particular topic",
+	},
+		[]string{"topic"})
+	pubsubMessageReject = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_reject_total",
+		Help: "The number of messages rejected of a particular topic",
+	},
+		[]string{"topic"})
+	pubsubPeerThrottle = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_throttle_total",
+		Help: "The number of times a peer has been throttled, by client agent",
+	},
+		[]string{"agent"})
+	pubsubRPCRecv = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_recv_total",
+		Help: "The number of control messages received via rpc for a particular control message type",
+	},
+		[]string{"control_message"})
+	pubsubRPCSubRecv = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_recv_sub_total",
+		Help: "The number of subscription messages received via rpc",
+	})
+	pubsubRPCDrop = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_drop_total",
+		Help: "The number of control messages dropped via rpc for a particular control message type",
+	},
+		[]string{"control_message"})
+	pubsubRPCSubDrop = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_drop_sub_total",
+		Help: "The number of subscription messages dropped via rpc",
+	})
+	pubsubRPCSent = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_sent_total",
+		Help: "The number of control messages sent via rpc for a particular control message type",
+	},
+		[]string{"control_message"})
+	pubsubRPCSubSent = promauto.NewCounter(prometheus.CounterOpts{
+		Name: "p2p_pubsub_rpc_sent_sub_total",
+		Help: "The number of subscription messages sent via rpc",
+	})
 )
 
 func (s *Service) updateMetrics() {
@@ -84,20 +159,7 @@ func (s *Service) updateMetrics() {
 			continue
 		}
 
-		// Get the agent data.
-		rawAgent, err := store.Get(pid, "AgentVersion")
-		agent, ok := rawAgent.(string)
-		if err != nil || !ok {
-			agent = "unknown"
-		}
-		foundName := "unknown"
-		for _, knownAgent := range knownAgentVersions {
-			// If the agent string matches one of our known agents, we set
-			// the value to our own, sanitized string.
-			if strings.Contains(strings.ToLower(agent), knownAgent) {
-				foundName = knownAgent
-			}
-		}
+		foundName := agentFromPid(pid, store)
 		numConnectedPeersByClient[foundName] += 1
 
 		// Get peer scoring data.
@@ -123,3 +185,25 @@ func average(xs []float64) float64 {
 	}
 	return total / float64(len(xs))
 }
+
+// agentFromPid reads the "AgentVersion" entry for pid from store and returns the
+// last entry of knownAgentVersions contained in its lowercased form, else "unknown".
+func agentFromPid(pid peer.ID, store peerstore.Peerstore) string {
+	// Get the agent data.
+	rawAgent, err := store.Get(pid, "AgentVersion")
+	agent, ok := rawAgent.(string)
+	if err != nil || !ok {
+		return "unknown"
+	}
+	// Lowercase once up front rather than on every loop iteration.
+	agent = strings.ToLower(agent)
+	foundName := "unknown"
+	for _, knownAgent := range knownAgentVersions {
+		// If the agent string matches one of our known agents, we set
+		// the value to our own, sanitized string.
+		if strings.Contains(agent, knownAgent) {
+			foundName = knownAgent
+		}
+	}
+	return foundName
+}
diff --git a/beacon-chain/p2p/pubsub.go b/beacon-chain/p2p/pubsub.go
index 09d0ad2a3d90..47d649db059e 100644
--- a/beacon-chain/p2p/pubsub.go
+++ b/beacon-chain/p2p/pubsub.go
@@ -145,6 +145,7 @@ func (s *Service) pubsubOptions() []pubsub.Option {
 		pubsub.WithPeerScore(peerScoringParams()),
 		pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
 		pubsub.WithGossipSubParams(pubsubGossipParam()),
+		pubsub.WithRawTracer(gossipTracer{host: s.host}),
 	}
 	return psOpts
 }
diff --git a/beacon-chain/p2p/pubsub_tracer.go b/beacon-chain/p2p/pubsub_tracer.go
new file mode 100644
index 000000000000..1405f410b2c4
--- /dev/null
+++ b/beacon-chain/p2p/pubsub_tracer.go
@@ -0,0 +1,106 @@
+package p2p
+
+import (
+	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+var _ pubsub.RawTracer = gossipTracer{}
+
+// gossipTracer implements pubsub.RawTracer and records prometheus metrics for
+// gossipsub events. Message topics are read with GetTopic rather than by
+// dereferencing msg.Topic, so a message without a topic cannot cause a panic.
+type gossipTracer struct {
+	host host.Host
+}
+
+// AddPeer is a no-op; no metric is recorded when a peer is added.
+func (g gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
+	// no-op
+}
+
+// RemovePeer is a no-op; no metric is recorded when a peer is removed.
+func (g gossipTracer) RemovePeer(p peer.ID) {
+	// no-op
+}
+
+// Join sets the active gauge for the joined topic to 1.
+func (g gossipTracer) Join(topic string) {
+	pubsubTopicsActive.WithLabelValues(topic).Set(1)
+}
+
+// Leave sets the active gauge for the left topic to 0.
+func (g gossipTracer) Leave(topic string) {
+	pubsubTopicsActive.WithLabelValues(topic).Set(0)
+}
+
+// Graft increments the graft counter for the topic.
+func (g gossipTracer) Graft(p peer.ID, topic string) {
+	pubsubTopicsGraft.WithLabelValues(topic).Inc()
+}
+
+// Prune increments the prune counter for the topic.
+func (g gossipTracer) Prune(p peer.ID, topic string) {
+	pubsubTopicsPrune.WithLabelValues(topic).Inc()
+}
+
+// ValidateMessage increments the validation counter for the message's topic.
+func (g gossipTracer) ValidateMessage(msg *pubsub.Message) {
+	pubsubMessageValidate.WithLabelValues(msg.GetTopic()).Inc()
+}
+
+// DeliverMessage increments the delivery counter for the message's topic.
+func (g gossipTracer) DeliverMessage(msg *pubsub.Message) {
+	pubsubMessageDeliver.WithLabelValues(msg.GetTopic()).Inc()
+}
+
+// RejectMessage increments the rejection counter for the message's topic.
+func (g gossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
+	pubsubMessageReject.WithLabelValues(msg.GetTopic()).Inc()
+}
+
+// DuplicateMessage increments the duplicate counter for the message's topic.
+func (g gossipTracer) DuplicateMessage(msg *pubsub.Message) {
+	pubsubMessageDuplicate.WithLabelValues(msg.GetTopic()).Inc()
+}
+
+// UndeliverableMessage increments the undeliverable counter for the message's topic.
+func (g gossipTracer) UndeliverableMessage(msg *pubsub.Message) {
+	pubsubMessageUndeliverable.WithLabelValues(msg.GetTopic()).Inc()
+}
+
+// ThrottlePeer increments the throttle counter labeled with the peer's client agent.
+func (g gossipTracer) ThrottlePeer(p peer.ID) {
+	agent := agentFromPid(p, g.host.Peerstore())
+	pubsubPeerThrottle.WithLabelValues(agent).Inc()
+}
+
+// RecvRPC adds the received rpc's subscription and control counts to the recv metrics.
+func (g gossipTracer) RecvRPC(rpc *pubsub.RPC) {
+	setMetricFromRPC(pubsubRPCSubRecv, pubsubRPCRecv, rpc)
+}
+
+// SendRPC adds the sent rpc's subscription and control counts to the sent metrics.
+func (g gossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
+	setMetricFromRPC(pubsubRPCSubSent, pubsubRPCSent, rpc)
+}
+
+// DropRPC adds the dropped rpc's subscription and control counts to the drop metrics.
+func (g gossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
+	setMetricFromRPC(pubsubRPCSubDrop, pubsubRPCDrop, rpc)
+}
+
+// setMetricFromRPC adds the number of subscription entries in rpc to ctr and, when
+// rpc carries control data, adds the graft/prune/ihave/iwant entry counts to ctrVec.
+func setMetricFromRPC(ctr prometheus.Counter, ctrVec *prometheus.CounterVec, rpc *pubsub.RPC) {
+	ctr.Add(float64(len(rpc.Subscriptions)))
+	if rpc.Control != nil {
+		ctrVec.WithLabelValues("graft").Add(float64(len(rpc.Control.Graft)))
+		ctrVec.WithLabelValues("prune").Add(float64(len(rpc.Control.Prune)))
+		ctrVec.WithLabelValues("ihave").Add(float64(len(rpc.Control.Ihave)))
+		ctrVec.WithLabelValues("iwant").Add(float64(len(rpc.Control.Iwant)))
+	}
+}