Skip to content

Commit

Permalink
feat: make handshake cancelable
Browse files Browse the repository at this point in the history
it'll make the handshake work with graceful shutdown.

see: cosmos/cosmos-sdk#16202
  • Loading branch information
yihuang committed May 18, 2023
1 parent 2e13f73 commit 76398bb
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 14 deletions.
17 changes: 13 additions & 4 deletions consensus/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package consensus

import (
"bytes"
"context"
"fmt"
"hash/crc32"
"io"
Expand Down Expand Up @@ -238,7 +239,7 @@ func (h *Handshaker) NBlocks() int {
}

// TODO: retry the handshake/replay if it fails ?
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
func (h *Handshaker) Handshake(ctx context.Context, proxyApp proxy.AppConns) error {

// Handshake is done via ABCI Info on the query conn.
res, err := proxyApp.Query().InfoSync(proxy.RequestInfo)
Expand All @@ -265,7 +266,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
}

// Replay blocks up to the latest in the blockstore.
_, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
appHash, err = h.ReplayBlocks(ctx, h.initialState, appHash, blockHeight, proxyApp)
if err != nil {
return fmt.Errorf("error on replay: %v", err)
}
Expand All @@ -282,6 +283,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// matches the current state.
// Returns the final AppHash or an error.
func (h *Handshaker) ReplayBlocks(
ctx context.Context,
state sm.State,
appHash []byte,
appBlockHeight int64,
Expand Down Expand Up @@ -390,7 +392,7 @@ func (h *Handshaker) ReplayBlocks(
// Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, false)

} else if appBlockHeight == storeBlockHeight {
// We're good!
Expand All @@ -405,7 +407,7 @@ func (h *Handshaker) ReplayBlocks(
case appBlockHeight < stateBlockHeight:
// the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL
return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
return h.replayBlocks(ctx, state, proxyApp, appBlockHeight, storeBlockHeight, true)

case appBlockHeight == stateBlockHeight:
// We haven't run Commit (both the state and app are one block behind),
Expand Down Expand Up @@ -435,6 +437,7 @@ func (h *Handshaker) ReplayBlocks(
}

func (h *Handshaker) replayBlocks(
ctx context.Context,
state sm.State,
proxyApp proxy.AppConns,
appBlockHeight,
Expand All @@ -461,6 +464,12 @@ func (h *Handshaker) replayBlocks(
firstBlock = state.InitialHeight
}
for i := firstBlock; i <= finalBlock; i++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i)
// Extra check to ensure the app was not changed in a way it shouldn't have.
Expand Down
2 changes: 1 addition & 1 deletion consensus/replay_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo

handshaker := NewHandshaker(stateStore, state, blockStore, gdoc)
handshaker.SetEventBus(eventBus)
err = handshaker.Handshake(proxyApp)
err = handshaker.Handshake(context.Background(), proxyApp)
if err != nil {
cmtos.Exit(fmt.Sprintf("Error on handshake: %v", err))
}
Expand Down
9 changes: 5 additions & 4 deletions consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,8 @@ func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uin
}
})

err := handshaker.Handshake(proxyApp)
// perform the replay protocol to sync Tendermint and the application
err = handshaker.Handshake(context.Background(), proxyApp)
if expectError {
require.Error(t, err)
return
Expand Down Expand Up @@ -942,7 +943,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand All @@ -966,7 +967,7 @@ func TestHandshakePanicsIfAppReturnsWrongAppHash(t *testing.T) {

assert.Panics(t, func() {
h := NewHandshaker(stateStore, state, store, genDoc)
if err = h.Handshake(proxyApp); err != nil {
if err = h.Handshake(context.Background(), proxyApp); err != nil {
t.Log(err)
}
})
Expand Down Expand Up @@ -1239,7 +1240,7 @@ func TestHandshakeUpdatesValidators(t *testing.T) {
t.Error(err)
}
})
if err := handshaker.Handshake(proxyApp); err != nil {
if err := handshaker.Handshake(context.Background(), proxyApp); err != nil {
t.Fatalf("Error on abci handshake: %v", err)
}
// reload the state, check the validator set was updated
Expand Down
5 changes: 3 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,8 @@ func startStateSync(ssR *statesync.Reactor, bcR blockSyncReactor, conR *cs.React
}

// NewNode returns a new, ready to go, CometBFT Node.
func NewNode(config *cfg.Config,
func NewNode(ctx context.Context,
config *cfg.Config,
privValidator types.PrivValidator,
nodeKey *p2p.NodeKey,
clientCreator proxy.ClientCreator,
Expand Down Expand Up @@ -775,7 +776,7 @@ func NewNode(config *cfg.Config,
// and replays any blocks as necessary to sync CometBFT with the app.
consensusLogger := logger.With("module", "consensus")
if !stateSync {
if err := doHandshake(stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
if err := doHandshake(ctx, stateStore, state, blockStore, genDoc, eventBus, proxyApp, consensusLogger); err != nil {
return nil, err
}

Expand Down
3 changes: 2 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
require.NoError(t, err)

n, err := NewNode(config,
n, err := NewNode(context.Background(),
config,
privval.LoadOrGenFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()),
nodeKey,
proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()),
Expand Down
2 changes: 1 addition & 1 deletion rpc/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func NewTendermint(app abci.Application, opts *Options) *nm.Node {
if err != nil {
panic(err)
}
node, err := nm.NewNode(config, pv, nodeKey, papp,
node, err := nm.NewNode(context.Background(), config, pv, nodeKey, papp,
nm.DefaultGenesisDocProviderFunc(config),
nm.DefaultDBProvider,
nm.DefaultMetricsProvider(config.Instrumentation),
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func startNode(cfg *Config) error {
return fmt.Errorf("failed to setup config: %w", err)
}

n, err := node.NewNode(cmtcfg,
n, err := node.NewNode(context.Background(), cmtcfg,
privval.LoadOrGenFilePV(cmtcfg.PrivValidatorKeyFile(), cmtcfg.PrivValidatorStateFile()),
nodeKey,
proxy.NewLocalClientCreator(app),
Expand Down

0 comments on commit 76398bb

Please sign in to comment.