Skip to content
This repository has been archived by the owner on May 11, 2024. It is now read-only.

feat(driver): optimize error handling for CalldataSyncer #262

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 25 additions & 33 deletions driver/chain_syncer/calldata/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (s *Syncer) onBlockProposed(
txListBytes = []byte{}
}

payloadData, rpcError, payloadError := s.insertNewHead(
payloadData, err := s.insertNewHead(
ctx,
event,
parent,
Expand All @@ -227,16 +227,8 @@ func (s *Syncer) onBlockProposed(
l1Origin,
)

// RPC errors are recoverable.
if rpcError != nil {
return fmt.Errorf("failed to insert new head to L2 execution engine: %w", rpcError)
}

if payloadError != nil {
log.Warn(
"Ignore invalid block context", "blockID", event.Id, "payloadError", payloadError, "payloadData", payloadData,
)
return nil
if err != nil {
return fmt.Errorf("failed to insert new head to L2 execution engine: %w", err)
}

log.Debug("Payload data", "hash", payloadData.BlockHash, "txs", len(payloadData.Transactions))
Expand Down Expand Up @@ -272,7 +264,7 @@ func (s *Syncer) insertNewHead(
headBlockID *big.Int,
txListBytes []byte,
l1Origin *rawdb.L1Origin,
) (*engine.ExecutableData, error, error) {
) (*engine.ExecutableData, error) {
log.Debug(
"Try to insert a new L2 head block",
"parentNumber", parent.Number,
Expand All @@ -285,14 +277,14 @@ func (s *Syncer) insertNewHead(
var txList []*types.Transaction
if len(txListBytes) != 0 {
if err := rlp.DecodeBytes(txListBytes, &txList); err != nil {
log.Info("Ignore invalid txList bytes", "blockID", event.Id)
return nil, nil, err
log.Error("Invalid txList bytes", "blockID", event.Id)
return nil, err
}
}

parentTimestamp, err := s.rpc.TaikoL2.ParentTimestamp(&bind.CallOpts{BlockNumber: parent.Number})
if err != nil {
return nil, nil, err
return nil, err
}

// Get L2 baseFee
Expand All @@ -303,7 +295,7 @@ func (s *Syncer) insertNewHead(
parent.GasUsed,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to get L2 baseFee: %w", encoding.TryParsingCustomError(err))
return nil, fmt.Errorf("failed to get L2 baseFee: %w", encoding.TryParsingCustomError(err))
}

log.Debug(
Expand All @@ -330,17 +322,17 @@ func (s *Syncer) insertNewHead(
parent.GasUsed,
)
if err != nil {
return nil, nil, fmt.Errorf("failed to create TaikoL2.anchor transaction: %w", err)
return nil, fmt.Errorf("failed to create TaikoL2.anchor transaction: %w", err)
}

txList = append([]*types.Transaction{anchorTx}, txList...)

if txListBytes, err = rlp.EncodeToBytes(txList); err != nil {
log.Warn("Encode txList error", "blockID", event.Id, "error", err)
return nil, nil, err
log.Error("Encode txList error", "blockID", event.Id, "error", err)
return nil, err
}

payload, rpcErr, payloadErr := s.createExecutionPayloads(
payload, err := s.createExecutionPayloads(
ctx,
event,
parent.Hash(),
Expand All @@ -351,8 +343,8 @@ func (s *Syncer) insertNewHead(
withdrawals,
)

if rpcErr != nil || payloadErr != nil {
return nil, rpcErr, payloadErr
if err != nil {
return nil, fmt.Errorf("failed to create execution payloads: %w", err)
}

fc := &engine.ForkchoiceStateV1{HeadBlockHash: parent.Hash()}
Expand All @@ -361,13 +353,13 @@ func (s *Syncer) insertNewHead(
fc.HeadBlockHash = payload.BlockHash
fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, fc, nil)
if err != nil {
return nil, err, nil
return nil, err
}
if fcRes.PayloadStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
return nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}

return payload, nil, nil
return payload, nil
}

// createExecutionPayloads creates a new execution payloads through
Expand All @@ -381,7 +373,7 @@ func (s *Syncer) createExecutionPayloads(
txListBytes []byte,
baseFeee *big.Int,
withdrawals types.Withdrawals,
) (payloadData *engine.ExecutableData, rpcError error, payloadError error) {
) (payloadData *engine.ExecutableData, err error) {
fc := &engine.ForkchoiceStateV1{HeadBlockHash: parentHash}
attributes := &engine.PayloadAttributes{
Timestamp: event.Meta.Timestamp,
Expand All @@ -406,31 +398,31 @@ func (s *Syncer) createExecutionPayloads(
// Step 1, prepare a payload
fcRes, err := s.rpc.L2Engine.ForkchoiceUpdate(ctx, fc, attributes)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to update fork choice: %w", err)
}
if fcRes.PayloadStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
return nil, fmt.Errorf("unexpected ForkchoiceUpdate response status: %s", fcRes.PayloadStatus.Status)
}
if fcRes.PayloadID == nil {
return nil, nil, errors.New("empty payload ID")
return nil, errors.New("empty payload ID")
}

// Step 2, get the payload
payload, err := s.rpc.L2Engine.GetPayload(ctx, fcRes.PayloadID)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to get payload: %w", err)
}

log.Debug("Payload", "payload", payload)

// Step 3, execute the payload
execStatus, err := s.rpc.L2Engine.NewPayload(ctx, payload)
if err != nil {
return nil, err, nil
return nil, fmt.Errorf("failed to create a new payload: %w", err)
}
if execStatus.Status != engine.VALID {
return nil, nil, fmt.Errorf("unexpected NewPayload response status: %s", execStatus.Status)
return nil, fmt.Errorf("unexpected NewPayload response status: %s", execStatus.Status)
}

return payload, nil, nil
return payload, nil
}
5 changes: 2 additions & 3 deletions driver/chain_syncer/calldata/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
s.Nil(err)
l1Head, err := s.s.rpc.L1.BlockByNumber(context.Background(), nil)
s.Nil(err)
_, rpcErr, payloadErr := s.s.insertNewHead(
_, err = s.s.insertNewHead(
context.Background(),
&bindings.TaikoL1ClientBlockProposed{
Id: common.Big1,
Expand All @@ -99,8 +99,7 @@ func (s *CalldataSyncerTestSuite) TestInsertNewHead() {
L1BlockHash: testutils.RandomHash(),
},
)
s.Nil(rpcErr)
s.Nil(payloadErr)
s.Nil(err)
}

func (s *CalldataSyncerTestSuite) TestTreasuryIncomeAllAnchors() {
Expand Down
4 changes: 2 additions & 2 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (d *Driver) Close() {
// eventLoop starts the main loop of a L2 execution engine's driver.
func (d *Driver) eventLoop() {
defer d.wg.Done()
exponentialBackoff := backoff.NewExponentialBackOff()
constatnBackoff := backoff.NewConstantBackOff(12 * time.Second)

// reqSync requests performing a synchronising operation, won't block
// if we are already synchronising.
Expand All @@ -129,7 +129,7 @@ func (d *Driver) eventLoop() {

// doSyncWithBackoff performs a synchronising operation with a backoff strategy.
doSyncWithBackoff := func() {
if err := backoff.Retry(d.doSync, exponentialBackoff); err != nil {
if err := backoff.Retry(d.doSync, constatnBackoff); err != nil {
log.Error("Sync L2 execution engine's block chain error", "error", err)
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/chain_iterator/block_batch_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"math/big"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum"
Expand Down Expand Up @@ -163,7 +164,7 @@ func (i *BlockBatchIterator) Iter() error {
return nil
}

if err := backoff.Retry(iterOp, backoff.NewExponentialBackOff()); err != nil {
if err := backoff.Retry(iterOp, backoff.NewConstantBackOff(12*time.Second)); err != nil {
return err
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/rpc/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"fmt"
"net/url"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
Expand All @@ -19,9 +21,10 @@ func DialClientWithBackoff(ctx context.Context, url string) (*ethclient.Client,
if err := backoff.Retry(
func() (err error) {
client, err = ethclient.DialContext(ctx, url)
log.Error("Dial ethclient error", "url", url, "error", err)
return err
},
backoff.NewExponentialBackOff(),
backoff.NewConstantBackOff(12*time.Second),
); err != nil {
return nil, err
}
Expand All @@ -37,13 +40,14 @@ func DialEngineClientWithBackoff(ctx context.Context, url string, jwtSecret stri
func() (err error) {
client, err := DialEngineClient(ctx, url, jwtSecret)
if err != nil {
log.Error("Dial engine client error", "url", url, "error", err)
return err
}

engineClient = &EngineClient{client}
return nil
},
backoff.NewExponentialBackOff(),
backoff.NewConstantBackOff(12*time.Second),
); err != nil {
return nil, err
}
Expand Down