Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/catalyst: build the execution payload async #24866

Merged
merged 7 commits into from
May 18, 2022
Merged
30 changes: 10 additions & 20 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,18 +188,19 @@ func (api *ConsensusAPI) ForkchoiceUpdatedV1(update beacon.ForkchoiceStateV1, pa
// sealed by the beacon client. The payload will be requested later, and we
// might replace it arbitrarily many times in between.
if payloadAttributes != nil {
log.Info("Creating new payload for sealing")
start := time.Now()

data, err := api.assembleBlock(update.HeadBlockHash, payloadAttributes)
// Create an empty block first which can be used as a fallback
empty, err := api.eth.Miner().GetSealingBlockSync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, true)
if err != nil {
return valid(nil), err
}
// Send a request to generate a full block in the background.
// The result can be obtained via the returned channel.
resChan, err := api.eth.Miner().GetSealingBlockAsync(update.HeadBlockHash, payloadAttributes.Timestamp, payloadAttributes.SuggestedFeeRecipient, payloadAttributes.Random, false)
if err != nil {
log.Error("Failed to create sealing payload", "err", err)
return valid(nil), err // valid setHead, invalid payload
return valid(nil), err
}
id := computePayloadId(update.HeadBlockHash, payloadAttributes)
api.localBlocks.put(id, data)

log.Info("Created payload for sealing", "id", id, "elapsed", time.Since(start))
api.localBlocks.put(id, &payload{empty: empty, result: resChan})
return valid(&id), nil
}
return valid(nil), nil
Expand Down Expand Up @@ -335,14 +336,3 @@ func (api *ConsensusAPI) invalid(err error) beacon.PayloadStatusV1 {
errorMsg := err.Error()
return beacon.PayloadStatusV1{Status: beacon.INVALID, LatestValidHash: &currentHash, ValidationError: &errorMsg}
}

// assembleBlock creates a new block and returns the "execution
// data" required for beacon clients to process the new block.
func (api *ConsensusAPI) assembleBlock(parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
log.Info("Producing block", "parentHash", parentHash)
block, err := api.eth.Miner().GetSealingBlock(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}
82 changes: 78 additions & 4 deletions eth/catalyst/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func TestEth2AssembleBlock(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[9].Time() + 5,
}
execData, err := api.assembleBlock(blocks[9].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[9].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
Expand All @@ -114,7 +114,7 @@ func TestEth2AssembleBlockWithAnotherBlocksTxs(t *testing.T) {
blockParams := beacon.PayloadAttributesV1{
Timestamp: blocks[8].Time() + 5,
}
execData, err := api.assembleBlock(blocks[8].Hash(), &blockParams)
execData, err := assembleBlock(api, blocks[8].Hash(), &blockParams)
if err != nil {
t.Fatalf("error producing block, err=%v", err)
}
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestEth2NewBlock(t *testing.T) {
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)

execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 5,
})
if err != nil {
Expand Down Expand Up @@ -313,7 +313,7 @@ func TestEth2NewBlock(t *testing.T) {
)
parent = preMergeBlocks[len(preMergeBlocks)-1]
for i := 0; i < 10; i++ {
execData, err := api.assembleBlock(parent.Hash(), &beacon.PayloadAttributesV1{
execData, err := assembleBlock(api, parent.Hash(), &beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 6,
})
if err != nil {
Expand Down Expand Up @@ -527,3 +527,77 @@ func TestExchangeTransitionConfig(t *testing.T) {
t.Fatalf("expected no error on valid config, got %v", err)
}
}

func TestEmptyBlocks(t *testing.T) {
genesis, preMergeBlocks := generatePreMergeChain(10)
n, ethservice := startEthService(t, genesis, preMergeBlocks)
ethservice.Merger().ReachTTD()
defer n.Close()
var (
api = NewConsensusAPI(ethservice)
parent = ethservice.BlockChain().CurrentBlock()
// This EVM code generates a log when the contract is created.
logCode = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
)
for i := 0; i < 10; i++ {
statedb, _ := ethservice.BlockChain().StateAt(parent.Root())
nonce := statedb.GetNonce(testAddr)
tx, _ := types.SignTx(types.NewContractCreation(nonce, new(big.Int), 1000000, big.NewInt(2*params.InitialBaseFee), logCode), types.LatestSigner(ethservice.BlockChain().Config()), testKey)
ethservice.TxPool().AddLocal(tx)

params := beacon.PayloadAttributesV1{
Timestamp: parent.Time() + 1,
Random: crypto.Keccak256Hash([]byte{byte(i)}),
SuggestedFeeRecipient: parent.Coinbase(),
}

fcState := beacon.ForkchoiceStateV1{
HeadBlockHash: parent.Hash(),
SafeBlockHash: common.Hash{},
FinalizedBlockHash: common.Hash{},
}
resp, err := api.ForkchoiceUpdatedV1(fcState, &params)
if err != nil {
t.Fatalf("error preparing payload, err=%v", err)
}
if resp.PayloadStatus.Status != beacon.VALID {
t.Fatalf("error preparing payload, invalid status: %v", resp.PayloadStatus.Status)
}
payload, err := api.GetPayloadV1(*resp.PayloadID)
if err != nil {
t.Fatalf("can't get payload: %v", err)
}
// TODO(493456442, marius) this test can be flaky since we rely on a 100ms
// allowance for block generation internally.
if len(payload.Transactions) == 0 {
t.Fatalf("payload should not be empty")
}
execResp, err := api.NewPayloadV1(*payload)
if err != nil {
t.Fatalf("can't execute payload: %v", err)
}
if execResp.Status != beacon.VALID {
t.Fatalf("invalid status: %v", execResp.Status)
}
fcState = beacon.ForkchoiceStateV1{
HeadBlockHash: payload.BlockHash,
SafeBlockHash: payload.ParentHash,
FinalizedBlockHash: payload.ParentHash,
}
if _, err := api.ForkchoiceUpdatedV1(fcState, nil); err != nil {
t.Fatalf("Failed to insert block: %v", err)
}
if ethservice.BlockChain().CurrentBlock().NumberU64() != payload.Number {
t.Fatalf("Chain head should be updated")
}
parent = ethservice.BlockChain().CurrentBlock()
}
}

func assembleBlock(api *ConsensusAPI, parentHash common.Hash, params *beacon.PayloadAttributesV1) (*beacon.ExecutableDataV1, error) {
block, err := api.eth.Miner().GetSealingBlockSync(parentHash, params.Timestamp, params.SuggestedFeeRecipient, params.Random, false)
if err != nil {
return nil, err
}
return beacon.BlockToExecutableData(block), nil
}
49 changes: 43 additions & 6 deletions eth/catalyst/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package catalyst

import (
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/beacon"
Expand All @@ -34,11 +35,47 @@ const maxTrackedPayloads = 10
// latest one; but have a slight wiggle room for non-ideal conditions.
const maxTrackedHeaders = 10

// payload wraps the execution payload generated by miner
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
type payload struct {
lock sync.Mutex
done bool
empty *types.Block
block *types.Block
result chan *types.Block
}

// resolve extracts the generated full block from the given channel if possible
// or fallback to empty block as an alternative.
func (req *payload) resolve() *beacon.ExecutableDataV1 {
// this function can be called concurrently, prevent any
// concurrency issue in the first place.
req.lock.Lock()
defer req.lock.Unlock()

// Try to resolve the full block first if it's not obtained
// yet. The returned block can be nil if the generation fails.
if !req.done {
select {
case req.block = <-req.result:
req.done = true
case <-time.NewTimer(time.Millisecond * 100).C:
// TODO(rjl49345642, Marius), should we keep this
// 100ms timeout allowance? Why not just use the
// default and then fallback to empty directly?
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the idea here is that if the beacon client insta-asks us for a block, we should have a chance to provide something "meaningful". In that case, I think 500ms would be preferable, since a block processing on mainnet takes 200-300ms, and mining is even more expensive due to no pre-fetching.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, also, new time used like this will leak goroutines. You should do it prior to select:

timeout := time.NewTimer(500 * time.Millisecond)
defer timeout.Stop()

select {
...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also always use the number first and then * time.Millisecond. The reverse way used above reads weird.

Copy link
Member Author

@MariusVanDerWijden MariusVanDerWijden May 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats the asian way, from right to left

}
}
block := req.empty
if req.block != nil {
block = req.block
}
return beacon.BlockToExecutableData(block)
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
}

// payloadQueueItem represents an id->payload tuple to store until it's retrieved
// or evicted.
type payloadQueueItem struct {
id beacon.PayloadID
payload *beacon.ExecutableDataV1
id beacon.PayloadID
data *payload
}

// payloadQueue tracks the latest handful of constructed payloads to be retrieved
Expand All @@ -57,14 +94,14 @@ func newPayloadQueue() *payloadQueue {
}

// put inserts a new payload into the queue at the given id.
func (q *payloadQueue) put(id beacon.PayloadID, data *beacon.ExecutableDataV1) {
func (q *payloadQueue) put(id beacon.PayloadID, data *payload) {
q.lock.Lock()
defer q.lock.Unlock()

copy(q.payloads[1:], q.payloads)
q.payloads[0] = &payloadQueueItem{
id: id,
payload: data,
id: id,
data: data,
}
}

Expand All @@ -78,7 +115,7 @@ func (q *payloadQueue) get(id beacon.PayloadID) *beacon.ExecutableDataV1 {
return nil // no more items
}
if item.id == id {
return item.payload
return item.data.resolve()
}
}
return nil
Expand Down
30 changes: 24 additions & 6 deletions miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,14 +235,32 @@ func (miner *Miner) DisablePreseal() {
miner.worker.disablePreseal()
}

// GetSealingBlock retrieves a sealing block based on the given parameters.
// The returned block is not sealed but all other fields should be filled.
func (miner *Miner) GetSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
return miner.worker.getSealingBlock(parent, timestamp, coinbase, random)
}

// SubscribePendingLogs starts delivering logs from pending transactions
// to the given channel.
func (miner *Miner) SubscribePendingLogs(ch chan<- []*types.Log) event.Subscription {
return miner.worker.pendingLogsFeed.Subscribe(ch)
}

// GetSealingBlockAsync requests to generate a sealing block according to the
// given parameters. Regardless of whether the generation is successful or not,
// there is always a result that will be returned through the result channel.
// The difference is that if the execution fails, the returned result is nil
// and the concrete error is dropped silently.
func (miner *Miner) GetSealingBlockAsync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, error) {
resChan, _, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return resChan, nil
}

// GetSealingBlockSync creates a sealing block according to the given parameters.
// If the generation is failed or the underlying work is already closed, an error
// will be returned.
func (miner *Miner) GetSealingBlockSync(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (*types.Block, error) {
resChan, errChan, err := miner.worker.getSealingBlock(parent, timestamp, coinbase, random, noTxs)
if err != nil {
return nil, err
}
return <-resChan, <-errChan
}
35 changes: 20 additions & 15 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ type newWorkReq struct {
// getWorkReq represents a request for getting a new sealing work with provided parameters.
type getWorkReq struct {
params *generateParams
err error
result chan *types.Block
result chan *types.Block // non-blocking channel
err chan error
}

// intervalAdjust represents a resubmitting interval adjustment.
Expand Down Expand Up @@ -536,12 +536,12 @@ func (w *worker) mainLoop() {
case req := <-w.getWorkCh:
block, err := w.generateWork(req.params)
if err != nil {
req.err = err
req.err <- err
req.result <- nil
} else {
req.err <- nil
req.result <- block
}

case ev := <-w.chainSideCh:
// Short circuit for duplicate side blocks
if _, exist := w.localUncles[ev.Block.Hash()]; exist {
Expand Down Expand Up @@ -969,6 +969,7 @@ type generateParams struct {
random common.Hash // The randomness generated by beacon chain, empty before the merge
noUncle bool // Flag whether the uncle block inclusion is allowed
noExtra bool // Flag whether the extra field assignment is allowed
noTxs bool // Flag whether an empty block without any transaction is expected
}

// prepareWork constructs the sealing task according to the given parameters,
Expand Down Expand Up @@ -1090,8 +1091,9 @@ func (w *worker) generateWork(params *generateParams) (*types.Block, error) {
}
defer work.discard()

w.fillTransactions(nil, work)

if !params.noTxs {
w.fillTransactions(nil, work)
}
return w.engine.FinalizeAndAssemble(w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

Expand Down Expand Up @@ -1128,7 +1130,6 @@ func (w *worker) commitWork(interrupt *int32, noempty bool, timestamp int64) {
work.discard()
return
}

w.commit(work.copy(), w.fullTaskHook, true, start)

// Swap out the old work with the new one, terminating any leftover
Expand Down Expand Up @@ -1177,7 +1178,13 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti
}

// getSealingBlock generates the sealing block based on the given parameters.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash) (*types.Block, error) {
// The generation result will be passed back via the given channel no matter
// the generation itself succeeds or not.
func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase common.Address, random common.Hash, noTxs bool) (chan *types.Block, chan error, error) {
var (
resChan = make(chan *types.Block, 1)
MariusVanDerWijden marked this conversation as resolved.
Show resolved Hide resolved
errChan = make(chan error, 1)
)
req := &getWorkReq{
params: &generateParams{
timestamp: timestamp,
Expand All @@ -1187,18 +1194,16 @@ func (w *worker) getSealingBlock(parent common.Hash, timestamp uint64, coinbase
random: random,
noUncle: true,
noExtra: true,
noTxs: noTxs,
},
result: make(chan *types.Block, 1),
result: resChan,
err: errChan,
}
select {
case w.getWorkCh <- req:
block := <-req.result
if block == nil {
return nil, req.err
}
return block, nil
return resChan, errChan, nil
case <-w.exitCh:
return nil, errors.New("miner closed")
return nil, nil, errors.New("miner closed")
}
}

Expand Down
8 changes: 6 additions & 2 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co

// This API should work even when the automatic sealing is not enabled
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")
Expand All @@ -654,7 +656,9 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
// This API should work even when the automatic sealing is enabled
w.start()
for _, c := range cases {
block, err := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random)
resChan, errChan, _ := w.getSealingBlock(c.parent, timestamp, c.coinbase, c.random, false)
block := <-resChan
err := <-errChan
if c.expectErr {
if err == nil {
t.Error("Expect error but get nil")
Expand Down