diff --git a/bitswap.go b/bitswap.go index f6fdb4cb..df7a91e7 100644 --- a/bitswap.go +++ b/bitswap.go @@ -10,11 +10,12 @@ import ( "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-bitswap/server" "github.com/ipfs/go-bitswap/tracer" + "github.com/ipfs/go-metrics-interface" - "github.com/ipfs/go-block-format" + blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" - "github.com/ipfs/go-ipfs-exchange-interface" + blockstore "github.com/ipfs/go-ipfs-blockstore" + exchange "github.com/ipfs/go-ipfs-exchange-interface" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p-core/peer" @@ -86,6 +87,8 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize)) } + ctx = metrics.CtxSubScope(ctx, "bitswap") + bs.Server = server.New(ctx, net, bstore, serverOptions...) bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once diff --git a/client/client.go b/client/client.go index 3a208749..47aa6444 100644 --- a/client/client.go +++ b/client/client.go @@ -153,8 +153,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore sim: sim, notif: notif, counters: new(counters), - dupMetric: bmetrics.DupHist(), - allMetric: bmetrics.AllHist(), + dupMetric: bmetrics.DupHist(ctx), + allMetric: bmetrics.AllHist(ctx), provSearchDelay: defaults.ProvSearchDelay, rebroadcastDelay: delay.Fixed(time.Minute), simulateDontHavesOnTimeout: true, diff --git a/metrics/metrics.go b/metrics/metrics.go index 8d679a51..b7192372 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -1,6 +1,8 @@ package metrics import ( + "context" + "github.com/ipfs/go-metrics-interface" ) @@ -11,34 +13,34 @@ var ( timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} ) -func DupHist() metrics.Histogram { - return metrics.New("recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) +func DupHist(ctx context.Context) metrics.Histogram { + return metrics.NewCtx(ctx, "recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) } -func AllHist() metrics.Histogram { - return metrics.New("recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) +func AllHist(ctx context.Context) metrics.Histogram { + return metrics.NewCtx(ctx, "recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) } -func SentHist() metrics.Histogram { - return metrics.New("sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) +func SentHist(ctx context.Context) metrics.Histogram { + return metrics.NewCtx(ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) } -func SendTimeHist() metrics.Histogram { - return metrics.New("send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) +func SendTimeHist(ctx context.Context) metrics.Histogram { + return metrics.NewCtx(ctx, "send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) } -func PendingEngineGauge() metrics.Gauge { - return metrics.New("pending_tasks", "Total number of pending tasks").Gauge() +func PendingEngineGauge(ctx context.Context) metrics.Gauge { + return metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge() } -func ActiveEngineGauge() metrics.Gauge { - return metrics.New("active_tasks", "Total number of active tasks").Gauge() +func ActiveEngineGauge(ctx context.Context) metrics.Gauge { + return metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge() } -func PendingBlocksGauge() metrics.Gauge { - return metrics.New("pending_block_tasks", "Total number of pending blockstore tasks").Gauge() +func PendingBlocksGauge(ctx context.Context) metrics.Gauge { + return metrics.NewCtx(ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge() } -func ActiveBlocksGauge() metrics.Gauge { - return metrics.New("active_block_tasks", "Total number of active blockstore tasks").Gauge() +func ActiveBlocksGauge(ctx context.Context) metrics.Gauge { + return metrics.NewCtx(ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge() } diff --git a/server/internal/decision/engine.go b/server/internal/decision/engine.go index 04bcb143..a53a6274 100644 --- a/server/internal/decision/engine.go +++ b/server/internal/decision/engine.go @@ -305,12 +305,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator { // maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum // work already outstanding. func NewEngine( + ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, opts ...Option, ) *Engine { return newEngine( + ctx, bs, peerTagger, self, @@ -320,6 +322,7 @@ func NewEngine( } func newEngine( + ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, @@ -340,8 +343,8 @@ func newEngine( sendDontHaves: true, self: self, peerLedger: newPeerLedger(), - pendingGauge: bmetrics.PendingEngineGauge(), - activeGauge: bmetrics.ActiveEngineGauge(), + pendingGauge: bmetrics.PendingEngineGauge(ctx), + activeGauge: bmetrics.ActiveEngineGauge(ctx), targetMessageSize: defaultTargetMessageSize, tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()), tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()), @@ -351,7 +354,7 @@ func newEngine( opt(e) } - e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(), bmetrics.ActiveBlocksGauge()) + e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(ctx), bmetrics.ActiveBlocksGauge(ctx)) // default peer task queue options peerTaskQueueOpts := []peertaskqueue.Option{ diff --git a/server/internal/decision/engine_test.go b/server/internal/decision/engine_test.go index 3ae8f150..7484a7aa 100644 --- a/server/internal/decision/engine_test.go +++ b/server/internal/decision/engine_test.go @@ -192,6 +192,7 @@ func newEngineForTesting( opts ...Option, ) *Engine { return newEngine( + ctx, bs, peerTagger, self, diff --git a/server/server.go b/server/server.go index b39c34f1..c9dbf4d9 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,7 @@ import ( "github.com/ipfs/go-bitswap/tracer" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" - "github.com/ipfs/go-ipfs-blockstore" + blockstore "github.com/ipfs/go-ipfs-blockstore" logging "github.com/ipfs/go-log" "github.com/ipfs/go-metrics-interface" process "github.com/jbenet/goprocess" @@ -85,8 +85,8 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: bmetrics.SentHist(), - sendTimeHistogram: bmetrics.SendTimeHist(), + sentHistogram: bmetrics.SentHist(ctx), + sendTimeHistogram: bmetrics.SendTimeHist(ctx), taskWorkerCount: defaults.BitswapTaskWorkerCount, network: network, process: px, @@ -100,8 +100,8 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl o(s) } - // Set up decision engine s.engine = decision.NewEngine( + ctx, bstore, network.ConnectionManager(), network.Self(),