Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap: peer prom tracker #413

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ type Bitswap struct {
*client.Client
*server.Server

tracer tracer.Tracer
net network.BitSwapNetwork
tracers []tracer.Tracer
net network.BitSwapNetwork
}

func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
Expand All @@ -77,8 +77,8 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
}
}

if bs.tracer != nil {
var tracer tracer.Tracer = nopReceiveTracer{bs.tracer}
for _, t := range bs.tracers {
var tracer tracer.Tracer = nopReceiveTracer{t}
clientOptions = append(clientOptions, client.WithTracer(tracer))
serverOptions = append(serverOptions, server.WithTracer(tracer))
}
Expand Down Expand Up @@ -172,8 +172,8 @@ func (bs *Bitswap) ReceiveError(err error) {
}

func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming message.BitSwapMessage) {
if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)
}

bs.Client.ReceiveMessage(ctx, p, incoming)
Expand Down
11 changes: 6 additions & 5 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,13 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

// Configures the Client to use given tracer.
// WithTracer configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
// This can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Client) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)
}
}

Expand Down Expand Up @@ -208,7 +209,7 @@ type Client struct {
allMetric metrics.Histogram

// External statistics interface
tracer tracer.Tracer
tracers []tracer.Tracer

// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager
Expand Down Expand Up @@ -342,8 +343,8 @@ func (bs *Client) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.
bs.counters.messagesRecvd++
bs.counterLk.Unlock()

if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)
}

iblocks := incoming.Blocks()
Expand Down
3 changes: 2 additions & 1 deletion bitswap/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
return Option{client.SetSimulateDontHavesOnTimeout(send)}
}

// WithTracer can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
// Only trace the server, both receive the same messages anyway
return Option{
option(func(bs *Bitswap) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)
}),
}
}
146 changes: 146 additions & 0 deletions bitswap/peer-prom-tracker/peer-prom-tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package peerpromtracker

import (
"errors"
"fmt"

bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/tracer"
"github.com/ipfs/go-log/v2"
peer "github.com/libp2p/go-libp2p/core/peer"
prom "github.com/prometheus/client_golang/prometheus"
)

var logger = log.Logger("bitswap/peer-prom-tracker")

var _ tracer.Tracer = (*PeerTracer)(nil)

// PeerTracer is a tracer.Tracer implementation that records per-peer
// bitswap message and byte counters in a Prometheus registry.
//
// NOTE(review): each peer ID becomes its own label value, i.e. its own
// time series per counter — high-cardinality by design; confirm this is
// only enabled for debugging before using it on busy nodes.
type PeerTracer struct {
	bytesSent        *prom.CounterVec // message bytes sent, labeled by peer
	messagesSent     *prom.CounterVec // messages sent, labeled by peer
	bytesReceived    *prom.CounterVec // message bytes received, labeled by peer
	messagesReceived *prom.CounterVec // messages received, labeled by peer

	// reg is the registry the counters are registered with; set to nil by
	// Close, which doubles as the "already closed" sentinel.
	reg *prom.Registry
}

func NewPeerTracer(reg *prom.Registry) (*PeerTracer, error) {
pt := &PeerTracer{
reg: reg,
}

var good bool
defer func() {
if !good {
pt.Close() // will unregister the sucessfully registered metrics
}
}()

peerIdLabel := []string{"peer-id"}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ This will create a separate time series per PeerID, which is ok for debugging, but should NOT be enabled by default.

High cardinality labels in prometheus are is considered antipattern. If there was 20K of peers, this will create 20K time series, and that may cause problems (performance, billing) when Grafana tries to visualize it.

To understand why high cardinality is a problem, see:

IMO this PR can't land in boxo in this form as it creates footgun for users of this library.

There needs to be either a hard-limit on the number of peers tracked, or an explicit opt-in via constructor option or ENV variable.

Copy link
Member

@lidel lidel Jul 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 I think if we wanted to have metrics similar to this, we could measure P95 globally without running into the cardinality problem.

To do so, one would define Objectives in SummaryOpts to be P50, P75, P95 etc, and calculate messages-received, messages-sent, bytes-received etc across all peers, not specific per peer. This way we get useful P95 metric with known error margin, without exploding the time series.


bytesSent := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "bytes-sent",
Help: "This records the number of bitswap messages bytes sent per peer.",
}, peerIdLabel)
if err := reg.Register(bytesSent); err != nil {
return nil, fmt.Errorf("registering bytes-sent: %w", err)
}
pt.bytesSent = bytesSent

messagesSent := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "messages-sent",
Help: "This records the number of bitswap messages sent per peer.",
}, peerIdLabel)
if err := reg.Register(messagesSent); err != nil {
return nil, fmt.Errorf("registering messages-sent: %w", err)
}
pt.messagesSent = messagesSent

bytesReceived := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "bytes-received",
Help: "This records the number of bitswap messages bytes received from each peer.",
}, peerIdLabel)
if err := reg.Register(bytesReceived); err != nil {
return nil, fmt.Errorf("registering bytes-received: %w", err)
}
pt.bytesReceived = bytesReceived

messagesReceived := prom.NewCounterVec(prom.CounterOpts{
Namespace: "ipfs",
Subsystem: "bitswap/messages",
Name: "messages-received",
Help: "This records the number of bitswap messages received from each peer.",
}, peerIdLabel)
if err := reg.Register(messagesReceived); err != nil {
return nil, fmt.Errorf("registering messages-received: %w", err)
}
pt.messagesReceived = messagesReceived

good = true
return pt, nil
}

// MessageReceived records one received bitswap message (count and size in
// bytes) against peer p.
func (t *PeerTracer) MessageReceived(p peer.ID, msg bsmsg.BitSwapMessage) {
	// String() yields the same base58 form as the deprecated Pretty().
	strPeerID := p.String()

	// BUG FIX: the error checks were inverted (err == nil logged the failure
	// and err != nil dereferenced a nil counter, panicking).
	counter, err := t.messagesReceived.GetMetricWithLabelValues(strPeerID)
	if err != nil {
		logger.Debugf("failed to grab messages-received counter: %s", err)
	} else {
		counter.Inc()
	}

	counter, err = t.bytesReceived.GetMetricWithLabelValues(strPeerID)
	if err != nil {
		logger.Debugf("failed to grab bytes-received counter: %s", err)
	} else {
		counter.Add(float64(msg.Size()))
	}
}

// MessageSent records one sent bitswap message (count and size in bytes)
// against peer p.
func (t *PeerTracer) MessageSent(p peer.ID, msg bsmsg.BitSwapMessage) {
	// String() yields the same base58 form as the deprecated Pretty().
	strPeerID := p.String()

	// BUG FIX: the error checks were inverted (err == nil logged the failure
	// and err != nil dereferenced a nil counter, panicking); the log text
	// also wrongly said "messages received" in the sent path.
	counter, err := t.messagesSent.GetMetricWithLabelValues(strPeerID)
	if err != nil {
		logger.Debugf("failed to grab messages-sent counter: %s", err)
	} else {
		counter.Inc()
	}

	counter, err = t.bytesSent.GetMetricWithLabelValues(strPeerID)
	if err != nil {
		logger.Debugf("failed to grab bytes-sent counter: %s", err)
	} else {
		counter.Add(float64(msg.Size()))
	}
}

// Close unregisters all of t's metrics from the registry and zeroes t.
// Calling Close a second time returns an error.
func (t *PeerTracer) Close() error {
	if t.reg == nil {
		return errors.New("already closed")
	}

	// Each collector is nil-checked because NewPeerTracer invokes Close on a
	// partially constructed tracer when a registration fails part-way through.
	collectors := []*prom.CounterVec{
		t.bytesSent,
		t.messagesSent,
		t.bytesReceived,
		t.messagesReceived,
	}
	for _, c := range collectors {
		if c != nil {
			t.reg.Unregister(c)
		}
	}
	*t = PeerTracer{}

	return nil
}
13 changes: 7 additions & 6 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Server struct {
network bsnet.BitSwapNetwork

// External statistics interface
tracer tracer.Tracer
tracers []tracer.Tracer

// Counters for various statistics
counterLk sync.Mutex
Expand Down Expand Up @@ -123,9 +123,10 @@ func TaskWorkerCount(count int) Option {
}
}

// WithTracer can be passed multiple times to register multiple tracers.
func WithTracer(tap tracer.Tracer) Option {
return func(bs *Server) {
bs.tracer = tap
bs.tracers = append(bs.tracers, tap)
}
}

Expand Down Expand Up @@ -294,8 +295,8 @@ func (bs *Server) taskWorker(ctx context.Context, id int) {
// Ideally, yes. But we'd need some way to trigger a retry and/or drop
// the peer.
bs.engine.MessageSent(envelope.Peer, envelope.Message)
if bs.tracer != nil {
bs.tracer.MessageSent(envelope.Peer, envelope.Message)
for _, t := range bs.tracers {
t.MessageSent(envelope.Peer, envelope.Message)
}
bs.sendBlocks(ctx, envelope)

Expand Down Expand Up @@ -529,8 +530,8 @@ func (bs *Server) ReceiveMessage(ctx context.Context, p peer.ID, incoming messag
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

if bs.tracer != nil {
bs.tracer.MessageReceived(p, incoming)
for _, t := range bs.tracers {
t.MessageReceived(p, incoming)
}
}

Expand Down
Loading