Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimistic execution consensus v2 #22560

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cometbft

import (
"context"
"cosmossdk.io/server/v2/cometbft/oe"
"crypto/sha256"
"errors"
"fmt"
Expand All @@ -12,7 +13,7 @@ import (
abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/reflect/protoregistry"

"cosmossdk.io/collections"
Expand Down Expand Up @@ -67,6 +68,11 @@ type Consensus[T transaction.Tx] struct {
extendVote handlers.ExtendVoteHandler
checkTxHandler handlers.CheckTxHandler[T]

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
Copy link
Member

@julienrbrt julienrbrt Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's always enable it, and possibly remove the way to disable it (cc @tac0turtle)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds good to me

// by developers.
optimisticExec *oe.OptimisticExecution
Comment on lines +71 to +74
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider initializing optimisticExec to avoid nil references

The optimisticExec field is added to the Consensus struct but may remain nil if SetOptimisticExecution is not called. To prevent potential nil pointer dereferences, consider initializing optimisticExec during construction or ensure it is always set before use.


addrPeerFilter types.PeerFilter // filter peers by address and port
idPeerFilter types.PeerFilter // filter peers by node ID

Expand Down Expand Up @@ -117,6 +123,10 @@ func (c *Consensus[T]) SetStreamingManager(sm streaming.Manager) {
c.streaming = sm
}

func (c *Consensus[T]) SetOptimisticExecution(oe *oe.OptimisticExecution) {
c.optimisticExec = oe
}

// RegisterSnapshotExtensions registers the given extensions with the consensus module's snapshot manager.
// It allows additional snapshotter implementations to be used for creating and restoring snapshots.
func (c *Consensus[T]) RegisterSnapshotExtensions(extensions ...snapshots.ExtensionSnapshotter) error {
Expand Down Expand Up @@ -385,6 +395,14 @@ func (c *Consensus[T]) PrepareProposal(
return nil, errors.New("no prepare proposal function was set")
}

// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()

Comment on lines +398 to +405
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference in PrepareProposal

In PrepareProposal, c.optimisticExec.Abort() is called without checking if optimisticExec is nil. If optimisticExec is nil, this will cause a runtime panic.

Apply this diff to add a nil check before calling Abort():

     // Similar call to Abort() is done in `ProcessProposal`.
-    c.optimisticExec.Abort()
+    if c.optimisticExec != nil {
+        c.optimisticExec.Abort()
+    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
c.optimisticExec.Abort()
// Abort any running OE so it cannot overlap with `PrepareProposal`. This could happen if optimistic
// `internalFinalizeBlock` from previous round takes a long time, but consensus has moved on to next round.
// Overlap is undesirable, since `internalFinalizeBlock` and `PrepareProoposal` could share access to
// in-memory structs depending on application implementation.
// No-op if OE is not enabled.
// Similar call to Abort() is done in `ProcessProposal`.
if c.optimisticExec != nil {
c.optimisticExec.Abort()
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand Down Expand Up @@ -421,6 +439,17 @@ func (c *Consensus[T]) ProcessProposal(
return nil, errors.New("no process proposal function was set")
}

// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
//c.setState(execModeFinalize, header)
}

Comment on lines +442 to +452
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference in ProcessProposal

In ProcessProposal, c.optimisticExec.Abort() is invoked without verifying if optimisticExec is nil. This could lead to a runtime panic if optimisticExec is not initialized.

Apply this diff to add a nil check before calling Abort():

         if req.Height > int64(c.initialHeight) {
             // abort any running OE
-            c.optimisticExec.Abort()
+            if c.optimisticExec != nil {
+                c.optimisticExec.Abort()
+            }
             //c.setState(execModeFinalize, header)
         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
c.optimisticExec.Abort()
//c.setState(execModeFinalize, header)
}
// Since the application can get access to FinalizeBlock state and write to it,
// we must be sure to reset it in case ProcessProposal timeouts and is called
// again in a subsequent round. However, we only want to do this after we've
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > int64(c.initialHeight) {
// abort any running OE
if c.optimisticExec != nil {
c.optimisticExec.Abort()
}
//c.setState(execModeFinalize, header)
}

ciCtx := contextWithCometInfo(ctx, comet.Info{
Evidence: toCoreEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
Expand All @@ -436,6 +465,17 @@ func (c *Consensus[T]) ProcessProposal(
}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

Comment on lines +468 to +478
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference when checking optimisticExec.Enabled()

In ProcessProposal, c.optimisticExec.Enabled() is called without verifying if optimisticExec is nil. This may result in a runtime panic if optimisticExec is not set.

Apply this diff to add a nil check:

     if c.optimisticExec != nil && c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
         c.optimisticExec.Execute(req)
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}
// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if c.optimisticExec != nil && c.optimisticExec.Enabled() && req.Height > int64(c.initialHeight) {
c.optimisticExec.Execute(req)
}

return &abciproto.ProcessProposalResponse{
Status: abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT,
}, nil
Expand All @@ -446,6 +486,29 @@ func (c *Consensus[T]) ProcessProposal(
func (c *Consensus[T]) FinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)

// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()

// only return if we are not aborting
if !aborted {
return res, err
}

// if it was aborted, we need to reset the state
c.optimisticExec.Reset()
}

Comment on lines +490 to +505
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Potential nil pointer dereference in FinalizeBlock

In FinalizeBlock, c.optimisticExec.Initialized() is called without checking if optimisticExec is nil. This could cause a runtime panic if optimisticExec is not initialized.

Apply this diff to add a nil check before calling Initialized():

     if c.optimisticExec != nil && c.optimisticExec.Initialized() {
         // Existing code...
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()
// only return if we are not aborting
if !aborted {
return res, err
}
// if it was aborted, we need to reset the state
c.optimisticExec.Reset()
}
if c.optimisticExec != nil && c.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := c.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := c.optimisticExec.WaitResult()
// only return if we are not aborting
if !aborted {
return res, err
}
// if it was aborted, we need to reset the state
c.optimisticExec.Reset()
}

return c.internalFinalizeBlock(ctx, req)
Copy link
Member

@facundomedica facundomedica Nov 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't comment on the right line below, but internalFinalizeBlock should not commit to store. I think internalFinalizeBlock should return stateChanges, then everything else should be done on the actual FinalizeBlock call.
This is because there could be an equivocation and we could be running the wrong finalize block request, and this would result in us writing bad data to disk. Also, if internalFinalizeBlock finishes then ProcessProposal would be accessing data that is in the "future"

}

func (c *Consensus[T]) internalFinalizeBlock(
ctx context.Context,
req *abciproto.FinalizeBlockRequest,
) (*abciproto.FinalizeBlockResponse, error) {
if err := c.validateFinalizeBlockHeight(req); err != nil {
return nil, err
Expand Down
91 changes: 83 additions & 8 deletions server/v2/cometbft/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package cometbft

import (
"context"
"cosmossdk.io/server/v2/cometbft/oe"
"crypto/sha256"
"encoding/json"
"errors"
"io"
"strings"
"testing"
Expand Down Expand Up @@ -55,10 +57,10 @@ func getQueryRouterBuilder[T any, PT interface {
*T
proto.Message
},
U any, UT interface {
*U
proto.Message
}](
U any, UT interface {
*U
proto.Message
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
Expand All @@ -85,10 +87,10 @@ func getMsgRouterBuilder[T any, PT interface {
*T
transaction.Msg
},
U any, UT interface {
*U
transaction.Msg
}](
U any, UT interface {
*U
transaction.Msg
}](
t *testing.T,
handler func(ctx context.Context, msg PT) (UT, error),
) *stf.MsgRouterBuilder {
Expand Down Expand Up @@ -715,3 +717,76 @@ func assertStoreLatestVersion(t *testing.T, store types.Store, target uint64) {
require.NoError(t, err)
require.Equal(t, target, commitInfo.Version)
}

func TestOptimisticExecution(t *testing.T) {
c := setUpConsensus(t, 100_000, mempool.NoOpMempool[mock.Tx]{})
// Set up handlers
c.processProposalHandler = DefaultServerOptions[mock.Tx]().ProcessProposalHandler

// mock optimistic execution
calledTimes := 0
optimisticMockFunc := func(_ context.Context, _ *abciproto.FinalizeBlockRequest) (*abciproto.FinalizeBlockResponse, error) {
calledTimes++
return nil, errors.New("test error")
}

c.SetOptimisticExecution(oe.NewOptimisticExecution(log.NewNopLogger(), optimisticMockFunc))

_, err := c.InitChain(context.Background(), &abciproto.InitChainRequest{
Time: time.Now(),
ChainId: "test",
InitialHeight: 1,
})
require.NoError(t, err)

_, err = c.FinalizeBlock(context.Background(), &abciproto.FinalizeBlockRequest{
Time: time.Now(),
Height: 1,
Txs: [][]byte{mockTx.Bytes()},
Hash: emptyHash[:],
})
require.NoError(t, err)

theHash := sha256.Sum256([]byte("test"))
ppReq := &abciproto.ProcessProposalRequest{
Height: 2,
Hash: theHash[:],
Time: time.Now(),
Txs: [][]byte{mockTx.Bytes()},
}

// Start optimistic execution
resp, err := c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

// Initialize FinalizeBlock with correct hash - should use optimistic result
theHash = sha256.Sum256([]byte("test"))
fbReq := &abciproto.FinalizeBlockRequest{
Height: 2,
Hash: theHash[:],
Time: ppReq.Time,
Txs: ppReq.Txs,
}
fbResp, err := c.FinalizeBlock(context.Background(), fbReq)
require.Error(t, err)
require.ErrorContains(t, err, "test error") // from optimisticMockFunc
require.Equal(t, 1, calledTimes)

resp, err = c.ProcessProposal(context.Background(), ppReq)
require.NoError(t, err)
require.Equal(t, resp.Status, abciproto.PROCESS_PROPOSAL_STATUS_ACCEPT)

theWrongHash := sha256.Sum256([]byte("wrong_hash"))
fbReq.Hash = theWrongHash[:]

// Initialize FinalizeBlock with wrong hash - should abort optimistic execution
// Because is aborted, the result comes from the normal execution
fbResp, err = c.FinalizeBlock(context.Background(), fbReq)
require.NotNil(t, fbResp)
require.NoError(t, err)
require.Equal(t, 2, calledTimes)

// Verify optimistic execution was reset
require.False(t, c.optimisticExec.Initialized())
}
160 changes: 160 additions & 0 deletions server/v2/cometbft/oe/optimistic_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package oe

import (
"bytes"
"context"
"encoding/hex"
"math/rand"
"sync"
"time"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"

"cosmossdk.io/log"
)

// FinalizeBlockFunc is the function that is called by the OE to finalize the
// block. It is the same as the one in the ABCI app.
type FinalizeBlockFunc func(context.Context, *abci.FinalizeBlockRequest) (*abci.FinalizeBlockResponse, error)

// OptimisticExecution is a struct that contains the OE context. It is used to
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
type OptimisticExecution struct {
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context
logger log.Logger

mtx sync.Mutex
stopCh chan struct{}
request *abci.FinalizeBlockRequest
response *abci.FinalizeBlockResponse
err error
cancelFunc func() // cancel function for the context
initialized bool // A boolean value indicating whether the struct has been initialized

// debugging/testing options
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
}

// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution {
logger = logger.With(log.ModuleKey, "oe")
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn}
for _, opt := range opts {
opt(oe)
}
return oe
}

// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
// 0 to 100 that determines the percentage of OE that should be aborted.
// This is for testing purposes only and must not be used in production.
func WithAbortRate(rate int) func(*OptimisticExecution) {
return func(oe *OptimisticExecution) {
oe.abortRate = rate
}
}

// Reset resets the OE context. Must be called whenever we want to invalidate
// the current OE.
func (oe *OptimisticExecution) Reset() {
oe.mtx.Lock()
defer oe.mtx.Unlock()
oe.request = nil
oe.response = nil
oe.err = nil
oe.initialized = false
}

func (oe *OptimisticExecution) Enabled() bool {
return oe != nil
}

// Initialized returns true if the OE was initialized, meaning that it contains
// a request and it was run or it is running.
func (oe *OptimisticExecution) Initialized() bool {
if oe == nil {
return false
}
oe.mtx.Lock()
defer oe.mtx.Unlock()

return oe.initialized
}

// Execute initializes the OE and starts it in a goroutine.
func (oe *OptimisticExecution) Execute(req *abci.ProcessProposalRequest) {
oe.mtx.Lock()
defer oe.mtx.Unlock()

oe.stopCh = make(chan struct{})
oe.request = &abci.FinalizeBlockRequest{
Txs: req.Txs,
DecidedLastCommit: req.ProposedLastCommit,
Misbehavior: req.Misbehavior,
Hash: req.Hash,
Height: req.Height,
Time: req.Time,
NextValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
}

oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
ctx, cancel := context.WithCancel(context.Background())
oe.cancelFunc = cancel
oe.initialized = true

go func() {
start := time.Now()

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
resp, err := oe.finalizeBlockFunc(ctx, oe.request)

oe.mtx.Lock()

executionTime := time.Since(start)
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", oe.request.Height, "hash", hex.EncodeToString(oe.request.Hash))
oe.response, oe.err = resp, err

close(oe.stopCh)
oe.mtx.Unlock()
}()
Comment on lines +106 to +118

Check notice

Code scanning / CodeQL

Spawning a Go routine Note

Spawning a Go routine may be a possible source of non-determinism
}

// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
// the running OE. Returns true if the OE was aborted.
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool {
if oe == nil {
return false
}

oe.mtx.Lock()
defer oe.mtx.Unlock()

if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Correct the logging of req_height in error message

In the error log within AbortIfNeeded, both oe_height and req_height are using oe.request.Height, which may not accurately reflect the current request's height. This could lead to confusion when diagnosing issues.

Adjust the log statement to use the correct height for req_height:

	oe.logger.Error("OE aborted due to hash mismatch",
		"oe_hash", hex.EncodeToString(oe.request.Hash),
		"req_hash", hex.EncodeToString(reqHash),
		"oe_height", oe.request.Height,
-		"req_height", oe.request.Height)
+		"req_height", currentRequestHeight)

If currentRequestHeight is not available in this context, you may need to pass the request's height as an additional parameter to AbortIfNeeded.

Committable suggestion skipped: line range outside the PR's diff.

oe.cancelFunc()
return true
Comment on lines +131 to +134
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent potential nil pointer dereference when accessing oe.request

In the AbortIfNeeded method, if oe.request is nil, accessing oe.request.Hash will cause a runtime panic. This could happen if AbortIfNeeded is called before Execute, or if Execute failed to initialize oe.request.

Consider adding a nil check for oe.request before accessing its fields:

+	if oe.request == nil {
+		oe.logger.Error("OE aborted due to missing request")
+		oe.cancelFunc()
+		return true
+	}
	if !bytes.Equal(oe.request.Hash, reqHash) {
		oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
		oe.cancelFunc()
		return true
	}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true
if oe.request == nil {
oe.logger.Error("OE aborted due to missing request")
oe.cancelFunc()
return true
}
if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true

} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
// this is for test purposes only, we can emulate a certain percentage of
// OE needed to be aborted.
oe.cancelFunc()
oe.logger.Error("OE aborted due to test abort rate")
return true
}

return false
}

// Abort aborts the OE unconditionally and waits for it to finish.
func (oe *OptimisticExecution) Abort() {
if oe == nil || oe.cancelFunc == nil {
return
}

oe.cancelFunc()
<-oe.stopCh
}

// WaitResult waits for the OE to finish and returns the result.
func (oe *OptimisticExecution) WaitResult() (*abci.FinalizeBlockResponse, error) {
<-oe.stopCh
return oe.response, oe.err
}
Loading
Loading