Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove AddSingularBatch from ChannelOut interface (prefer AddBlock) #12079

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions op-batcher/batcher/channel_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,20 +178,16 @@ func (c *ChannelBuilder) AddBlock(block *types.Block) (*derive.L1BlockInfo, erro
return nil, c.FullErr()
}

batch, l1info, err := derive.BlockToSingularBatch(c.rollupCfg, block)
if err != nil {
return l1info, fmt.Errorf("converting block to batch: %w", err)
}

if err = c.co.AddSingularBatch(batch, l1info.SequenceNumber); errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
l1info, err := c.co.AddBlock(c.rollupCfg, block)
if errors.Is(err, derive.ErrTooManyRLPBytes) || errors.Is(err, derive.ErrCompressorFull) {
c.setFullErr(err)
return l1info, c.FullErr()
} else if err != nil {
return l1info, fmt.Errorf("adding block to channel out: %w", err)
}

c.blocks = append(c.blocks, block)
c.updateSwTimeout(batch)
c.updateSwTimeout(l1info.Number)

if l1info.Number > c.latestL1Origin.Number {
c.latestL1Origin = eth.BlockID{
Expand Down Expand Up @@ -252,8 +248,8 @@ func (c *ChannelBuilder) updateDurationTimeout(l1BlockNum uint64) {
// derived from the batch's origin L1 block. The timeout is only moved forward
// if the derived sequencer window timeout is earlier than the currently set
// timeout.
func (c *ChannelBuilder) updateSwTimeout(batch *derive.SingularBatch) {
timeout := uint64(batch.EpochNum) + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
func (c *ChannelBuilder) updateSwTimeout(l1InfoNumber uint64) {
timeout := l1InfoNumber + c.cfg.SeqWindowSize - c.cfg.SubSafetyMargin
c.updateTimeout(timeout, ErrSeqWindowClose)
}

Expand Down
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func FuzzSeqWindowClose(f *testing.F) {

// Check the timeout
cb.timeout = timeout
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
if timeout > calculatedTimeout && calculatedTimeout != 0 {
cb.CheckTimeout(calculatedTimeout)
Expand Down Expand Up @@ -278,7 +278,7 @@ func FuzzSeqWindowZeroTimeoutClose(f *testing.F) {

// Check the timeout
cb.timeout = 0
cb.updateSwTimeout(&derive.SingularBatch{EpochNum: rollup.Epoch(epochNum)})
cb.updateSwTimeout(epochNum)
calculatedTimeout := epochNum + seqWindowSize - subSafetyMargin
cb.CheckTimeout(calculatedTimeout)
if cb.timeout != 0 {
Expand Down
26 changes: 13 additions & 13 deletions op-e2e/actions/helpers/garbage_channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Writer interface {
type ChannelOutIface interface {
ID() derive.ChannelID
Reset() error
AddBlock(rollupCfg *rollup.Config, block *types.Block) error
AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error)
ReadyBytes() int
Flush() error
Close() error
Expand Down Expand Up @@ -157,19 +157,19 @@ func (co *GarbageChannelOut) Reset() error {
// error that it returns is ErrTooManyRLPBytes. If this error
// is returned, the channel should be closed and a new one
// should be made.
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*derive.L1BlockInfo, error) {
if co.closed {
return errors.New("already closed")
return nil, errors.New("already closed")
}
batch, err := blockToBatch(rollupCfg, block)
batch, l1Info, err := blockToBatch(rollupCfg, block)
if err != nil {
return err
return nil, err
}
// We encode to a temporary buffer to determine the encoded length to
// ensure that the total size of all RLP elements is less than or equal to MAX_RLP_BYTES_PER_CHANNEL
var buf bytes.Buffer
if err := rlp.Encode(&buf, batch); err != nil {
return err
return nil, err
}
if co.cfg.MalformRLP {
// Malform the RLP by incrementing the length prefix by 1.
Expand All @@ -182,13 +182,13 @@ func (co *GarbageChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Blo
chainSpec := rollup.NewChainSpec(rollupCfg)
maxRLPBytesPerChannel := chainSpec.MaxRLPBytesPerChannel(block.Time())
if co.rlpLength+buf.Len() > int(maxRLPBytesPerChannel) {
return fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
return nil, fmt.Errorf("could not add %d bytes to channel of %d bytes, max is %d. err: %w",
buf.Len(), co.rlpLength, maxRLPBytesPerChannel, derive.ErrTooManyRLPBytes)
}
co.rlpLength += buf.Len()

_, err = io.Copy(co.compress, &buf)
return err
return l1Info, err
}

// ReadyBytes returns the number of bytes that the channel out can immediately output into a frame.
Expand Down Expand Up @@ -256,25 +256,25 @@ func (co *GarbageChannelOut) OutputFrame(w *bytes.Buffer, maxSize uint64) (uint1
}

// blockToBatch transforms a block into a batch object that can easily be RLP encoded.
func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, error) {
func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchData, *derive.L1BlockInfo, error) {
opaqueTxs := make([]hexutil.Bytes, 0, len(block.Transactions()))
for i, tx := range block.Transactions() {
if tx.Type() == types.DepositTxType {
continue
}
otx, err := tx.MarshalBinary()
if err != nil {
return nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
return nil, nil, fmt.Errorf("could not encode tx %v in block %v: %w", i, tx.Hash(), err)
}
opaqueTxs = append(opaqueTxs, otx)
}
l1InfoTx := block.Transactions()[0]
if l1InfoTx.Type() != types.DepositTxType {
return nil, derive.ErrNotDepositTx
return nil, nil, derive.ErrNotDepositTx
}
l1Info, err := derive.L1BlockInfoFromBytes(rollupCfg, block.Time(), l1InfoTx.Data())
if err != nil {
return nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
return nil, nil, fmt.Errorf("could not parse the L1 Info deposit: %w", err)
}

singularBatch := &derive.SingularBatch{
Expand All @@ -285,5 +285,5 @@ func blockToBatch(rollupCfg *rollup.Config, block *types.Block) (*derive.BatchDa
Transactions: opaqueTxs,
}

return derive.NewBatchData(singularBatch), nil
return derive.NewBatchData(singularBatch), l1Info, nil
}
2 changes: 1 addition & 1 deletion op-e2e/actions/helpers/l2_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func (s *L2Batcher) Buffer(t Testing, opts ...BlockModifier) error {
require.NoError(t, err, "failed to create channel")
s.L2ChannelOut = ch
}
if err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
if _, err := s.L2ChannelOut.AddBlock(s.rollupCfg, block); err != nil {
return err
}
ref, err := s.engCl.L2BlockRefByHash(t.Ctx(), block.Hash())
Expand Down
10 changes: 5 additions & 5 deletions op-e2e/actions/sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func TestBackupUnsafe(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -414,7 +414,7 @@ func TestBackupUnsafeReorgForkChoiceInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -547,7 +547,7 @@ func TestBackupUnsafeReorgForkChoiceNotInputError(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1, B2, B3, B4, B5 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -924,7 +924,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], invalidTx}})
}
// Add A1 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}

Expand Down Expand Up @@ -973,7 +973,7 @@ func TestInvalidPayloadInSpanBatch(gt *testing.T) {
block = block.WithBody(types.Body{Transactions: []*types.Transaction{block.Transactions()[0], tx}})
}
// Add B1, A2 ~ A12 into the channel
err = channelOut.AddBlock(sd.RollupCfg, block)
_, err = channelOut.AddBlock(sd.RollupCfg, block)
require.NoError(t, err)
}
// Submit span batch(B1, A2, ... A12)
Expand Down
76 changes: 59 additions & 17 deletions op-node/benchmarks/batchbuilding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/ethereum-optimism/optimism/op-batcher/compressor"
"github.com/ethereum-optimism/optimism/op-node/rollup"
"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/testutils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -101,6 +104,40 @@ func channelOutByType(b *testing.B, batchType uint, cd compressorDetails) (deriv
return nil, fmt.Errorf("unsupported batch type: %d", batchType)
}

func randomBlock(cfg *rollup.Config, rng *rand.Rand, txCount int, timestamp uint64) (*types.Block, error) {
batch := derive.RandomSingularBatch(rng, txCount, cfg.L2ChainID)
batch.Timestamp = timestamp
return singularBatchToBlock(cfg, batch)
}

// singularBatchToBlock converts a singular batch to a block for use in the benchmarks. This function
// should only be used for testing purposes, as the batch input doesn't contain the necessary information
// to build the full block (only non-deposit transactions and a subset of header fields are populated).
func singularBatchToBlock(rollupCfg *rollup.Config, batch *derive.SingularBatch) (*types.Block, error) {
l1InfoTx, err := derive.L1InfoDeposit(rollupCfg, eth.SystemConfig{}, 0, &testutils.MockBlockInfo{
InfoNum: uint64(batch.EpochNum),
InfoHash: batch.EpochHash,
}, batch.Timestamp)
if err != nil {
return nil, fmt.Errorf("could not build L1 Info transaction: %w", err)
}
txs := []*types.Transaction{types.NewTx(l1InfoTx)}
for i, opaqueTx := range batch.Transactions {
var tx types.Transaction
err = tx.UnmarshalBinary(opaqueTx)
if err != nil {
return nil, fmt.Errorf("could not decode tx %d: %w", i, err)
}
txs = append(txs, &tx)
}
return types.NewBlockWithHeader(&types.Header{
ParentHash: batch.ParentHash,
Time: batch.Timestamp,
}).WithBody(types.Body{
Transactions: txs,
}), nil
}

// a test case for the benchmark controls the number of batches and transactions per batch,
// as well as the batch type and compressor used
type BatchingBenchmarkTC struct {
Expand Down Expand Up @@ -155,16 +192,17 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
}

for _, tc := range tests {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
Expand All @@ -174,13 +212,13 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
cout, _ := channelOutByType(b, tc.BatchType, tc.cd)
// add all but the final batch to the channel out
for i := 0; i < tc.BatchCount-1; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err)
}
// measure the time to add the final batch
b.StartTimer()
// add the final batch to the channel out
err := cout.AddSingularBatch(batches[tc.BatchCount-1], 0)
_, err := cout.AddBlock(cfg, blocks[tc.BatchCount-1])
require.NoError(b, err)
}
})
Expand All @@ -193,7 +231,7 @@ func BenchmarkFinalBatchChannelOut(b *testing.B) {
// Hint: use -benchtime=1x to run the benchmarks for a single iteration
// it is not currently designed to use b.N
func BenchmarkIncremental(b *testing.B) {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// use the real compressor for this benchmark
// use batchCount as the number of batches to add in each benchmark iteration
Expand Down Expand Up @@ -226,17 +264,20 @@ func BenchmarkIncremental(b *testing.B) {
b.StopTimer()
// prepare the batches
t := time.Now()
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
for i := 0; i < tc.BatchCount; i++ {
t := t.Add(time.Second)
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Unix())
t = t.Add(time.Second)
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Unix()))
if err != nil {
done = true
return
}
}
b.StartTimer()
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
if err != nil {
done = true
return
Expand Down Expand Up @@ -280,16 +321,17 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
}

for _, tc := range tests {
chainID := big.NewInt(333)
cfg := &rollup.Config{L2ChainID: big.NewInt(333)}
rng := rand.New(rand.NewSource(0x543331))
// pre-generate batches to keep the benchmark from including the random generation
batches := make([]*derive.SingularBatch, tc.BatchCount)
blocks := make([]*types.Block, tc.BatchCount)
t := time.Now()
for i := 0; i < tc.BatchCount; i++ {
batches[i] = derive.RandomSingularBatch(rng, tc.txPerBatch, chainID)
// set the timestamp to increase with each batch
// to leverage optimizations in the Batch Linked List
batches[i].Timestamp = uint64(t.Add(time.Duration(i) * time.Second).Unix())
var err error
blocks[i], err = randomBlock(cfg, rng, tc.txPerBatch, uint64(t.Add(time.Duration(i)*time.Second).Unix()))
require.NoError(b, err)
}
b.Run(tc.String(), func(b *testing.B) {
// reset the compressor used in the test case
Expand All @@ -300,7 +342,7 @@ func BenchmarkAllBatchesChannelOut(b *testing.B) {
b.StartTimer()
// add all batches to the channel out
for i := 0; i < tc.BatchCount; i++ {
err := cout.AddSingularBatch(batches[i], 0)
_, err := cout.AddBlock(cfg, blocks[i])
require.NoError(b, err)
}
}
Expand Down
23 changes: 9 additions & 14 deletions op-node/rollup/derive/channel_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ type Compressor interface {
type ChannelOut interface {
ID() ChannelID
Reset() error
AddBlock(*rollup.Config, *types.Block) error
AddSingularBatch(*SingularBatch, uint64) error
AddBlock(*rollup.Config, *types.Block) (*L1BlockInfo, error)
InputBytes() int
ReadyBytes() int
Flush() error
Expand Down Expand Up @@ -107,31 +106,27 @@ func (co *SingularChannelOut) Reset() error {
return err
}

// AddBlock adds a block to the channel. It returns the RLP encoded byte size
// AddBlock adds a block to the channel. It returns the block's L1BlockInfo
mdehoog marked this conversation as resolved.
Show resolved Hide resolved
// and an error if there is a problem adding the block. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) error {
func (co *SingularChannelOut) AddBlock(rollupCfg *rollup.Config, block *types.Block) (*L1BlockInfo, error) {
if co.closed {
return ErrChannelOutAlreadyClosed
return nil, ErrChannelOutAlreadyClosed
}

batch, l1Info, err := BlockToSingularBatch(rollupCfg, block)
if err != nil {
return err
return nil, fmt.Errorf("converting block to batch: %w", err)
}
return co.AddSingularBatch(batch, l1Info.SequenceNumber)
return l1Info, co.addSingularBatch(batch, l1Info.SequenceNumber)
}

// AddSingularBatch adds a batch to the channel. It returns the RLP encoded byte size
// and an error if there is a problem adding the batch. The only sentinel error
// addSingularBatch adds a batch to the channel. It returns
// an error if there is a problem adding the batch. The only sentinel error
// that it returns is ErrTooManyRLPBytes. If this error is returned, the channel
// should be closed and a new one should be made.
//
// AddSingularBatch should be used together with BlockToBatch if you need to access the
// BatchData before adding a block to the channel. It isn't possible to access
// the batch data with AddBlock.
func (co *SingularChannelOut) AddSingularBatch(batch *SingularBatch, _ uint64) error {
func (co *SingularChannelOut) addSingularBatch(batch *SingularBatch, _ uint64) error {
if co.closed {
return ErrChannelOutAlreadyClosed
}
Expand Down
Loading