From 431329a054d0a2ad40d73ba9a9e04854aa9b17c9 Mon Sep 17 00:00:00 2001 From: Jorropo Date: Thu, 11 Aug 2022 18:24:41 +0200 Subject: [PATCH] refactor: remove metrics object --- polyfill.go => bitswap.go | 10 +- client/client.go | 14 ++- metrics/gen.go | 132 ------------------------ metrics/metrics.go | 44 ++++++++ server/internal/decision/engine.go | 10 +- server/internal/decision/engine_test.go | 2 - server/server.go | 7 +- 7 files changed, 60 insertions(+), 159 deletions(-) rename polyfill.go => bitswap.go (93%) delete mode 100644 metrics/gen.go create mode 100644 metrics/metrics.go diff --git a/polyfill.go b/bitswap.go similarity index 93% rename from polyfill.go rename to bitswap.go index 95dcd5dc..ba16a093 100644 --- a/polyfill.go +++ b/bitswap.go @@ -6,7 +6,6 @@ import ( "github.com/ipfs/go-bitswap/client" "github.com/ipfs/go-bitswap/message" - "github.com/ipfs/go-bitswap/metrics" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-bitswap/server" "github.com/ipfs/go-bitswap/tracer" @@ -24,7 +23,7 @@ import ( var log = logging.Logger("bitswap") // old interface we are targeting -type old interface { +type bitswap interface { Close() error GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) @@ -44,7 +43,7 @@ type old interface { } var _ exchange.SessionExchange = (*Bitswap)(nil) -var _ old = (*Bitswap)(nil) +var _ bitswap = (*Bitswap)(nil) type Bitswap struct { *client.Client @@ -81,9 +80,8 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.WithTracer(tracer)) } - stats := metrics.New(ctx) - bs.Server = server.New(ctx, net, bstore, stats, serverOptions...) - bs.Client = client.New(ctx, net, bstore, stats, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) + bs.Server = server.New(ctx, net, bstore, serverOptions...) + bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once return bs diff --git a/client/client.go b/client/client.go index 1380e0d9..3a208749 100644 --- a/client/client.go +++ b/client/client.go @@ -82,16 +82,14 @@ func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { } type BlockReceivedNotifier interface { - // ReceivedBlocks notify the decision engine that a peer is well behaving - // and gave us usefull data, potentially increasing it's score and making us + // ReceivedBlocks notifies the decision engine that a peer is well-behaving + // and gave us useful data, potentially increasing its score and making us // send them more data in exchange. ReceivedBlocks(peer.ID, []blocks.Block) } -// New initializes a BitSwap instance that communicates over the provided -// BitSwapNetwork. This function registers the returned instance as the network -// delegate. Runs until context is cancelled or bitswap.Close is called. -func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, m *bmetrics.Metrics, options ...Option) *Client { +// New initializes a Bitswap client that runs until client.Close is called. +func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Client { // important to use provided parent context (since it may include important // loggable data). It's probably not a good idea to allow bitswap to be // coupled to the concerns of the ipfs daemon in this way. @@ -155,8 +153,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore sim: sim, notif: notif, counters: new(counters), - dupMetric: m.DupHist(), - allMetric: m.AllHist(), + dupMetric: bmetrics.DupHist(), + allMetric: bmetrics.AllHist(), provSearchDelay: defaults.ProvSearchDelay, rebroadcastDelay: delay.Fixed(time.Minute), simulateDontHavesOnTimeout: true, diff --git a/metrics/gen.go b/metrics/gen.go deleted file mode 100644 index 000a8cde..00000000 --- a/metrics/gen.go +++ /dev/null @@ -1,132 +0,0 @@ -package metrics - -import ( - "context" - "sync" - - "github.com/ipfs/go-metrics-interface" -) - -var ( - // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size - metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} - - timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} -) - -// Metrics is a type which lazy initialize metrics objects. -// It MUST not be copied. -type Metrics struct { - ctx context.Context - lock sync.Mutex - - dupHist metrics.Histogram - allHist metrics.Histogram - sentHist metrics.Histogram - sendTimeHist metrics.Histogram - - pendingEngineGauge metrics.Gauge - activeEngineGauge metrics.Gauge - pendingBlocksGauge metrics.Gauge - activeBlocksGauge metrics.Gauge -} - -func New(ctx context.Context) *Metrics { - return &Metrics{ctx: metrics.CtxSubScope(ctx, "bitswap")} -} - -// DupHist return recv_dup_blocks_bytes. -// Threadsafe -func (m *Metrics) DupHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.dupHist != nil { - return m.dupHist - } - m.dupHist = metrics.NewCtx(m.ctx, "recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) - return m.dupHist -} - -// AllHist returns recv_all_blocks_bytes. -// Threadsafe -func (m *Metrics) AllHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.allHist != nil { - return m.allHist - } - m.allHist = metrics.NewCtx(m.ctx, "recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) - return m.allHist -} - -// SentHist returns sent_all_blocks_bytes. -// Threadsafe -func (m *Metrics) SentHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.sentHist != nil { - return m.sentHist - } - m.sentHist = metrics.NewCtx(m.ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) - return m.sentHist -} - -// SendTimeHist returns send_times. -// Threadsafe -func (m *Metrics) SendTimeHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.sendTimeHist != nil { - return m.sendTimeHist - } - m.sendTimeHist = metrics.NewCtx(m.ctx, "send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) - return m.sendTimeHist -} - -// PendingEngineGauge returns pending_tasks. -// Threadsafe -func (m *Metrics) PendingEngineGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.pendingEngineGauge != nil { - return m.pendingEngineGauge - } - m.pendingEngineGauge = metrics.NewCtx(m.ctx, "pending_tasks", "Total number of pending tasks").Gauge() - return m.pendingEngineGauge -} - -// ActiveEngineGauge returns active_tasks. -// Threadsafe -func (m *Metrics) ActiveEngineGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.activeEngineGauge != nil { - return m.activeEngineGauge - } - m.activeEngineGauge = metrics.NewCtx(m.ctx, "active_tasks", "Total number of active tasks").Gauge() - return m.activeEngineGauge -} - -// PendingBlocksGauge returns pending_block_tasks. -// Threadsafe -func (m *Metrics) PendingBlocksGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.pendingBlocksGauge != nil { - return m.pendingBlocksGauge - } - m.pendingBlocksGauge = metrics.NewCtx(m.ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge() - return m.pendingBlocksGauge -} - -// ActiveBlocksGauge returns active_block_tasks. -// Threadsafe -func (m *Metrics) ActiveBlocksGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.activeBlocksGauge != nil { - return m.activeBlocksGauge - } - m.activeBlocksGauge = metrics.NewCtx(m.ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge() - return m.activeBlocksGauge -} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..8d679a51 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,44 @@ +package metrics + +import ( + "github.com/ipfs/go-metrics-interface" +) + +var ( + // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size + metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} + + timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} +) + +func DupHist() metrics.Histogram { + return metrics.New("recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) +} + +func AllHist() metrics.Histogram { + return metrics.New("recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) +} + +func SentHist() metrics.Histogram { + return metrics.New("sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) +} + +func SendTimeHist() metrics.Histogram { + return metrics.New("send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) +} + +func PendingEngineGauge() metrics.Gauge { + return metrics.New("pending_tasks", "Total number of pending tasks").Gauge() +} + +func ActiveEngineGauge() metrics.Gauge { + return metrics.New("active_tasks", "Total number of active tasks").Gauge() +} + +func PendingBlocksGauge() metrics.Gauge { + return metrics.New("pending_block_tasks", "Total number of pending blockstore tasks").Gauge() +} + +func ActiveBlocksGauge() metrics.Gauge { + return metrics.New("active_block_tasks", "Total number of active blockstore tasks").Gauge() +} diff --git a/server/internal/decision/engine.go b/server/internal/decision/engine.go index d1ccdeb0..04bcb143 100644 --- a/server/internal/decision/engine.go +++ b/server/internal/decision/engine.go @@ -308,7 +308,6 @@ func NewEngine( bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, - metrics *bmetrics.Metrics, opts ...Option, ) *Engine { return newEngine( @@ -316,7 +315,6 @@ func NewEngine( peerTagger, self, maxBlockSizeReplaceHasWithBlock, - metrics, opts..., ) } @@ -326,10 +324,8 @@ func newEngine( peerTagger PeerTagger, self peer.ID, maxReplaceSize int, - metrics *bmetrics.Metrics, opts ...Option, ) *Engine { - e := &Engine{ ledgerMap: make(map[peer.ID]*ledger), scoreLedger: NewDefaultScoreLedger(), @@ -344,8 +340,8 @@ func newEngine( sendDontHaves: true, self: self, peerLedger: newPeerLedger(), - pendingGauge: metrics.PendingEngineGauge(), - activeGauge: metrics.ActiveEngineGauge(), + pendingGauge: bmetrics.PendingEngineGauge(), + activeGauge: bmetrics.ActiveEngineGauge(), targetMessageSize: defaultTargetMessageSize, tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()), tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()), @@ -355,7 +351,7 @@ func newEngine( opt(e) } - e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, metrics.PendingBlocksGauge(), metrics.ActiveBlocksGauge()) + e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(), bmetrics.ActiveBlocksGauge()) // default peer task queue options peerTaskQueueOpts := []peertaskqueue.Option{ diff --git a/server/internal/decision/engine_test.go b/server/internal/decision/engine_test.go index 853cc3bf..3ae8f150 100644 --- a/server/internal/decision/engine_test.go +++ b/server/internal/decision/engine_test.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/go-bitswap/internal/testutil" message "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" - "github.com/ipfs/go-bitswap/metrics" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -197,7 +196,6 @@ func newEngineForTesting( peerTagger, self, maxReplaceSize, - metrics.New(ctx), opts..., ) } diff --git a/server/server.go b/server/server.go index 8cbe4682..5f76a79b 100644 --- a/server/server.go +++ b/server/server.go @@ -78,7 +78,7 @@ type Server struct { provideEnabled bool } -func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, m *bmetrics.Metrics, options ...Option) *Server { +func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { ctx, cancel := context.WithCancel(ctx) px := process.WithTeardown(func() error { @@ -90,8 +90,8 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: m.SentHist(), - sendTimeHistogram: m.SendTimeHist(), + sentHistogram: bmetrics.SentHist(), + sendTimeHistogram: bmetrics.SendTimeHist(), taskWorkerCount: defaults.BitswapTaskWorkerCount, network: network, process: px, @@ -109,7 +109,6 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl bstore, network.ConnectionManager(), network.Self(), - m, s.engineOptions..., ) s.engineOptions = nil