Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: add locking around newpayload #25816

Merged
merged 4 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type ConsensusAPI struct {
lastNewPayloadLock sync.Mutex

forkchoiceLock sync.Mutex // Lock for the forkChoiceUpdated method
newPayloadLock sync.Mutex // Lock for the NewPayload method
}

// NewConsensusAPI creates a new consensus api for the given backend.
Expand Down Expand Up @@ -342,6 +343,22 @@ func (api *ConsensusAPI) GetPayloadV1(payloadID beacon.PayloadID) (*beacon.Execu

// NewPayloadV1 creates an Eth1 block, inserts it in the chain, and returns the status of the chain.
func (api *ConsensusAPI) NewPayloadV1(params beacon.ExecutableDataV1) (beacon.PayloadStatusV1, error) {
// The locking here is, strictly, not required. Without these locks, this can happen:
//
// 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to
// api.eth.BlockChain().InsertBlockWithoutSetHead, where it is blocked on
// e.g database compaction.
// 2. The call times out on the CL layer, which issues another NewPayload (execdata-N) call.
// Similarly, this also get stuck on the same place. Importantly, since the
// first call has not gone through, the early checks for "do we already have this block"
// will all return false.
// 3. When the db compaction ends, then N calls inserting the same payload are processed
// sequentially.
// Hence, we use a lock here, to be sure that the previous call has finished before we
// check whether we already have the block locally.
api.newPayloadLock.Lock()
defer api.newPayloadLock.Unlock()

log.Trace("Engine API request received", "method", "ExecutePayload", "number", params.Number, "hash", params.BlockHash)
block, err := beacon.ExecutableDataToBlock(params)
if err != nil {
Expand Down
112 changes: 107 additions & 5 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"math/big"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -277,10 +278,12 @@ func TestEth2NewBlock(t *testing.T) {
t.Fatalf("Failed to convert executable data to block %v", err)
}
newResp, err := api.NewPayloadV1(*execData)
if err != nil || newResp.Status != "VALID" {
switch {
case err != nil:
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 {
case newResp.Status != "VALID":
t.Fatalf("Failed to insert block: %v", newResp.Status)
case ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1:
t.Fatalf("Chain head shouldn't be updated")
}
checkLogEvents(t, newLogCh, rmLogsCh, 0, 0)
Expand All @@ -292,8 +295,8 @@ func TestEth2NewBlock(t *testing.T) {
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64() {
t.Fatalf("Chain head should be updated")
if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want {
t.Fatalf("Chain head should be updated, have %d want %d", have, want)
}
checkLogEvents(t, newLogCh, rmLogsCh, 1, 0)

Expand Down Expand Up @@ -855,3 +858,102 @@ func TestNewPayloadOnInvalidTerminalBlock(t *testing.T) {
t.Fatalf("error sending invalid forkchoice, invalid status: %v", resp.PayloadStatus.Status)
}
}

// TestSimultaneousNewBlock does several parallel inserts, both as
// newPayLoad and forkchoiceUpdate. This is to test that the api behaves
// well even of the caller is not being 'serial'.
func TestSimultaneousNewBlock(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
defer n.Close()

var (
api = NewConsensusAPI(ethservice)
parent = preMergeBlocks[len(preMergeBlocks)-1]
)
for i := 0; i < 10; i++ {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
ethservice.TxPool().AddLocal(types.MustSignNewTx(testKey, types.LatestSigner(ethservice.BlockChain().Config()),
&types.DynamicFeeTx{
Nonce: statedb.GetNonce(testAddr),
Value: big.NewInt(0),
GasFeeCap: big.NewInt(2 * params.InitialBaseFee),
GasTipCap: big.NewInt(2 * params.InitialBaseFee),
ChainID: genesis.Config.ChainID,
Gas: 1000000,
To: &common.Address{99},
}))
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 5,
})
if err != nil {
t.Fatalf("Failed to create the executable data %v", err)
}
// Insert it 10 times in parallel. Should be ignored.
{
var (
wg sync.WaitGroup
testErr error
errMu sync.Mutex
)
wg.Add(10)
for ii := 0; ii < 10; ii++ {
go func() {
defer wg.Done()
if newResp, err := api.NewPayloadV1(*execData); err != nil {
errMu.Lock()
testErr = fmt.Errorf("Failed to insert block: %w", err)
errMu.Unlock()
} else if newResp.Status != "VALID" {
errMu.Lock()
testErr = fmt.Errorf("Failed to insert block: %v", newResp.Status)
errMu.Unlock()
}
}()
}
wg.Wait()
if testErr != nil {
t.Fatal(testErr)
}
}
block, err := beacon.ExecutableDataToBlock(*execData)
if err != nil {
t.Fatalf("Failed to convert executable data to block %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != block.NumberU64()-1 {
t.Fatalf("Chain head shouldn't be updated")
}
fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: block.Hash(),
SafeBlockHash: block.Hash(),
FinalizedBlockHash: block.Hash(),
}
{
var (
wg sync.WaitGroup
testErr error
errMu sync.Mutex
)
wg.Add(10)
// Do each FCU 10 times
for ii := 0; ii < 10; ii++ {
go func() {
defer wg.Done()
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
errMu.Lock()
testErr = fmt.Errorf("Failed to insert block: %w", err)
errMu.Unlock()
}
}()
}
wg.Wait()
if testErr != nil {
t.Fatal(testErr)
}
}
if have, want := ethservice.BlockChain().CurrentBlock().NumberU64(), block.NumberU64(); have != want {
t.Fatalf("Chain head should be updated, have %d want %d", have, want)
}
parent = block
}
}