Skip to content

Commit

Permalink
Merge pull request #5385 from ethereum-optimism/inphi/chan-in-metrics
Browse files Browse the repository at this point in the history
op-node: Add channel_input_bytes metric
  • Loading branch information
OptimismBot authored Apr 7, 2023
2 parents 3be8b48 + 06bea64 commit cbb0bb5
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 9 deletions.
16 changes: 16 additions & 0 deletions op-node/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type Metricer interface {
RecordSequencerBuildingDiffTime(duration time.Duration)
RecordSequencerSealingTime(duration time.Duration)
Document() []metrics.DocumentedMetric
RecordChannelInputBytes(num int)
// P2P Metrics
SetPeerScores(scores map[string]float64)
ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration)
Expand Down Expand Up @@ -131,6 +132,8 @@ type Metrics struct {
GossipEventsTotal *prometheus.CounterVec
BandwidthTotal *prometheus.GaugeVec

ChannelInputBytes prometheus.Counter

registry *prometheus.Registry
factory metrics.Factory
}
Expand Down Expand Up @@ -331,6 +334,12 @@ func NewMetrics(procName string) *Metrics {
"direction",
}),

ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{
Namespace: ns,
Name: "channel_input_bytes",
Help: "Number of compressed bytes added to the channel",
}),

P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{
Namespace: ns,
Subsystem: "p2p",
Expand Down Expand Up @@ -635,6 +644,10 @@ func (m *Metrics) PayloadsQuarantineSize(n int) {
m.PayloadsQuarantineTotal.Set(float64(n))
}

func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) {
m.ChannelInputBytes.Add(float64(inputCompressedBytes))
}

type noopMetricer struct{}

var NoopMetrics Metricer = new(noopMetricer)
Expand Down Expand Up @@ -737,3 +750,6 @@ func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, d

func (n *noopMetricer) PayloadsQuarantineSize(int) {
}

func (n *noopMetricer) RecordChannelInputBytes(int) {
}
10 changes: 7 additions & 3 deletions op-node/rollup/derive/channel_in_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,18 @@ type ChannelInReader struct {
nextBatchFn func() (BatchWithL1InclusionBlock, error)

prev *ChannelBank

metrics Metrics
}

var _ ResetableStage = (*ChannelInReader)(nil)

// NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use.
func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader {
func NewChannelInReader(log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader {
return &ChannelInReader{
log: log,
prev: prev,
log: log,
prev: prev,
metrics: metrics,
}
}

Expand All @@ -41,6 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef {
func (cr *ChannelInReader) WriteChannel(data []byte) error {
if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil {
cr.nextBatchFn = f
cr.metrics.RecordChannelInputBytes(len(data))
return nil
} else {
cr.log.Error("Error creating batch reader from channel data", "err", err)
Expand Down
3 changes: 2 additions & 1 deletion op-node/rollup/derive/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type Metrics interface {
RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)
RecordChannelInputBytes(inputCompresedBytes int)
}

type L1Fetcher interface {
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch
l1Src := NewL1Retrieval(log, dataSrc, l1Traversal)
frameQueue := NewFrameQueue(log, l1Src)
bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher)
chInReader := NewChannelInReader(log, bank)
chInReader := NewChannelInReader(log, bank, metrics)
batchQueue := NewBatchQueue(log, cfg, chInReader)
attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine)
attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue)
Expand Down
1 change: 1 addition & 0 deletions op-node/rollup/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Metrics interface {

RecordL1Ref(name string, ref eth.L1BlockRef)
RecordL2Ref(name string, ref eth.L2BlockRef)
RecordChannelInputBytes(inputCompresedBytes int)

RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID)

Expand Down
19 changes: 14 additions & 5 deletions op-node/testutils/metrics.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package testutils

import "github.com/ethereum-optimism/optimism/op-node/eth"
import (
"github.com/ethereum-optimism/optimism/op-node/eth"
)

// TestDerivationMetrics implements the metrics used in the derivation pipeline as no-op operations.
// Optionally a test may hook into the metrics
type TestDerivationMetrics struct {
FnRecordL1ReorgDepth func(d uint64)
FnRecordL1Ref func(name string, ref eth.L1BlockRef)
FnRecordL2Ref func(name string, ref eth.L2BlockRef)
FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID)
FnRecordL1ReorgDepth func(d uint64)
FnRecordL1Ref func(name string, ref eth.L1BlockRef)
FnRecordL2Ref func(name string, ref eth.L2BlockRef)
FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID)
FnRecordChannelInputBytes func(inputCompresedBytes int)
}

func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) {
Expand All @@ -35,6 +38,12 @@ func (t *TestDerivationMetrics) RecordUnsafePayloadsBuffer(length uint64, memSiz
}
}

func (t *TestDerivationMetrics) RecordChannelInputBytes(inputCompresedBytes int) {
if t.FnRecordChannelInputBytes != nil {
t.FnRecordChannelInputBytes(inputCompresedBytes)
}
}

type TestRPCMetrics struct{}

func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {
Expand Down

0 comments on commit cbb0bb5

Please sign in to comment.