diff --git a/op-node/metrics/metrics.go b/op-node/metrics/metrics.go index 2fea08f92ab2..1abc82b61a71 100644 --- a/op-node/metrics/metrics.go +++ b/op-node/metrics/metrics.go @@ -64,6 +64,7 @@ type Metricer interface { RecordSequencerBuildingDiffTime(duration time.Duration) RecordSequencerSealingTime(duration time.Duration) Document() []metrics.DocumentedMetric + RecordChannelInputBytes(num int) // P2P Metrics SetPeerScores(scores map[string]float64) ClientPayloadByNumberEvent(num uint64, resultCode byte, duration time.Duration) @@ -131,6 +132,8 @@ type Metrics struct { GossipEventsTotal *prometheus.CounterVec BandwidthTotal *prometheus.GaugeVec + ChannelInputBytes prometheus.Counter + registry *prometheus.Registry factory metrics.Factory } @@ -331,6 +334,12 @@ func NewMetrics(procName string) *Metrics { "direction", }), + ChannelInputBytes: factory.NewCounter(prometheus.CounterOpts{ + Namespace: ns, + Name: "channel_input_bytes", + Help: "Number of compressed bytes added to the channel", + }), + P2PReqDurationSeconds: factory.NewHistogramVec(prometheus.HistogramOpts{ Namespace: ns, Subsystem: "p2p", @@ -635,6 +644,10 @@ func (m *Metrics) PayloadsQuarantineSize(n int) { m.PayloadsQuarantineTotal.Set(float64(n)) } +func (m *Metrics) RecordChannelInputBytes(inputCompressedBytes int) { + m.ChannelInputBytes.Add(float64(inputCompressedBytes)) +} + type noopMetricer struct{} var NoopMetrics Metricer = new(noopMetricer) @@ -737,3 +750,6 @@ func (n *noopMetricer) ServerPayloadByNumberEvent(num uint64, resultCode byte, d func (n *noopMetricer) PayloadsQuarantineSize(int) { } + +func (n *noopMetricer) RecordChannelInputBytes(int) { +} diff --git a/op-node/rollup/derive/channel_in_reader.go b/op-node/rollup/derive/channel_in_reader.go index 5a5156e678f2..2b3e1af2f164 100644 --- a/op-node/rollup/derive/channel_in_reader.go +++ b/op-node/rollup/derive/channel_in_reader.go @@ -21,15 +21,18 @@ type ChannelInReader struct { nextBatchFn func() (BatchWithL1InclusionBlock, error) prev *ChannelBank + + metrics Metrics } var _ ResetableStage = (*ChannelInReader)(nil) // NewChannelInReader creates a ChannelInReader, which should be Reset(origin) before use. -func NewChannelInReader(log log.Logger, prev *ChannelBank) *ChannelInReader { +func NewChannelInReader(log log.Logger, prev *ChannelBank, metrics Metrics) *ChannelInReader { return &ChannelInReader{ - log: log, - prev: prev, + log: log, + prev: prev, + metrics: metrics, } } @@ -41,6 +44,7 @@ func (cr *ChannelInReader) Origin() eth.L1BlockRef { func (cr *ChannelInReader) WriteChannel(data []byte) error { if f, err := BatchReader(bytes.NewBuffer(data), cr.Origin()); err == nil { cr.nextBatchFn = f + cr.metrics.RecordChannelInputBytes(len(data)) return nil } else { cr.log.Error("Error creating batch reader from channel data", "err", err) diff --git a/op-node/rollup/derive/pipeline.go b/op-node/rollup/derive/pipeline.go index 4d68fa39177e..4dcd74a8e778 100644 --- a/op-node/rollup/derive/pipeline.go +++ b/op-node/rollup/derive/pipeline.go @@ -15,6 +15,7 @@ type Metrics interface { RecordL1Ref(name string, ref eth.L1BlockRef) RecordL2Ref(name string, ref eth.L2BlockRef) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) + RecordChannelInputBytes(inputCompresedBytes int) } type L1Fetcher interface { @@ -82,7 +83,7 @@ func NewDerivationPipeline(log log.Logger, cfg *rollup.Config, l1Fetcher L1Fetch l1Src := NewL1Retrieval(log, dataSrc, l1Traversal) frameQueue := NewFrameQueue(log, l1Src) bank := NewChannelBank(log, cfg, frameQueue, l1Fetcher) - chInReader := NewChannelInReader(log, bank) + chInReader := NewChannelInReader(log, bank, metrics) batchQueue := NewBatchQueue(log, cfg, chInReader) attrBuilder := NewFetchingAttributesBuilder(cfg, l1Fetcher, engine) attributesQueue := NewAttributesQueue(log, cfg, attrBuilder, batchQueue) diff --git a/op-node/rollup/driver/driver.go b/op-node/rollup/driver/driver.go index b011e99abb33..08d1c844eca9 100644 --- a/op-node/rollup/driver/driver.go +++ b/op-node/rollup/driver/driver.go @@ -21,6 +21,7 @@ type Metrics interface { RecordL1Ref(name string, ref eth.L1BlockRef) RecordL2Ref(name string, ref eth.L2BlockRef) + RecordChannelInputBytes(inputCompresedBytes int) RecordUnsafePayloadsBuffer(length uint64, memSize uint64, next eth.BlockID) diff --git a/op-node/testutils/metrics.go b/op-node/testutils/metrics.go index 36c8001e2fe0..627a741e0378 100644 --- a/op-node/testutils/metrics.go +++ b/op-node/testutils/metrics.go @@ -1,14 +1,17 @@ package testutils -import "github.com/ethereum-optimism/optimism/op-node/eth" +import ( + "github.com/ethereum-optimism/optimism/op-node/eth" +) // TestDerivationMetrics implements the metrics used in the derivation pipeline as no-op operations. // Optionally a test may hook into the metrics type TestDerivationMetrics struct { - FnRecordL1ReorgDepth func(d uint64) - FnRecordL1Ref func(name string, ref eth.L1BlockRef) - FnRecordL2Ref func(name string, ref eth.L2BlockRef) - FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID) + FnRecordL1ReorgDepth func(d uint64) + FnRecordL1Ref func(name string, ref eth.L1BlockRef) + FnRecordL2Ref func(name string, ref eth.L2BlockRef) + FnRecordUnsafePayloads func(length uint64, memSize uint64, next eth.BlockID) + FnRecordChannelInputBytes func(inputCompresedBytes int) } func (t *TestDerivationMetrics) RecordL1ReorgDepth(d uint64) { @@ -35,6 +38,12 @@ func (t *TestDerivationMetrics) RecordUnsafePayloadsBuffer(length uint64, memSiz } } +func (t *TestDerivationMetrics) RecordChannelInputBytes(inputCompresedBytes int) { + if t.FnRecordChannelInputBytes != nil { + t.FnRecordChannelInputBytes(inputCompresedBytes) + } +} + type TestRPCMetrics struct{} func (n *TestRPCMetrics) RecordRPCServerRequest(method string) func() {