Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
fix: plumb through ctor contexts to preserve metrics scopes
Browse files Browse the repository at this point in the history
  • Loading branch information
guseggert committed Aug 13, 2022
1 parent 81393bc commit ab72e8e
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 28 deletions.
9 changes: 6 additions & 3 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/server"
"github.com/ipfs/go-bitswap/tracer"
"github.com/ipfs/go-metrics-interface"

"github.com/ipfs/go-block-format"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-ipfs-exchange-interface"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/peer"

Expand Down Expand Up @@ -86,6 +87,8 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once
Expand Down
4 changes: 2 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: bmetrics.DupHist(),
allMetric: bmetrics.AllHist(),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
simulateDontHavesOnTimeout: true,
Expand Down
34 changes: 18 additions & 16 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package metrics

import (
"context"

"github.com/ipfs/go-metrics-interface"
)

Expand All @@ -11,34 +13,34 @@ var (
timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
)

func DupHist() metrics.Histogram {
return metrics.New("recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets)
func DupHist(ctx context.Context) metrics.Histogram {
return metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets)
}

func AllHist() metrics.Histogram {
return metrics.New("recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets)
func AllHist(ctx context.Context) metrics.Histogram {
return metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets)
}

func SentHist() metrics.Histogram {
return metrics.New("sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets)
func SentHist(ctx context.Context) metrics.Histogram {
return metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets)
}

func SendTimeHist() metrics.Histogram {
return metrics.New("send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets)
func SendTimeHist(ctx context.Context) metrics.Histogram {
return metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets)
}

func PendingEngineGauge() metrics.Gauge {
return metrics.New("pending_tasks", "Total number of pending tasks").Gauge()
func PendingEngineGauge(ctx context.Context) metrics.Gauge {
return metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()
}

func ActiveEngineGauge() metrics.Gauge {
return metrics.New("active_tasks", "Total number of active tasks").Gauge()
func ActiveEngineGauge(ctx context.Context) metrics.Gauge {
return metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()
}

func PendingBlocksGauge() metrics.Gauge {
return metrics.New("pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
func PendingBlocksGauge(ctx context.Context) metrics.Gauge {
return metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
}

func ActiveBlocksGauge() metrics.Gauge {
return metrics.New("active_block_tasks", "Total number of active blockstore tasks").Gauge()
func ActiveBlocksGauge(ctx context.Context) metrics.Gauge {
return metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge()
}
9 changes: 6 additions & 3 deletions server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,12 +305,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
// work already outstanding.
func NewEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
Expand All @@ -320,6 +322,7 @@ func NewEngine(
}

func newEngine(
ctx context.Context,
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
Expand All @@ -340,8 +343,8 @@ func newEngine(
sendDontHaves: true,
self: self,
peerLedger: newPeerLedger(),
pendingGauge: bmetrics.PendingEngineGauge(),
activeGauge: bmetrics.ActiveEngineGauge(),
pendingGauge: bmetrics.PendingEngineGauge(ctx),
activeGauge: bmetrics.ActiveEngineGauge(ctx),
targetMessageSize: defaultTargetMessageSize,
tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()),
tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
Expand All @@ -351,7 +354,7 @@ func newEngine(
opt(e)
}

e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(), bmetrics.ActiveBlocksGauge())
e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx))

// default peer task queue options
peerTaskQueueOpts := []peertaskqueue.Option{
Expand Down
1 change: 1 addition & 0 deletions server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func newEngineForTesting(
opts ...Option,
) *Engine {
return newEngine(
ctx,
bs,
peerTagger,
self,
Expand Down
8 changes: 4 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/ipfs/go-bitswap/tracer"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-ipfs-blockstore"
blockstore "github.com/ipfs/go-ipfs-blockstore"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-metrics-interface"
process "github.com/jbenet/goprocess"
Expand Down Expand Up @@ -85,8 +85,8 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
}()

s := &Server{
sentHistogram: bmetrics.SentHist(),
sendTimeHistogram: bmetrics.SendTimeHist(),
sentHistogram: bmetrics.SentHist(ctx),
sendTimeHistogram: bmetrics.SendTimeHist(ctx),
taskWorkerCount: defaults.BitswapTaskWorkerCount,
network: network,
process: px,
Expand All @@ -100,8 +100,8 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl
o(s)
}

// Set up decision engine
s.engine = decision.NewEngine(
ctx,
bstore,
network.ConnectionManager(),
network.Self(),
Expand Down

0 comments on commit ab72e8e

Please sign in to comment.