From 7e8f42aabc44dd790e776750c7a59621f8214bd4 Mon Sep 17 00:00:00 2001 From: corver Date: Wed, 13 Nov 2024 12:09:34 +0200 Subject: [PATCH] fix(octane/evmengine): handle engine errors --- halo/app/prouter.go | 12 ++ lib/ethclient/client_test.go | 56 +-------- lib/ethclient/engineclient.go | 106 ++++++++---------- octane/evmengine/keeper/abci.go | 32 +++--- octane/evmengine/keeper/abci_internal_test.go | 8 +- octane/evmengine/keeper/msg_server.go | 25 +---- octane/evmengine/keeper/proposal_server.go | 7 +- 7 files changed, 86 insertions(+), 160 deletions(-) diff --git a/halo/app/prouter.go b/halo/app/prouter.go index 5075d141e..ed988233a 100644 --- a/halo/app/prouter.go +++ b/halo/app/prouter.go @@ -2,6 +2,7 @@ package app import ( "context" + "time" atypes "github.com/omni-network/omni/halo/attest/types" "github.com/omni-network/omni/lib/errors" @@ -16,6 +17,11 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) +// processTimeout is the maximum time to process a proposal. +// Timeout results in rejecting the proposal, which could negatively affect liveness. +// But it avoids blocking forever, which also negatively affects liveness. +const processTimeout = time.Minute + // makeProcessProposalRouter creates a new process proposal router that only routes // expected messages to expected modules. func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter { @@ -32,6 +38,11 @@ func makeProcessProposalRouter(app *App) *baseapp.MsgServiceRouter { // It also updates some external state. func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig client.TxConfig) sdk.ProcessProposalHandler { return func(ctx sdk.Context, req *abci.RequestProcessProposal) (*abci.ResponseProcessProposal, error) { + // Only allow processTimeout (1 minute) to process a proposal. Reject proposal otherwise. 
+ timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), processTimeout) + defer timeoutCancel() + ctx = ctx.WithContext(timeoutCtx) + // Ensure the proposal includes quorum vote extensions (unless first block). if req.Height > 1 { var totalPower, votedPower int64 @@ -86,6 +97,7 @@ func makeProcessProposalHandler(router *baseapp.MsgServiceRouter, txConfig clien } } +//nolint:unparam // Explicitly return nil error func rejectProposal(ctx context.Context, err error) (*abci.ResponseProcessProposal, error) { log.Error(ctx, "Rejecting process proposal", err) return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil diff --git a/lib/ethclient/client_test.go b/lib/ethclient/client_test.go index 5fc011a08..5290c1416 100644 --- a/lib/ethclient/client_test.go +++ b/lib/ethclient/client_test.go @@ -19,23 +19,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestGetPayloadV2(t *testing.T) { - t.Parallel() - fuzzer := fuzz.New().NilChance(0) - - var param1 engine.PayloadID - fuzzer.Fuzz(&param1) - - var resp engine.ExecutionPayloadEnvelope - fuzzer.Fuzz(&resp) - - call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) { - return engineCl.GetPayloadV2(ctx, param1) - } - - testEndpoint(t, call, resp, param1) -} - func TestGetPayloadV3(t *testing.T) { t.Parallel() fuzzer := fuzz.New().NilChance(0) @@ -53,23 +36,6 @@ func TestGetPayloadV3(t *testing.T) { testEndpoint(t, call, resp, param1) } -func TestNewPayloadV2(t *testing.T) { - t.Parallel() - fuzzer := fuzz.New().NilChance(0) - - var param1 engine.ExecutableData - fuzzer.Fuzz(&param1) - - var resp engine.PayloadStatusV1 - fuzzer.Fuzz(&resp) - - call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) { - return engineCl.NewPayloadV2(ctx, param1) - } - - testEndpoint(t, call, resp, param1) -} - func TestNewPayloadV3(t *testing.T) { t.Parallel() fuzzer := fuzz.New().NilChance(0) @@ -85,6 +51,7 @@ func TestNewPayloadV3(t *testing.T) { var resp 
engine.PayloadStatusV1 fuzzer.Fuzz(&resp) + resp.Status = engine.VALID call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) { return engineCl.NewPayloadV3(ctx, param1, param2, &param3) @@ -93,26 +60,6 @@ func TestNewPayloadV3(t *testing.T) { testEndpoint(t, call, resp, param1, param2, param3) } -func TestForkchoiceUpdatedV2(t *testing.T) { - t.Parallel() - fuzzer := fuzz.New().NilChance(0) - - var param1 engine.ForkchoiceStateV1 - fuzzer.Fuzz(&param1) - - var param2 engine.PayloadAttributes - fuzzer.Fuzz(&param2) - - var resp engine.ForkChoiceResponse - fuzzer.Fuzz(&resp) - - call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) { - return engineCl.ForkchoiceUpdatedV2(ctx, param1, &param2) - } - - testEndpoint(t, call, resp, param1, param2) -} - func TestForkchoiceUpdatedV3(t *testing.T) { t.Parallel() fuzzer := fuzz.New().NilChance(0) @@ -125,6 +72,7 @@ func TestForkchoiceUpdatedV3(t *testing.T) { var resp engine.ForkChoiceResponse fuzzer.Fuzz(&resp) + resp.PayloadStatus.Status = engine.VALID call := func(ctx context.Context, engineCl ethclient.EngineClient) (any, error) { return engineCl.ForkchoiceUpdatedV3(ctx, param1, &param2) diff --git a/lib/ethclient/engineclient.go b/lib/ethclient/engineclient.go index f083df6c1..d295359d0 100644 --- a/lib/ethclient/engineclient.go +++ b/lib/ethclient/engineclient.go @@ -6,6 +6,7 @@ import ( "time" "github.com/omni-network/omni/lib/errors" + "github.com/omni-network/omni/lib/log" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" @@ -30,25 +31,14 @@ const ( type EngineClient interface { Client - // NewPayloadV2 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. - NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error) // NewPayloadV3 creates an Eth1 block, inserts it in the chain, and returns the status of the chain. 
NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) - // ForkchoiceUpdatedV2 has several responsibilities: - // - It sets the chain the head. - // - And/or it sets the chain's finalized block hash. - // - And/or it starts assembling (async) a block with the payload attributes. - ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1, - payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) - // ForkchoiceUpdatedV3 is equivalent to V2 with the addition of parent beacon block root in the payload attributes. ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) - // GetPayloadV2 returns a cached payload by id. - GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) // GetPayloadV3 returns a cached payload by id. GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) (*engine.ExecutionPayloadEnvelope, error) } @@ -77,50 +67,39 @@ func NewAuthClient(ctx context.Context, urlAddr string, jwtSecret []byte) (Engin }, nil } -func (c engineClient) NewPayloadV2(ctx context.Context, params engine.ExecutableData) (engine.PayloadStatusV1, error) { - const endpoint = "new_payload_v2" - defer latency(c.chain, endpoint)() - - var resp engine.PayloadStatusV1 - err := c.cl.Client().CallContext(ctx, &resp, newPayloadV2, params) - if err != nil { - incError(c.chain, endpoint) - return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v2") - } - - return resp, nil -} - func (c engineClient) NewPayloadV3(ctx context.Context, params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash, ) (engine.PayloadStatusV1, error) { const endpoint = "new_payload_v3" defer latency(c.chain, endpoint)() + // isStatusOk returns true if the response status is valid. 
+ isStatusOk := func(status engine.PayloadStatusV1) bool { + return map[string]bool{ + engine.VALID: true, + engine.INVALID: true, + engine.SYNCING: true, + engine.ACCEPTED: true, + }[status.Status] + } + var resp engine.PayloadStatusV1 err := c.cl.Client().CallContext(ctx, &resp, newPayloadV3, params, versionedHashes, beaconRoot) - if err != nil { + if isStatusOk(resp) { + // Swallow errors when geth returns errors along with proper responses (but at least log it). + if err != nil { + log.Warn(ctx, "Ignoring new_payload_v3 error with proper response", err, "status", resp.Status) + } + + return resp, nil + } else if err != nil { incError(c.chain, endpoint) - return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload v3") - } - - return resp, nil -} - -func (c engineClient) ForkchoiceUpdatedV2(ctx context.Context, update engine.ForkchoiceStateV1, - payloadAttributes *engine.PayloadAttributes, -) (engine.ForkChoiceResponse, error) { - const endpoint = "forkchoice_updated_v2" - defer latency(c.chain, endpoint)() + return engine.PayloadStatusV1{}, errors.Wrap(err, "rpc new payload") + } /* else err==nil && status!=ok */ - var resp engine.ForkChoiceResponse - err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV2, update, payloadAttributes) - if err != nil { - incError(c.chain, endpoint) - return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v2") - } + incError(c.chain, endpoint) - return resp, nil + return engine.PayloadStatusV1{}, errors.New("nil error and unknown status", "status", resp.Status) } func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.ForkchoiceStateV1, @@ -129,30 +108,33 @@ func (c engineClient) ForkchoiceUpdatedV3(ctx context.Context, update engine.For const endpoint = "forkchoice_updated_v3" defer latency(c.chain, endpoint)() + // isStatusOk returns true if the response status is valid. 
+ isStatusOk := func(resp engine.ForkChoiceResponse) bool { + return map[string]bool{ + engine.VALID: true, + engine.INVALID: true, + engine.SYNCING: true, + engine.ACCEPTED: false, // Unexpected in ForkchoiceUpdated + }[resp.PayloadStatus.Status] + } + var resp engine.ForkChoiceResponse err := c.cl.Client().CallContext(ctx, &resp, forkchoiceUpdatedV3, update, payloadAttributes) - if err != nil { + if isStatusOk(resp) { + // Swallow errors when geth returns errors along with proper responses (but at least log it). + if err != nil { + log.Warn(ctx, "Ignoring forkchoice_updated_v3 error with proper response", err, "status", resp.PayloadStatus.Status) + } + + return resp, nil + } else if err != nil { incError(c.chain, endpoint) return engine.ForkChoiceResponse{}, errors.Wrap(err, "rpc forkchoice updated v3") - } - - return resp, nil -} - -func (c engineClient) GetPayloadV2(ctx context.Context, payloadID engine.PayloadID) ( - *engine.ExecutionPayloadEnvelope, error, -) { - const endpoint = "get_payload_v2" - defer latency(c.chain, endpoint)() + } /* else err==nil && status!=ok */ - var resp engine.ExecutionPayloadEnvelope - err := c.cl.Client().CallContext(ctx, &resp, getPayloadV2, payloadID) - if err != nil { - incError(c.chain, endpoint) - return nil, errors.Wrap(err, "rpc get payload v2") - } + incError(c.chain, endpoint) - return &resp, nil + return engine.ForkChoiceResponse{}, errors.New("nil error and unknown status", "status", resp.PayloadStatus.Status) } func (c engineClient) GetPayloadV3(ctx context.Context, payloadID engine.PayloadID) ( diff --git a/octane/evmengine/keeper/abci.go b/octane/evmengine/keeper/abci.go index 0948bcdd9..c0dba73c1 100644 --- a/octane/evmengine/keeper/abci.go +++ b/octane/evmengine/keeper/abci.go @@ -3,9 +3,7 @@ package keeper import ( "context" "encoding/json" - "fmt" "log/slog" - "runtime/debug" "strings" "time" @@ -25,18 +23,20 @@ import ( authtypes "github.com/cosmos/cosmos-sdk/x/auth/types" ) +// prepareTimeout is the maximum 
time to prepare a proposal. +// Timeout results in proposing an empty consensus block. +const prepareTimeout = time.Second * 10 + // PrepareProposal returns a proposal for the next block. -// Note returning an error results in a panic cometbft and CONSENSUS_FAILURE log. +// Note returning an error results in proposing an empty block. func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPrepareProposal) ( *abci.ResponsePrepareProposal, error, ) { - defer func() { - if r := recover(); r != nil { - log.Error(ctx, "PrepareProposal panic", nil, "recover", r) - fmt.Println("panic stacktrace: \n" + string(debug.Stack())) //nolint:forbidigo // Print stacktrace - panic(r) - } - }() + // Only allow 10s to prepare a proposal. Propose empty block otherwise. + timeoutCtx, timeoutCancel := context.WithTimeout(ctx.Context(), prepareTimeout) + defer timeoutCancel() + ctx = ctx.WithContext(timeoutCtx) + if len(req.Txs) > 0 { return nil, errors.New("unexpected transactions in proposal") } else if req.MaxTxBytes < cmttypes.MaxBlockSizeBytes*9/10 { @@ -68,11 +68,13 @@ func (k *Keeper) PrepareProposal(ctx sdk.Context, req *abci.RequestPreparePropos if err != nil { log.Warn(ctx, "Preparing proposal failed: build new evm payload (will retry)", err) return false, nil // Retry - } else if fcr.PayloadStatus.Status != engine.VALID { - return false, errors.New("status not valid") // Abort, don't retry + } else if isSyncing(fcr.PayloadStatus) { + return false, errors.New("evm unexpectedly syncing") // Abort, don't retry + } else if invalid, err := isInvalid(fcr.PayloadStatus); invalid { + return false, errors.Wrap(err, "proposed invalid payload") // Abort, don't retry } else if fcr.PayloadID == nil { return false, errors.New("missing payload ID [BUG]") // Abort, don't retry - } + } /* else isValid(status) */ payloadID = *fcr.PayloadID @@ -193,7 +195,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error { // No need to wrap this in retryForever since this is a best-effort 
optimisation, if it fails, just skip it. fcr, err := k.startBuild(ctx, appHash, timestamp) - if err != nil || isUnknown(fcr.PayloadStatus) { + if err != nil { log.Warn(ctx, "Starting optimistic build failed", err, logAttr) return nil } else if isSyncing(fcr.PayloadStatus) { @@ -205,7 +207,7 @@ func (k *Keeper) PostFinalize(ctx sdk.Context) error { } else if fcr.PayloadID == nil { log.Error(ctx, "Starting optimistic build failed; missing payload ID [BUG]", nil, logAttr) return nil - } + } /* else isValid(status) */ k.setOptimisticPayload(*fcr.PayloadID, uint64(nextHeight)) diff --git a/octane/evmengine/keeper/abci_internal_test.go b/octane/evmengine/keeper/abci_internal_test.go index 453057c02..9d47e4d27 100644 --- a/octane/evmengine/keeper/abci_internal_test.go +++ b/octane/evmengine/keeper/abci_internal_test.go @@ -84,7 +84,7 @@ func TestKeeper_PrepareProposal(t *testing.T) { wantErr: true, }, { - name: "forkchoiceUpdateV2 not valid", + name: "forkchoiceUpdateV3 not valid", mockEngine: mockEngineAPI{ headerByTypeFunc: func(context.Context, ethclient.HeadType) (*types.Header, error) { fuzzer := ethclient.NewFuzzer(0) @@ -502,10 +502,6 @@ func (m *mockEngineAPI) HeaderByType(ctx context.Context, typ ethclient.HeadType return m.mock.HeaderByType(ctx, typ) } -func (m *mockEngineAPI) NewPayloadV2(ctx context.Context, params eengine.ExecutableData) (eengine.PayloadStatusV1, error) { - return m.mock.NewPayloadV2(ctx, params) -} - //nolint:nonamedreturns // Required for defer func (m *mockEngineAPI) NewPayloadV3(ctx context.Context, params eengine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (resp eengine.PayloadStatusV1, err error) { if status, ok := m.maybeSync(); ok { @@ -540,7 +536,7 @@ func (m *mockEngineAPI) GetPayloadV3(ctx context.Context, payloadID eengine.Payl return m.mock.GetPayloadV3(ctx, payloadID) } -// pushPayload - invokes the ForkchoiceUpdatedV2 method on the mock engine and returns the payload ID. 
+// pushPayload - invokes the ForkchoiceUpdatedV3 method on the mock engine and returns the payload ID. func (m *mockEngineAPI) pushPayload(t *testing.T, ctx context.Context, feeRecipient common.Address, blockHash common.Hash, ts time.Time, appHash common.Hash) *eengine.PayloadID { t.Helper() state := eengine.ForkchoiceStateV1{ diff --git a/octane/evmengine/keeper/msg_server.go b/octane/evmengine/keeper/msg_server.go index 70505f1c4..05d9cdd57 100644 --- a/octane/evmengine/keeper/msg_server.go +++ b/octane/evmengine/keeper/msg_server.go @@ -36,10 +36,9 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution err = retryForever(ctx, func(ctx context.Context) (bool, error) { status, err := pushPayload(ctx, s.engineCl, payload) - if err != nil || isUnknown(status) { + if err != nil { // We need to retry forever on networking errors, but can't easily identify them, so retry all errors. - log.Warn(ctx, "Processing finalized payload failed: push new payload to evm (will retry)", err, - "status", status.Status) + log.Warn(ctx, "Processing finalized payload failed: push new payload to evm (will retry)", err) return false, nil // Retry } else if invalid, err := isInvalid(status); invalid { @@ -50,7 +49,7 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution } else if isSyncing(status) { // Payload pushed, but EVM syncing continue to ForkChoiceUpdate below log.Warn(ctx, "Processing finalized payload; evm syncing", nil) - } + } /* else isValid(status) */ return true, nil // Done }) @@ -67,10 +66,9 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution err = retryForever(ctx, func(ctx context.Context) (bool, error) { fcr, err := s.engineCl.ForkchoiceUpdatedV3(ctx, fcs, nil) - if err != nil || isUnknown(fcr.PayloadStatus) { + if err != nil { // We need to retry forever on networking errors, but can't easily identify them, so retry all errors. 
- log.Warn(ctx, "Processing finalized payload failed: evm fork choice update (will retry)", err, - "status", fcr.PayloadStatus.Status) + log.Warn(ctx, "Processing finalized payload failed: evm fork choice update (will retry)", err) return false, nil // Retry } else if isSyncing(fcr.PayloadStatus) { @@ -83,7 +81,7 @@ func (s msgServer) ExecutionPayload(ctx context.Context, msg *types.MsgExecution "payload_height", payload.Number) return false, err // Abort, don't retry - } + } /* else isValid(status) */ return true, nil // Done }) @@ -188,17 +186,6 @@ func NewMsgServerImpl(keeper *Keeper) types.MsgServiceServer { var _ types.MsgServiceServer = msgServer{} -func isUnknown(status engine.PayloadStatusV1) bool { - if status.Status == engine.VALID || - status.Status == engine.INVALID || - status.Status == engine.SYNCING || - status.Status == engine.ACCEPTED { - return false - } - - return true -} - func isSyncing(status engine.PayloadStatusV1) bool { return status.Status == engine.SYNCING || status.Status == engine.ACCEPTED } diff --git a/octane/evmengine/keeper/proposal_server.go b/octane/evmengine/keeper/proposal_server.go index a10e21844..a17d66527 100644 --- a/octane/evmengine/keeper/proposal_server.go +++ b/octane/evmengine/keeper/proposal_server.go @@ -26,10 +26,9 @@ func (s proposalServer) ExecutionPayload(ctx context.Context, msg *types.MsgExec // Push the payload to the EVM. err = retryForever(ctx, func(ctx context.Context) (bool, error) { status, err := pushPayload(ctx, s.engineCl, payload) - if err != nil || isUnknown(status) { + if err != nil { // We need to retry forever on networking errors, but can't easily identify them, so retry all errors. 
- log.Warn(ctx, "Verifying proposal failed: push new payload to evm (will retry)", err, - "status", status.Status) + log.Warn(ctx, "Verifying proposal failed: push new payload to evm (will retry)", err) return false, nil // Retry } else if invalid, err := isInvalid(status); invalid { @@ -38,7 +37,7 @@ func (s proposalServer) ExecutionPayload(ctx context.Context, msg *types.MsgExec // If this is initial sync, we need to continue and set a target head to sync to, so don't retry. log.Warn(ctx, "Can't properly verifying proposal: evm syncing", err, "payload_height", payload.Number) - } + } /* else isValid(status) */ return true, nil // Done })