perf(p2p): Reduce the p2p metrics overhead. cometbft#3411
ValarDragon committed Aug 19, 2024
1 parent f2f9426 commit 6239b99
Showing 6 changed files with 80 additions and 53 deletions.
68 changes: 43 additions & 25 deletions p2p/metrics.go
@@ -40,37 +40,55 @@ type Metrics struct {
 	MessageSendBytesTotal metrics.Counter `metrics_labels:"message_type"`
 }
 
-type metricsLabelCache struct {
-	mtx               *sync.RWMutex
-	messageLabelNames map[reflect.Type]string
-}
-
-// ValueToMetricLabel is a method that is used to produce a prometheus label value of the golang
-// type that is passed in.
-// This method uses a map on the Metrics struct so that each label name only needs
-// to be produced once to prevent expensive string operations.
-func (m *metricsLabelCache) ValueToMetricLabel(i interface{}) string {
-	t := reflect.TypeOf(i)
-	m.mtx.RLock()
-
-	if s, ok := m.messageLabelNames[t]; ok {
-		m.mtx.RUnlock()
-		return s
-	}
-	m.mtx.RUnlock()
-
-	s := t.String()
-	ss := valueToLabelRegexp.FindStringSubmatch(s)
-	l := fmt.Sprintf("%s_%s", ss[1], ss[2])
-	m.mtx.Lock()
-	defer m.mtx.Unlock()
-	m.messageLabelNames[t] = l
-	return l
-}
-
-func newMetricsLabelCache() *metricsLabelCache {
-	return &metricsLabelCache{
-		mtx:               &sync.RWMutex{},
-		messageLabelNames: map[reflect.Type]string{},
-	}
-}
+type peerPendingMetricsCache struct {
+	mtx             sync.Mutex
+	perMessageCache map[reflect.Type]*peerPendingMetricsCacheEntry
+}
+
+type peerPendingMetricsCacheEntry struct {
+	label            string
+	pendingSendBytes int
+	pendingRecvBytes int
+}
+
+func newPeerPendingMetricsCache() *peerPendingMetricsCache {
+	return &peerPendingMetricsCache{
+		perMessageCache: make(map[reflect.Type]*peerPendingMetricsCacheEntry),
+	}
+}
+
+func (c *peerPendingMetricsCache) AddPendingSendBytes(msgType reflect.Type, addBytes int) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	if entry, ok := c.perMessageCache[msgType]; ok {
+		entry.pendingSendBytes += addBytes
+	} else {
+		c.perMessageCache[msgType] = &peerPendingMetricsCacheEntry{
+			label:            buildLabel(msgType),
+			pendingSendBytes: addBytes,
+		}
+	}
+}
+
+func (c *peerPendingMetricsCache) AddPendingRecvBytes(msgType reflect.Type, addBytes int) {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+	if entry, ok := c.perMessageCache[msgType]; ok {
+		entry.pendingRecvBytes += addBytes
+	} else {
+		c.perMessageCache[msgType] = &peerPendingMetricsCacheEntry{
+			label:            buildLabel(msgType),
+			pendingRecvBytes: addBytes,
+		}
+	}
+}
+
+func buildLabel(msgType reflect.Type) string {
+	s := msgType.String()
+	ss := valueToLabelRegexp.FindStringSubmatch(s)
+	return fmt.Sprintf("%s_%s", ss[1], ss[2])
+}
+
+func getMsgType(i any) reflect.Type {
+	return reflect.TypeOf(i)
+}
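The new peerPendingMetricsCache replaces per-message Prometheus counter updates with an in-memory accumulator keyed by message type. Below is a minimal, standalone Go sketch of that accumulate-then-flush pattern; it is illustrative only, and the names cache, entry, addSend, and flush are invented for the example rather than taken from the CometBFT code.

// Standalone sketch of the accumulate-then-flush pattern used above.
// Names are illustrative, not the CometBFT identifiers.
package main

import (
    "fmt"
    "reflect"
    "sync"
)

type entry struct {
    label            string
    pendingSendBytes int
}

type cache struct {
    mtx     sync.Mutex
    pending map[reflect.Type]*entry
}

func newCache() *cache {
    return &cache{pending: make(map[reflect.Type]*entry)}
}

// addSend is the cheap hot-path operation: one mutex and one map update
// per message, instead of a Prometheus counter update per message.
func (c *cache) addSend(msg any, n int) {
    t := reflect.TypeOf(msg)
    c.mtx.Lock()
    defer c.mtx.Unlock()
    if e, ok := c.pending[t]; ok {
        e.pendingSendBytes += n
    } else {
        c.pending[t] = &entry{label: t.String(), pendingSendBytes: n}
    }
}

// flush drains the accumulated byte counts; in the real code this feeds
// the totals into metrics counters once per ticker interval.
func (c *cache) flush(report func(label string, n int)) {
    c.mtx.Lock()
    defer c.mtx.Unlock()
    for _, e := range c.pending {
        if e.pendingSendBytes > 0 {
            report(e.label, e.pendingSendBytes)
            e.pendingSendBytes = 0
        }
    }
}

func main() {
    c := newCache()
    for i := 0; i < 3; i++ {
        c.addSend("ping", 128) // pretend each message is 128 bytes on the wire
    }
    c.flush(func(label string, n int) {
        fmt.Printf("message_type=%s bytes=%d\n", label, n) // message_type=string bytes=384
    })
}

The hot path only pays for a mutex and a map update; the label construction and counter updates happen once per flush rather than once per message.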
56 changes: 36 additions & 20 deletions p2p/peer.go
@@ -17,7 +17,9 @@ import (
 
 //go:generate ../scripts/mockery_generate.sh Peer
 
-const metricsTickerDuration = 10 * time.Second
+// Same as the default Prometheus scrape interval in order to not lose
+// granularity.
+const metricsTickerDuration = 1 * time.Second
 
 // Peer is an interface representing a peer connected on a reactor.
 type Peer interface {
@@ -121,9 +123,9 @@ type peer struct {
 	// User data
 	Data *cmap.CMap
 
-	metrics       *Metrics
-	metricsTicker *time.Ticker
-	mlc           *metricsLabelCache
+	metrics        *Metrics
+	metricsTicker  *time.Ticker
+	pendingMetrics *peerPendingMetricsCache
 
 	// When removal of a peer fails, we set this flag
 	removalAttemptFailed bool
@@ -139,17 +141,16 @@ func newPeer(
 	msgTypeByChID map[byte]proto.Message,
 	chDescs []*cmtconn.ChannelDescriptor,
 	onPeerError func(Peer, interface{}),
-	mlc *metricsLabelCache,
 	options ...PeerOption,
 ) *peer {
 	p := &peer{
-		peerConn:      pc,
-		nodeInfo:      nodeInfo,
-		channels:      nodeInfo.(DefaultNodeInfo).Channels,
-		Data:          cmap.NewCMap(),
-		metricsTicker: time.NewTicker(metricsTickerDuration),
-		metrics:       NopMetrics(),
-		mlc:           mlc,
+		peerConn:       pc,
+		nodeInfo:       nodeInfo,
+		channels:       nodeInfo.(DefaultNodeInfo).Channels,
+		Data:           cmap.NewCMap(),
+		metricsTicker:  time.NewTicker(metricsTickerDuration),
+		metrics:        NopMetrics(),
+		pendingMetrics: newPeerPendingMetricsCache(),
 	}
 
 	p.mconn = createMConnection(
@@ -273,7 +274,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
 	} else if !p.hasChannel(chID) {
 		return false
 	}
-	metricLabelValue := p.mlc.ValueToMetricLabel(msg)
+	msgType := getMsgType(msg)
 	if w, ok := msg.(Wrapper); ok {
 		msg = w.Wrap()
 	}
@@ -284,7 +285,7 @@ func (p *peer) send(chID byte, msg proto.Message, sendFunc func(byte, []byte) bo
 	}
 	res := sendFunc(chID, msgBytes)
 	if res {
-		p.metrics.MessageSendBytesTotal.With("message_type", metricLabelValue).Add(float64(len(msgBytes)))
+		p.pendingMetrics.AddPendingSendBytes(msgType, len(msgBytes))
 	}
 	return res
 }
@@ -373,6 +374,26 @@ func (p *peer) metricsReporter() {
 			}
 
 			p.metrics.PeerPendingSendBytes.With("peer_id", string(p.ID())).Set(sendQueueSize)
+			// Report per peer, per message total bytes, since the last interval
+			func() {
+				p.pendingMetrics.mtx.Lock()
+				defer p.pendingMetrics.mtx.Unlock()
+				for _, entry := range p.pendingMetrics.perMessageCache {
+					if entry.pendingSendBytes > 0 {
+						p.metrics.MessageSendBytesTotal.
+							With("message_type", entry.label).
+							Add(float64(entry.pendingSendBytes))
+						entry.pendingSendBytes = 0
+					}
+					if entry.pendingRecvBytes > 0 {
+						p.metrics.MessageReceiveBytesTotal.
+							With("message_type", entry.label).
+							Add(float64(entry.pendingRecvBytes))
+						entry.pendingRecvBytes = 0
+					}
+				}
+			}()
+
 		case <-p.Quit():
 			return
 		}
@@ -405,18 +426,13 @@ func createMConnection(
 		if err != nil {
 			panic(fmt.Errorf("unmarshaling message: %s into type: %s", err, reflect.TypeOf(mt)))
 		}
-		labels := []string{
-			"peer_id", string(p.ID()),
-			"chID", fmt.Sprintf("%#x", chID),
-		}
 		if w, ok := msg.(Unwrapper); ok {
 			msg, err = w.Unwrap()
 			if err != nil {
 				panic(fmt.Errorf("unwrapping message: %s", err))
 			}
 		}
-		p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
-		p.metrics.MessageReceiveBytesTotal.With("message_type", p.mlc.ValueToMetricLabel(msg)).Add(float64(len(msgBytes)))
+		p.pendingMetrics.AddPendingRecvBytes(getMsgType(msg), len(msgBytes))
 		reactor.Receive(Envelope{
 			ChannelID: chID,
 			Src:       p,
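The metricsReporter hunk above drains the pending cache once per metricsTickerDuration tick, so Prometheus counters are touched once per interval rather than on every send and receive. A hedged sketch of that ticker-driven loop, with invented names (reporter, run, flush) standing in for the real goroutine:

// Illustrative ticker-driven reporter loop; the flush callback stands in
// for draining the pending per-message byte counts into metrics counters.
package main

import (
    "fmt"
    "time"
)

type reporter struct {
    ticker *time.Ticker
    quit   chan struct{}
    flush  func() // drains pending per-message byte counts into counters
}

func (r *reporter) run() {
    for {
        select {
        case <-r.ticker.C:
            r.flush()
        case <-r.quit:
            r.ticker.Stop()
            return
        }
    }
}

func main() {
    r := &reporter{
        ticker: time.NewTicker(1 * time.Second), // mirrors metricsTickerDuration
        quit:   make(chan struct{}),
        flush:  func() { fmt.Println("flushing pending metrics") },
    }
    go r.run()
    time.Sleep(2500 * time.Millisecond) // let a couple of ticks fire
    close(r.quit)
}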
2 changes: 1 addition & 1 deletion p2p/peer_test.go
@@ -99,7 +99,7 @@ func createOutboundPeerAndPerformHandshake(
 		return nil, err
 	}
 
-	p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {}, newMetricsLabelCache())
+	p := newPeer(pc, mConfig, peerNodeInfo, reactorsByCh, msgTypeByChID, chDescs, func(p Peer, r interface{}) {})
 	p.SetLogger(log.TestingLogger().With("peer", addr))
 	return p, nil
 }
4 changes: 0 additions & 4 deletions p2p/switch.go
@@ -94,7 +94,6 @@ type Switch struct {
 	rng *rand.Rand // seed for randomizing dial times and orders
 
 	metrics *Metrics
-	mlc     *metricsLabelCache
 }
 
 // NetAddress returns the address the switch is listening on.
@@ -127,7 +126,6 @@ func NewSwitch(
 		filterTimeout:        defaultFilterTimeout,
 		persistentPeersAddrs: make([]*NetAddress, 0),
 		unconditionalPeerIDs: make(map[ID]struct{}),
-		mlc:                  newMetricsLabelCache(),
 	}
 
 	// Ensure we have a completely undeterministic PRNG.
@@ -635,7 +633,6 @@ func (sw *Switch) acceptRoutine() {
 		reactorsByCh:  sw.reactorsByCh,
 		msgTypeByChID: sw.msgTypeByChID,
 		metrics:       sw.metrics,
-		mlc:           sw.mlc,
 		isPersistent:  sw.IsPeerPersistent,
 	})
 	if err != nil {
@@ -741,7 +738,6 @@ func (sw *Switch) addOutboundPeerWithConfig(
 		reactorsByCh:  sw.reactorsByCh,
 		msgTypeByChID: sw.msgTypeByChID,
 		metrics:       sw.metrics,
-		mlc:           sw.mlc,
 	})
 	if err != nil {
 		if e, ok := err.(ErrRejected); ok {
1 change: 0 additions & 1 deletion p2p/test_util.go
@@ -152,7 +152,6 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
 		sw.msgTypeByChID,
 		sw.chDescs,
 		sw.StopPeerForError,
-		sw.mlc,
 	)
 
 	if err = sw.addPeer(p); err != nil {
2 changes: 0 additions & 2 deletions p2p/transport.go
@@ -53,7 +53,6 @@ type peerConfig struct {
 	reactorsByCh  map[byte]Reactor
 	msgTypeByChID map[byte]proto.Message
 	metrics       *Metrics
-	mlc           *metricsLabelCache
 }
 
 // Transport emits and connects to Peers. The implementation of Peer is left to
@@ -531,7 +530,6 @@ func (mt *MultiplexTransport) wrapPeer(
 		cfg.msgTypeByChID,
 		cfg.chDescs,
 		cfg.onPeerError,
-		cfg.mlc,
 		PeerMetrics(cfg.metrics),
 	)
